From 7673571589a16ca00fb72c4b65a73f007fbf5597 Mon Sep 17 00:00:00 2001 From: "opencode-agent[bot]" Date: Sun, 3 May 2026 16:58:13 +0000 Subject: [PATCH] Deps upgraded: ktfmt, 5 sbt plugins, sbt Co-authored-by: alexandru --- build.sbt | 9 +- .../delayedqueue/jvm/DelayedQueueJDBC.kt | 234 +++++++++--------- .../org/funfix/delayedqueue/jvm/JdbcDriver.kt | 5 +- .../jvm/internals/CronServiceImpl.kt | 23 +- .../jvm/internals/utils/DatabaseTests.kt | 33 ++- .../delayedqueue/scala/OfferOutcome.scala | 4 + gradle/libs.versions.toml | 2 +- project/build.properties | 2 +- project/plugins.sbt | 8 +- 9 files changed, 160 insertions(+), 160 deletions(-) diff --git a/build.sbt b/build.sbt index dc7652c..7158fdf 100644 --- a/build.sbt +++ b/build.sbt @@ -120,9 +120,12 @@ val sharedSettings = Seq( // definitely not what we want. "-sourcepath", file(".").getAbsolutePath.replaceAll("[.]$", ""), - // Debug warnings - "-Wconf:any:warning-verbose" - ) + ) ++ { + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((3, _)) => Seq.empty + case _ => Seq("-Wconf:any:warning-verbose") + } + } ) lazy val root = project diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt index e366b98..5cab897 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt @@ -187,26 +187,25 @@ private constructor( // Step 2: Retry loop for SELECT FOR UPDATE + UPDATE (in single transaction) // This matches the Scala implementation which retries on concurrent modification while (true) { - val outcome = - database.withTransaction { connection -> - // Use locking SELECT to prevent concurrent modifications - val existing = - adapter.selectForUpdateOneRow(connection, pKind, key) - ?: return@withTransaction null // Row disappeared, retry - - // Check if the row is a duplicate - if (existing.data.isDuplicate(newRow)) { - return@withTransaction OfferOutcome.Ignored - } + val outcome = database.withTransaction { connection -> + // Use locking SELECT to prevent concurrent modifications + val existing = + adapter.selectForUpdateOneRow(connection, pKind, key) + ?: return@withTransaction null // Row disappeared, retry + + // Check if the row is a duplicate + if (existing.data.isDuplicate(newRow)) { + return@withTransaction OfferOutcome.Ignored + } - // Try to update with guarded CAS (compare-and-swap) - val updated = adapter.guardedUpdate(connection, existing.data, newRow) - if (updated) { - OfferOutcome.Updated - } else { - null // CAS failed, retry - } + // Try to update with guarded CAS (compare-and-swap) + val updated = adapter.guardedUpdate(connection, existing.data, newRow) + if (updated) { + OfferOutcome.Updated + } else { + null // CAS failed, retry } + } // If outcome is not null, we succeeded (either Updated or Ignored) if (outcome != null) { @@ -238,76 +237,74 @@ private constructor( // Step 1: Try batch INSERT (optimistic) // This matches the original Scala implementation's insertMany function - val insertOutcomes: Map = - database.withTransaction { connection -> - // Find existing keys in a SINGLE query (not N queries) - val keys = messages.map { it.message.key } - val existingKeys = adapter.searchAvailableKeys(connection, pKind, keys) - - // Filter to only insert non-existing keys - val rowsToInsert = - messages - .filter { !existingKeys.contains(it.message.key) } - .map { msg -> - DBTableRow( - pKey = msg.message.key, - pKind = pKind, - payload = serializer.serialize(msg.message.payload), - scheduledAt = msg.message.scheduleAt, - scheduledAtInitially = msg.message.scheduleAt, - lockUuid = null, - createdAt = now, - ) - } + val insertOutcomes: Map = database.withTransaction { connection -> + // Find existing keys in a SINGLE query (not N queries) + val keys = messages.map { it.message.key } + val existingKeys = adapter.searchAvailableKeys(connection, pKind, keys) + + // Filter to only insert non-existing keys + val rowsToInsert = + messages + .filter { !existingKeys.contains(it.message.key) } + .map { msg -> + DBTableRow( + pKey = msg.message.key, + pKind = pKind, + payload = serializer.serialize(msg.message.payload), + scheduledAt = msg.message.scheduleAt, + scheduledAtInitially = msg.message.scheduleAt, + lockUuid = null, + createdAt = now, + ) + } - // Attempt batch insert - if (rowsToInsert.isEmpty()) { - // All keys already exist - messages.associate { it.message.key to OfferOutcome.Ignored } - } else { - try { - val inserted = adapter.insertBatch(connection, rowsToInsert) - if (inserted.isNotEmpty()) { - lock.withLock { condition.signalAll() } - } + // Attempt batch insert + if (rowsToInsert.isEmpty()) { + // All keys already exist + messages.associate { it.message.key to OfferOutcome.Ignored } + } else { + try { + val inserted = adapter.insertBatch(connection, rowsToInsert) + if (inserted.isNotEmpty()) { + lock.withLock { condition.signalAll() } + } - // Build outcome map: Created for inserted, Ignored for existing - messages.associate { msg -> - if (existingKeys.contains(msg.message.key)) { - msg.message.key to OfferOutcome.Ignored - } else if (inserted.contains(msg.message.key)) { - msg.message.key to OfferOutcome.Created - } else { - // Failed to insert (shouldn't happen with no exception, but be - // safe) - msg.message.key to OfferOutcome.Ignored - } + // Build outcome map: Created for inserted, Ignored for existing + messages.associate { msg -> + if (existingKeys.contains(msg.message.key)) { + msg.message.key to OfferOutcome.Ignored + } else if (inserted.contains(msg.message.key)) { + msg.message.key to OfferOutcome.Created + } else { + // Failed to insert (shouldn't happen with no exception, but be + // safe) + msg.message.key to OfferOutcome.Ignored } - } catch (e: Exception) { - when { - filters.duplicateKey.matches(e) -> { - logger.debug( - "Batch insert failed due to duplicate key (concurrent insert), " + - "falling back to one-by-one offers" - ) - emptyMap() // Trigger fallback - } - else -> throw e // Other exceptions propagate + } + } catch (e: Exception) { + when { + filters.duplicateKey.matches(e) -> { + logger.debug( + "Batch insert failed due to duplicate key (concurrent insert), " + + "falling back to one-by-one offers" + ) + emptyMap() // Trigger fallback } + else -> throw e // Other exceptions propagate } } } + } // Step 2: Fallback to one-by-one for failures or updates // This matches the Scala implementation's fallback logic - val needsRetry = - messages.filter { msg -> - when (insertOutcomes[msg.message.key]) { - null -> true // Error/not in map, retry - is OfferOutcome.Ignored -> msg.message.canUpdate // Needs update - else -> false // Created successfully - } + val needsRetry = messages.filter { msg -> + when (insertOutcomes[msg.message.key]) { + null -> true // Error/not in map, retry + is OfferOutcome.Ignored -> msg.message.canUpdate // Needs update + else -> false // Created successfully } + } val results = insertOutcomes.toMutableMap() @@ -352,51 +349,50 @@ private constructor( // Retry loop to handle failed acquires (concurrent modifications) // This matches the original Scala implementation which retries if acquire fails while (true) { - val result = - database.withTransaction { connection -> - val now = Instant.now(clock) - val lockUuid = UUID.randomUUID().toString() - - // Select first available message (with locking if supported by DB) - val row = - adapter.selectFirstAvailableWithLock(connection, pKind, now) - ?: return@withTransaction PollResult.NoMessages - - // Try to acquire the row by updating it with our lock - val acquired = - adapter.acquireRowByUpdate( - conn = connection, - row = row.data, - lockUuid = lockUuid, - timeout = config.time.acquireTimeout, - now = now, - ) + val result = database.withTransaction { connection -> + val now = Instant.now(clock) + val lockUuid = UUID.randomUUID().toString() + + // Select first available message (with locking if supported by DB) + val row = + adapter.selectFirstAvailableWithLock(connection, pKind, now) + ?: return@withTransaction PollResult.NoMessages + + // Try to acquire the row by updating it with our lock + val acquired = + adapter.acquireRowByUpdate( + conn = connection, + row = row.data, + lockUuid = lockUuid, + timeout = config.time.acquireTimeout, + now = now, + ) + + if (!acquired) { + return@withTransaction PollResult.Retry + } - if (!acquired) { - return@withTransaction PollResult.Retry + // Successfully acquired the message + val payload = serializer.deserialize(row.data.payload) + val deliveryType = + if (row.data.scheduledAtInitially.isBefore(row.data.scheduledAt)) { + DeliveryType.REDELIVERY + } else { + DeliveryType.FIRST_DELIVERY } - // Successfully acquired the message - val payload = serializer.deserialize(row.data.payload) - val deliveryType = - if (row.data.scheduledAtInitially.isBefore(row.data.scheduledAt)) { - DeliveryType.REDELIVERY - } else { - DeliveryType.FIRST_DELIVERY - } - - val envelope = - AckEnvelope( - payload = payload, - messageId = MessageId(row.data.pKey), - timestamp = now, - source = config.ackEnvSource, - deliveryType = deliveryType, - acknowledge = acknowledgeByLockUuid(lockUuid), - ) - - PollResult.Success(envelope) - } + val envelope = + AckEnvelope( + payload = payload, + messageId = MessageId(row.data.pKey), + timestamp = now, + source = config.ackEnvSource, + deliveryType = deliveryType, + acknowledge = acknowledgeByLockUuid(lockUuid), + ) + + PollResult.Success(envelope) + } return when (result) { is PollResult.NoMessages -> null diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt index 116af9a..4444db6 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt @@ -48,7 +48,8 @@ public class JdbcDriver private constructor(public val className: String) { * @return the [JdbcDriver] if found, null otherwise */ @JvmStatic - public operator fun invoke(className: String): JdbcDriver? = - entries.firstOrNull { it.className.equals(className, ignoreCase = true) } + public operator fun invoke(className: String): JdbcDriver? = entries.firstOrNull { + it.className.equals(className, ignoreCase = true) + } } } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt index f066438..9eda715 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/CronServiceImpl.kt @@ -159,18 +159,17 @@ internal class CronServiceImpl( deleteOldCron(configHash, keyPrefix) // Batch offer all messages - val batchedMessages = - messages.map { cronMessage -> - BatchedMessage( - input = Unit, - message = - cronMessage.toScheduled( - configHash = configHash, - keyPrefix = keyPrefix, - canUpdate = canUpdate, - ), - ) - } + val batchedMessages = messages.map { cronMessage -> + BatchedMessage( + input = Unit, + message = + cronMessage.toScheduled( + configHash = configHash, + keyPrefix = keyPrefix, + canUpdate = canUpdate, + ), + ) + } if (batchedMessages.isNotEmpty()) { queue.offerBatch(batchedMessages) diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt index 4055500..3454c51 100644 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/utils/DatabaseTests.kt @@ -68,12 +68,11 @@ class DatabaseTests { fun `Database withConnection executes block and closes connection`() { var connectionClosedAfter: Boolean var connectionRef: SafeConnection? = null - val result = - database.withConnection { safeConn -> - connectionRef = safeConn - safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)") - "done" - } + val result = database.withConnection { safeConn -> + connectionRef = safeConn + safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)") + "done" + } // Connection is closed after block connectionClosedAfter = connectionRef?.underlying?.isClosed ?: false assertEquals("done", result) @@ -88,13 +87,12 @@ class DatabaseTests { database.withTransaction { safeConn -> safeConn.execute("INSERT INTO test (name) VALUES ('foo')") } - val count = - database.withConnection { safeConn -> - safeConn.query("SELECT COUNT(*) FROM test") { rs -> - rs.next() - rs.getInt(1) - } + val count = database.withConnection { safeConn -> + safeConn.query("SELECT COUNT(*) FROM test") { rs -> + rs.next() + rs.getInt(1) } + } assertEquals(1, count) } @@ -111,13 +109,12 @@ class DatabaseTests { safeConn.execute("INSERT INTO test (id, name) VALUES (1, 'baz')") } } - val count = - database.withConnection { safeConn -> - safeConn.query("SELECT COUNT(*) FROM test") { rs -> - rs.next() - rs.getInt(1) - } + val count = database.withConnection { safeConn -> + safeConn.query("SELECT COUNT(*) FROM test") { rs -> + rs.next() + rs.getInt(1) } + } assertEquals(0, count) } diff --git a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala index b87c6e8..a6cecdd 100644 --- a/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala +++ b/delayedqueue-scala/src/main/scala/org/funfix/delayedqueue/scala/OfferOutcome.scala @@ -57,5 +57,9 @@ object OfferOutcome { case _: jvm.OfferOutcome.Created => OfferOutcome.Created case _: jvm.OfferOutcome.Updated => OfferOutcome.Updated case _: jvm.OfferOutcome.Ignored => OfferOutcome.Ignored + case _ => + throw new IllegalArgumentException( + s"Unknown OfferOutcome: $javaOutcome" + ) } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8e2097a..7092d18 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -5,7 +5,7 @@ kotlin = "2.3.21" kover = "0.9.8" publish-plugin = "0.36.0" versions-plugin = "0.53.0" -ktfmt-plugin = "0.25.0" +ktfmt-plugin = "0.26.0" [libraries] # Plugins specified in buildSrc/build.gradle.kts diff --git a/project/build.properties b/project/build.properties index 4d6c567..dabdb15 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.12.2 +sbt.version=1.12.11 diff --git a/project/plugins.sbt b/project/plugins.sbt index 973eaf5..3160ed4 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,8 +1,8 @@ addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.3.1") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.6") -addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2") -addSbtPlugin("org.wartremover" % "sbt-wartremover" % "3.5.5") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.6.0") +addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.3") +addSbtPlugin("org.wartremover" % "sbt-wartremover" % "3.5.7") addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.12.2") // https://github.com/typelevel/sbt-tpolecat/issues/291 -libraryDependencies += "org.typelevel" %% "scalac-options" % "0.1.9" +libraryDependencies += "org.typelevel" %% "scalac-options" % "0.1.10"