Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,12 @@ val sharedSettings = Seq(
// definitely not what we want.
"-sourcepath",
file(".").getAbsolutePath.replaceAll("[.]$", ""),
// Debug warnings
"-Wconf:any:warning-verbose"
)
) ++ {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((3, _)) => Seq.empty
case _ => Seq("-Wconf:any:warning-verbose")
}
}
)

lazy val root = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,26 +187,25 @@ private constructor(
// Step 2: Retry loop for SELECT FOR UPDATE + UPDATE (in single transaction)
// This matches the Scala implementation which retries on concurrent modification
while (true) {
val outcome =
database.withTransaction { connection ->
// Use locking SELECT to prevent concurrent modifications
val existing =
adapter.selectForUpdateOneRow(connection, pKind, key)
?: return@withTransaction null // Row disappeared, retry

// Check if the row is a duplicate
if (existing.data.isDuplicate(newRow)) {
return@withTransaction OfferOutcome.Ignored
}
val outcome = database.withTransaction { connection ->
// Use locking SELECT to prevent concurrent modifications
val existing =
adapter.selectForUpdateOneRow(connection, pKind, key)
?: return@withTransaction null // Row disappeared, retry

// Check if the row is a duplicate
if (existing.data.isDuplicate(newRow)) {
return@withTransaction OfferOutcome.Ignored
}

// Try to update with guarded CAS (compare-and-swap)
val updated = adapter.guardedUpdate(connection, existing.data, newRow)
if (updated) {
OfferOutcome.Updated
} else {
null // CAS failed, retry
}
// Try to update with guarded CAS (compare-and-swap)
val updated = adapter.guardedUpdate(connection, existing.data, newRow)
if (updated) {
OfferOutcome.Updated
} else {
null // CAS failed, retry
}
}

// If outcome is not null, we succeeded (either Updated or Ignored)
if (outcome != null) {
Expand Down Expand Up @@ -238,76 +237,74 @@ private constructor(

// Step 1: Try batch INSERT (optimistic)
// This matches the original Scala implementation's insertMany function
val insertOutcomes: Map<String, OfferOutcome> =
database.withTransaction { connection ->
// Find existing keys in a SINGLE query (not N queries)
val keys = messages.map { it.message.key }
val existingKeys = adapter.searchAvailableKeys(connection, pKind, keys)

// Filter to only insert non-existing keys
val rowsToInsert =
messages
.filter { !existingKeys.contains(it.message.key) }
.map { msg ->
DBTableRow(
pKey = msg.message.key,
pKind = pKind,
payload = serializer.serialize(msg.message.payload),
scheduledAt = msg.message.scheduleAt,
scheduledAtInitially = msg.message.scheduleAt,
lockUuid = null,
createdAt = now,
)
}
val insertOutcomes: Map<String, OfferOutcome> = database.withTransaction { connection ->
// Find existing keys in a SINGLE query (not N queries)
val keys = messages.map { it.message.key }
val existingKeys = adapter.searchAvailableKeys(connection, pKind, keys)

// Filter to only insert non-existing keys
val rowsToInsert =
messages
.filter { !existingKeys.contains(it.message.key) }
.map { msg ->
DBTableRow(
pKey = msg.message.key,
pKind = pKind,
payload = serializer.serialize(msg.message.payload),
scheduledAt = msg.message.scheduleAt,
scheduledAtInitially = msg.message.scheduleAt,
lockUuid = null,
createdAt = now,
)
}

// Attempt batch insert
if (rowsToInsert.isEmpty()) {
// All keys already exist
messages.associate { it.message.key to OfferOutcome.Ignored }
} else {
try {
val inserted = adapter.insertBatch(connection, rowsToInsert)
if (inserted.isNotEmpty()) {
lock.withLock { condition.signalAll() }
}
// Attempt batch insert
if (rowsToInsert.isEmpty()) {
// All keys already exist
messages.associate { it.message.key to OfferOutcome.Ignored }
} else {
try {
val inserted = adapter.insertBatch(connection, rowsToInsert)
if (inserted.isNotEmpty()) {
lock.withLock { condition.signalAll() }
}

// Build outcome map: Created for inserted, Ignored for existing
messages.associate { msg ->
if (existingKeys.contains(msg.message.key)) {
msg.message.key to OfferOutcome.Ignored
} else if (inserted.contains(msg.message.key)) {
msg.message.key to OfferOutcome.Created
} else {
// Failed to insert (shouldn't happen with no exception, but be
// safe)
msg.message.key to OfferOutcome.Ignored
}
// Build outcome map: Created for inserted, Ignored for existing
messages.associate { msg ->
if (existingKeys.contains(msg.message.key)) {
msg.message.key to OfferOutcome.Ignored
} else if (inserted.contains(msg.message.key)) {
msg.message.key to OfferOutcome.Created
} else {
// Failed to insert (shouldn't happen with no exception, but be
// safe)
msg.message.key to OfferOutcome.Ignored
}
} catch (e: Exception) {
when {
filters.duplicateKey.matches(e) -> {
logger.debug(
"Batch insert failed due to duplicate key (concurrent insert), " +
"falling back to one-by-one offers"
)
emptyMap() // Trigger fallback
}
else -> throw e // Other exceptions propagate
}
} catch (e: Exception) {
when {
filters.duplicateKey.matches(e) -> {
logger.debug(
"Batch insert failed due to duplicate key (concurrent insert), " +
"falling back to one-by-one offers"
)
emptyMap() // Trigger fallback
}
else -> throw e // Other exceptions propagate
}
}
}
}

// Step 2: Fallback to one-by-one for failures or updates
// This matches the Scala implementation's fallback logic
val needsRetry =
messages.filter { msg ->
when (insertOutcomes[msg.message.key]) {
null -> true // Error/not in map, retry
is OfferOutcome.Ignored -> msg.message.canUpdate // Needs update
else -> false // Created successfully
}
val needsRetry = messages.filter { msg ->
when (insertOutcomes[msg.message.key]) {
null -> true // Error/not in map, retry
is OfferOutcome.Ignored -> msg.message.canUpdate // Needs update
else -> false // Created successfully
}
}

val results = insertOutcomes.toMutableMap()

Expand Down Expand Up @@ -352,51 +349,50 @@ private constructor(
// Retry loop to handle failed acquires (concurrent modifications)
// This matches the original Scala implementation which retries if acquire fails
while (true) {
val result =
database.withTransaction { connection ->
val now = Instant.now(clock)
val lockUuid = UUID.randomUUID().toString()

// Select first available message (with locking if supported by DB)
val row =
adapter.selectFirstAvailableWithLock(connection, pKind, now)
?: return@withTransaction PollResult.NoMessages

// Try to acquire the row by updating it with our lock
val acquired =
adapter.acquireRowByUpdate(
conn = connection,
row = row.data,
lockUuid = lockUuid,
timeout = config.time.acquireTimeout,
now = now,
)
val result = database.withTransaction { connection ->
val now = Instant.now(clock)
val lockUuid = UUID.randomUUID().toString()

// Select first available message (with locking if supported by DB)
val row =
adapter.selectFirstAvailableWithLock(connection, pKind, now)
?: return@withTransaction PollResult.NoMessages

// Try to acquire the row by updating it with our lock
val acquired =
adapter.acquireRowByUpdate(
conn = connection,
row = row.data,
lockUuid = lockUuid,
timeout = config.time.acquireTimeout,
now = now,
)

if (!acquired) {
return@withTransaction PollResult.Retry
}

if (!acquired) {
return@withTransaction PollResult.Retry
// Successfully acquired the message
val payload = serializer.deserialize(row.data.payload)
val deliveryType =
if (row.data.scheduledAtInitially.isBefore(row.data.scheduledAt)) {
DeliveryType.REDELIVERY
} else {
DeliveryType.FIRST_DELIVERY
}

// Successfully acquired the message
val payload = serializer.deserialize(row.data.payload)
val deliveryType =
if (row.data.scheduledAtInitially.isBefore(row.data.scheduledAt)) {
DeliveryType.REDELIVERY
} else {
DeliveryType.FIRST_DELIVERY
}

val envelope =
AckEnvelope(
payload = payload,
messageId = MessageId(row.data.pKey),
timestamp = now,
source = config.ackEnvSource,
deliveryType = deliveryType,
acknowledge = acknowledgeByLockUuid(lockUuid),
)

PollResult.Success(envelope)
}
val envelope =
AckEnvelope(
payload = payload,
messageId = MessageId(row.data.pKey),
timestamp = now,
source = config.ackEnvSource,
deliveryType = deliveryType,
acknowledge = acknowledgeByLockUuid(lockUuid),
)

PollResult.Success(envelope)
}

return when (result) {
is PollResult.NoMessages -> null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class JdbcDriver private constructor(public val className: String) {
* @return the [JdbcDriver] if found, null otherwise
*/
@JvmStatic
public operator fun invoke(className: String): JdbcDriver? =
entries.firstOrNull { it.className.equals(className, ignoreCase = true) }
public operator fun invoke(className: String): JdbcDriver? = entries.firstOrNull {
it.className.equals(className, ignoreCase = true)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,17 @@ internal class CronServiceImpl<A>(
deleteOldCron(configHash, keyPrefix)

// Batch offer all messages
val batchedMessages =
messages.map { cronMessage ->
BatchedMessage(
input = Unit,
message =
cronMessage.toScheduled(
configHash = configHash,
keyPrefix = keyPrefix,
canUpdate = canUpdate,
),
)
}
val batchedMessages = messages.map { cronMessage ->
BatchedMessage(
input = Unit,
message =
cronMessage.toScheduled(
configHash = configHash,
keyPrefix = keyPrefix,
canUpdate = canUpdate,
),
)
}

if (batchedMessages.isNotEmpty()) {
queue.offerBatch(batchedMessages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ class DatabaseTests {
fun `Database withConnection executes block and closes connection`() {
var connectionClosedAfter: Boolean
var connectionRef: SafeConnection? = null
val result =
database.withConnection { safeConn ->
connectionRef = safeConn
safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
"done"
}
val result = database.withConnection { safeConn ->
connectionRef = safeConn
safeConn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
"done"
}
// Connection is closed after block
connectionClosedAfter = connectionRef?.underlying?.isClosed ?: false
assertEquals("done", result)
Expand All @@ -88,13 +87,12 @@ class DatabaseTests {
database.withTransaction { safeConn ->
safeConn.execute("INSERT INTO test (name) VALUES ('foo')")
}
val count =
database.withConnection { safeConn ->
safeConn.query("SELECT COUNT(*) FROM test") { rs ->
rs.next()
rs.getInt(1)
}
val count = database.withConnection { safeConn ->
safeConn.query("SELECT COUNT(*) FROM test") { rs ->
rs.next()
rs.getInt(1)
}
}
assertEquals(1, count)
}

Expand All @@ -111,13 +109,12 @@ class DatabaseTests {
safeConn.execute("INSERT INTO test (id, name) VALUES (1, 'baz')")
}
}
val count =
database.withConnection { safeConn ->
safeConn.query("SELECT COUNT(*) FROM test") { rs ->
rs.next()
rs.getInt(1)
}
val count = database.withConnection { safeConn ->
safeConn.query("SELECT COUNT(*) FROM test") { rs ->
rs.next()
rs.getInt(1)
}
}
assertEquals(0, count)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,9 @@ object OfferOutcome {
case _: jvm.OfferOutcome.Created => OfferOutcome.Created
case _: jvm.OfferOutcome.Updated => OfferOutcome.Updated
case _: jvm.OfferOutcome.Ignored => OfferOutcome.Ignored
case _ =>
throw new IllegalArgumentException(
s"Unknown OfferOutcome: $javaOutcome"
)
}
}
Loading
Loading