diff --git a/delayedqueue-jvm/api/delayedqueue-jvm.api b/delayedqueue-jvm/api/delayedqueue-jvm.api index 05124f4..5045874 100644 --- a/delayedqueue-jvm/api/delayedqueue-jvm.api +++ b/delayedqueue-jvm/api/delayedqueue-jvm.api @@ -335,6 +335,7 @@ public final class org/funfix/delayedqueue/jvm/JdbcDatabasePoolConfig : java/lan public final class org/funfix/delayedqueue/jvm/JdbcDriver : java/lang/Enum { public static final field Companion Lorg/funfix/delayedqueue/jvm/JdbcDriver$Companion; public static final field HSQLDB Lorg/funfix/delayedqueue/jvm/JdbcDriver; + public static final field MariaDB Lorg/funfix/delayedqueue/jvm/JdbcDriver; public static final field MsSqlServer Lorg/funfix/delayedqueue/jvm/JdbcDriver; public static final field Sqlite Lorg/funfix/delayedqueue/jvm/JdbcDriver; public final fun getClassName ()Ljava/lang/String; diff --git a/delayedqueue-jvm/build.gradle.kts b/delayedqueue-jvm/build.gradle.kts index 8e0006c..7fc8f2d 100644 --- a/delayedqueue-jvm/build.gradle.kts +++ b/delayedqueue-jvm/build.gradle.kts @@ -21,11 +21,13 @@ dependencies { testImplementation(libs.jdbc.hsqldb) testImplementation(libs.jdbc.sqlite) testImplementation(libs.jdbc.mssql) + testImplementation(libs.jdbc.mariadb) testImplementation(platform(libs.junit.bom)) testImplementation(libs.junit.jupiter) testImplementation(platform(libs.testcontainers.bom)) testImplementation(libs.testcontainers.junit.jupiter) testImplementation(libs.testcontainers.mssqlserver) + testImplementation(libs.testcontainers.mariadb) testRuntimeOnly(libs.junit.platform.launcher) } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt index 8f922b8..663d76b 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt @@ -13,13 +13,14 @@ import org.funfix.delayedqueue.jvm.internals.CronServiceImpl import org.funfix.delayedqueue.jvm.internals.PollResult import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId -import org.funfix.delayedqueue.jvm.internals.jdbc.HSQLDBMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.MigrationRunner -import org.funfix.delayedqueue.jvm.internals.jdbc.MsSqlServerMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter -import org.funfix.delayedqueue.jvm.internals.jdbc.SqliteMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.filtersForDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBMigrations +import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBMigrations +import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MsSqlServerMigrations +import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.withDbRetries import org.funfix.delayedqueue.jvm.internals.utils.Database import org.funfix.delayedqueue.jvm.internals.utils.Raise @@ -267,7 +268,8 @@ private constructor( } } } catch (e: Exception) { - // On duplicate key, return empty map to trigger one-by-one fallback + // On duplicate key or transient failure (e.g., concurrent modification), + // return empty map to trigger one-by-one fallback. // This matches: recover { case SQLExceptionExtractors.DuplicateKey(_) => // Map.empty } when { @@ -278,6 +280,13 @@ private constructor( ) emptyMap() // Trigger fallback } + filters.transientFailure.matches(e) -> { + logger.debug( + "Batch insert failed due to transient failure (concurrent modification), " + + "falling back to one-by-one offers" + ) + emptyMap() // Trigger fallback + } else -> throw e // Other exceptions propagate } } @@ -613,6 +622,7 @@ private constructor( JdbcDriver.Sqlite -> SqliteMigrations.getMigrations(config.tableName) JdbcDriver.MsSqlServer -> MsSqlServerMigrations.getMigrations(config.tableName) + JdbcDriver.MariaDB -> MariaDBMigrations.getMigrations(config.tableName) } val executed = MigrationRunner.runMigrations(connection.underlying, migrations) diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt index 4abc77a..e966734 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/JdbcDriver.kt @@ -9,7 +9,10 @@ public enum class JdbcDriver(public val className: String) { MsSqlServer("com.microsoft.sqlserver.jdbc.SQLServerDriver"), /** SQLite driver. */ - Sqlite("org.sqlite.JDBC"); + Sqlite("org.sqlite.JDBC"), + + /** MariaDB driver. */ + MariaDB("org.mariadb.jdbc.Driver"); public companion object { /** diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt index b3a6421..cf99560 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt @@ -6,6 +6,10 @@ import java.sql.ResultSet import java.time.Duration import java.time.Instant import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MsSqlServerAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SqliteAdapter /** * Describes actual SQL queries executed — can be overridden to provide driver-specific queries. @@ -16,7 +20,7 @@ import org.funfix.delayedqueue.jvm.JdbcDriver * @property driver the JDBC driver this adapter is for * @property tableName the name of the delayed queue table */ -internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tableName: String) { +internal abstract class SQLVendorAdapter(val driver: JdbcDriver, protected val tableName: String) { /** Checks if a key exists in the database. */ fun checkIfKeyExists(connection: Connection, key: String, kind: String): Boolean { val sql = "SELECT 1 FROM $tableName WHERE pKey = ? AND pKind = ?" @@ -363,392 +367,13 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab JdbcDriver.HSQLDB -> HSQLDBAdapter(driver, tableName) JdbcDriver.Sqlite -> SqliteAdapter(driver, tableName) JdbcDriver.MsSqlServer -> MsSqlServerAdapter(driver, tableName) + JdbcDriver.MariaDB -> MariaDBAdapter(driver, tableName) } } } -/** HSQLDB-specific adapter. */ -private class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : - SQLVendorAdapter(driver, tableName) { - - override fun selectForUpdateOneRow( - connection: Connection, - kind: String, - key: String, - ): DBTableRowWithId? { - // HSQLDB has limited row-level locking support, so we fall back to plain SELECT. - // This matches the original Scala implementation's behavior for HSQLDB. - return selectByKey(connection, kind, key) - } - - override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { - val sql = - """ - INSERT INTO $tableName - (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) - VALUES (?, ?, ?, ?, ?, ?) - """ - - return try { - connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, row.pKey) - stmt.setString(2, row.pKind) - stmt.setBytes(3, row.payload) - stmt.setEpochMillis(4, row.scheduledAt) - stmt.setEpochMillis(5, row.scheduledAtInitially) - stmt.setEpochMillis(6, row.createdAt) - stmt.executeUpdate() > 0 - } - } catch (e: Exception) { - // If it's a duplicate key violation, return false (key already exists) - // This matches the original Scala implementation's behavior: - // insertIntoTable(...).recover { case SQLExceptionExtractors.DuplicateKey(_) => false } - if (HSQLDBFilters.duplicateKey.matches(e)) { - false - } else { - throw e - } - } - } - - override fun selectFirstAvailableWithLock( - connection: Connection, - kind: String, - now: Instant, - ): DBTableRowWithId? { - val sql = - """ - SELECT TOP 1 - id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt - FROM $tableName - WHERE pKind = ? AND scheduledAt <= ? - ORDER BY scheduledAt - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, kind) - stmt.setEpochMillis(2, now) - stmt.executeQuery().use { rs -> - if (rs.next()) { - rs.toDBTableRowWithId() - } else { - null - } - } - } - } - - override fun acquireManyOptimistically( - connection: Connection, - kind: String, - limit: Int, - lockUuid: String, - timeout: Duration, - now: Instant, - ): Int { - require(limit > 0) { "Limit must be > 0" } - val expireAt = now.plus(timeout) - - val sql = - """ - UPDATE $tableName - SET lockUuid = ?, - scheduledAt = ? - WHERE id IN ( - SELECT id - FROM $tableName - WHERE pKind = ? AND scheduledAt <= ? - ORDER BY scheduledAt - LIMIT $limit - ) - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, lockUuid) - stmt.setEpochMillis(2, expireAt) - stmt.setString(3, kind) - stmt.setEpochMillis(4, now) - stmt.executeUpdate() - } - } -} - -/** MS-SQL-specific adapter. */ -private class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : - SQLVendorAdapter(driver, tableName) { - - override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { - val sql = - """ - IF NOT EXISTS ( - SELECT 1 FROM $tableName WHERE pKey = ? AND pKind = ? - ) - BEGIN - INSERT INTO $tableName - (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) - VALUES (?, ?, ?, ?, ?, ?) - END - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, row.pKey) - stmt.setString(2, row.pKind) - stmt.setString(3, row.pKey) - stmt.setString(4, row.pKind) - stmt.setBytes(5, row.payload) - stmt.setEpochMillis(6, row.scheduledAt) - stmt.setEpochMillis(7, row.scheduledAtInitially) - stmt.setEpochMillis(8, row.createdAt) - stmt.executeUpdate() > 0 - } - } - - override fun selectForUpdateOneRow( - connection: Connection, - kind: String, - key: String, - ): DBTableRowWithId? { - val sql = - """ - SELECT TOP 1 - id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt - FROM $tableName - WITH (UPDLOCK) - WHERE pKey = ? AND pKind = ? - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, key) - stmt.setString(2, kind) - stmt.executeQuery().use { rs -> - if (rs.next()) { - rs.toDBTableRowWithId() - } else { - null - } - } - } - } - - override fun selectFirstAvailableWithLock( - connection: Connection, - kind: String, - now: Instant, - ): DBTableRowWithId? { - val sql = - """ - SELECT TOP 1 - id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt - FROM $tableName - WITH (UPDLOCK, READPAST) - WHERE pKind = ? AND scheduledAt <= ? - ORDER BY scheduledAt - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, kind) - stmt.setEpochMillis(2, now) - stmt.executeQuery().use { rs -> - if (rs.next()) { - rs.toDBTableRowWithId() - } else { - null - } - } - } - } - - override fun acquireManyOptimistically( - connection: Connection, - kind: String, - limit: Int, - lockUuid: String, - timeout: Duration, - now: Instant, - ): Int { - require(limit > 0) { "Limit must be > 0" } - val expireAt = now.plus(timeout) - - val sql = - """ - UPDATE $tableName - SET lockUuid = ?, - scheduledAt = ? - WHERE id IN ( - SELECT TOP $limit id - FROM $tableName - WITH (UPDLOCK, READPAST) - WHERE pKind = ? AND scheduledAt <= ? - ORDER BY scheduledAt - ) - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, lockUuid) - stmt.setEpochMillis(2, expireAt) - stmt.setString(3, kind) - stmt.setEpochMillis(4, now) - stmt.executeUpdate() - } - } - - override fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? { - val sql = - """ - SELECT TOP 1 - id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt - FROM $tableName - WHERE pKey = ? AND pKind = ? - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, key) - stmt.setString(2, kind) - stmt.executeQuery().use { rs -> - if (rs.next()) { - rs.toDBTableRowWithId() - } else { - null - } - } - } - } - - override fun selectAllAvailableWithLock( - connection: Connection, - lockUuid: String, - count: Int, - offsetId: Long?, - ): List { - val offsetClause = offsetId?.let { "AND id > ?" } ?: "" - val sql = - """ - SELECT TOP $count - id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt - FROM $tableName - WHERE lockUuid = ? $offsetClause - ORDER BY id - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, lockUuid) - offsetId?.let { stmt.setLong(2, it) } - stmt.executeQuery().use { rs -> - val results = mutableListOf() - while (rs.next()) { - results.add(rs.toDBTableRowWithId()) - } - results - } - } - } -} - -/** - * SQLite-specific adapter. - * - * SQLite has limited concurrency support — it uses database-level locking (WAL mode helps for - * concurrent reads). There is no SELECT FOR UPDATE or row-level locking. Instead, we rely on: - * - `INSERT OR IGNORE` for conditional inserts (best performance idiom for SQLite) - * - `LIMIT` syntax (same as HSQLDB) - * - Optimistic locking via compare-and-swap UPDATE patterns - */ -private class SqliteAdapter(driver: JdbcDriver, tableName: String) : - SQLVendorAdapter(driver, tableName) { - - override fun selectForUpdateOneRow( - connection: Connection, - kind: String, - key: String, - ): DBTableRowWithId? { - // SQLite has no row-level locking; fall back to plain SELECT like HSQLDB. - return selectByKey(connection, kind, key) - } - - override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { - // INSERT OR IGNORE is the idiomatic SQLite way to skip duplicate key inserts. - val sql = - """ - INSERT OR IGNORE INTO $tableName - (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) - VALUES (?, ?, ?, ?, ?, ?) - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, row.pKey) - stmt.setString(2, row.pKind) - stmt.setBytes(3, row.payload) - stmt.setEpochMillis(4, row.scheduledAt) - stmt.setEpochMillis(5, row.scheduledAtInitially) - stmt.setEpochMillis(6, row.createdAt) - stmt.executeUpdate() > 0 - } - } - - override fun selectFirstAvailableWithLock( - connection: Connection, - kind: String, - now: Instant, - ): DBTableRowWithId? { - val sql = - """ - SELECT id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt - FROM $tableName - WHERE pKind = ? AND scheduledAt <= ? - ORDER BY scheduledAt - LIMIT 1 - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, kind) - stmt.setEpochMillis(2, now) - stmt.executeQuery().use { rs -> - if (rs.next()) { - rs.toDBTableRowWithId() - } else { - null - } - } - } - } - - override fun acquireManyOptimistically( - connection: Connection, - kind: String, - limit: Int, - lockUuid: String, - timeout: Duration, - now: Instant, - ): Int { - require(limit > 0) { "Limit must be > 0" } - val expireAt = now.plus(timeout) - - val sql = - """ - UPDATE $tableName - SET lockUuid = ?, - scheduledAt = ? - WHERE id IN ( - SELECT id - FROM $tableName - WHERE pKind = ? AND scheduledAt <= ? - ORDER BY scheduledAt - LIMIT $limit - ) - """ - - return connection.prepareStatement(sql).use { stmt -> - stmt.setString(1, lockUuid) - stmt.setEpochMillis(2, expireAt) - stmt.setString(3, kind) - stmt.setEpochMillis(4, now) - stmt.executeUpdate() - } - } -} - /** Extension function to convert ResultSet to DBTableRowWithId. */ -private fun ResultSet.toDBTableRowWithId(): DBTableRowWithId = +internal fun ResultSet.toDBTableRowWithId(): DBTableRowWithId = DBTableRowWithId( id = getLong("id"), data = diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFilters.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFilters.kt index 0038479..50b58a0 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFilters.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFilters.kt @@ -1,10 +1,13 @@ package org.funfix.delayedqueue.jvm.internals.jdbc -import java.sql.SQLException import java.sql.SQLIntegrityConstraintViolationException import java.sql.SQLTransactionRollbackException import java.sql.SQLTransientConnectionException import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MSSQLFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SQLiteFilters /** * Filter for matching SQL exceptions based on specific criteria. Designed for extensibility across @@ -56,265 +59,12 @@ internal interface RdbmsExceptionFilters { val objectAlreadyExists: SqlExceptionFilter } -/** HSQLDB-specific exception filters. */ -internal object HSQLDBFilters : RdbmsExceptionFilters { - override val transientFailure: SqlExceptionFilter = CommonSqlFilters.transactionTransient - - override val duplicateKey: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - when { - CommonSqlFilters.integrityConstraint.matches(e) -> true - e is SQLException && e.errorCode == -104 && e.sqlState == "23505" -> true - e is SQLException && matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true - else -> false - } - } - - override val invalidTable: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - e is SQLException && matchesMessage(e.message, listOf("invalid object name")) - } - - override val objectAlreadyExists: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = false - } - - private val DUPLICATE_KEY_KEYWORDS = - listOf("primary key constraint", "unique constraint", "integrity constraint") -} - -/** SQLite-specific exception filters. */ -internal object SQLiteFilters : RdbmsExceptionFilters { - override val transientFailure: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - when { - CommonSqlFilters.transactionTransient.matches(e) -> true - // SQLite BUSY (error code 5) — database is locked by another connection - e is SQLException && isSQLiteBaseCode(e, 5) -> true - // SQLite LOCKED (error code 6) — table-level lock within a connection - e is SQLException && isSQLiteBaseCode(e, 6) -> true - else -> false - } - } - - override val duplicateKey: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - when { - CommonSqlFilters.integrityConstraint.matches(e) -> true - // SQLite CONSTRAINT_PRIMARYKEY (2067) and CONSTRAINT_UNIQUE (2579) - e is SQLException && isSQLiteResultCode(e, 2067, 2579) -> true - // SQLite generic CONSTRAINT (19) - e is SQLException && - isSQLiteResultCode(e, 19) && - matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true - e is SQLException && - isSQLiteException(e) && - matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true - else -> false - } - } - - override val invalidTable: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - e is SQLException && matchesMessage(e.message, listOf("no such table")) - } - - override val objectAlreadyExists: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - e is SQLException && matchesMessage(e.message, listOf("already exists")) - } - - private val DUPLICATE_KEY_KEYWORDS = - listOf("primary key constraint", "unique constraint", "unique") -} - -/** - * Attempts to detect SQLite-specific result codes in exceptions in a defensive way. - * - * Strategy: - * 1. Walk the cause chain searching for a throwable whose class name contains "SQLiteException"; - * 2. Try JDBC-standard accessors first: if the throwable is a SQLException, use getErrorCode(); - * 3. As a last resort, attempt reflection on well-known driver methods (getResultCode -> code) but - * be defensive and return false on any reflection failure. - */ -private fun isSQLiteResultCode(e: Throwable, vararg codes: Int): Boolean { - val code = sqliteResultCode(e) ?: return false - return code in codes -} - -/** - * Checks if a SQLite result code matches a base code (e.g. BUSY/LOCKED), including extended codes. - * SQLite extended result codes preserve the base code in the low byte. - */ -private fun isSQLiteBaseCode(e: Throwable, baseCode: Int): Boolean { - val code = sqliteResultCode(e) ?: return false - return code == baseCode || (code and 0xFF) == baseCode -} - -private fun sqliteResultCode(e: Throwable): Int? { - var t: Throwable? = e - while (t != null) { - val className = t.javaClass.name - if (className.contains("SQLiteException") || className.contains("sqlite")) { - if (t is SQLException) { - val ec = - try { - t.errorCode - } catch (_: Exception) { - null - } - if (ec != null && ec != 0) return ec - } - - val reflected = trySQLiteResultCodeReflectively(t) - if (reflected != null && reflected != 0) return reflected - - if (t is SQLException) { - val ec = - try { - t.errorCode - } catch (_: Exception) { - null - } - if (ec != null) return ec - } - } - - t = t.cause - if (t === e) break - } - - return null -} - -private fun trySQLiteResultCodeReflectively(e: Throwable): Int? { - return try { - val getResultCode = - e.javaClass.methods.firstOrNull { it.name == "getResultCode" && it.parameterCount == 0 } - if (getResultCode != null) { - val resultCode = getResultCode.invoke(e) ?: return null - val codeField = - resultCode.javaClass.declaredFields.firstOrNull { - it.name.equals("code", ignoreCase = true) - } - if (codeField != null) { - codeField.isAccessible = true - return when (val v = codeField.get(resultCode)) { - is Int -> v - is Number -> v.toInt() - else -> null - } - } - } - null - } catch (_: Exception) { - null - } -} - -private fun isSQLiteException(e: Throwable): Boolean { - var t: Throwable? = e - while (t != null) { - val className = t.javaClass.name - if (className.contains("SQLiteException") || className.contains("sqlite")) { - return true - } - t = t.cause - if (t === e) break - } - return false -} - -/** Microsoft SQL Server-specific exception filters. */ -internal object MSSQLFilters : RdbmsExceptionFilters { - override val transientFailure: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - when { - CommonSqlFilters.transactionTransient.matches(e) -> true - e is SQLException && hasSQLServerError(e, 1205) -> true // Deadlock - failedToResumeTransaction.matches(e) -> true - else -> false - } - } - - override val duplicateKey: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - when { - CommonSqlFilters.integrityConstraint.matches(e) -> true - e is SQLException && hasSQLServerError(e, 2627, 2601) -> true - e is SQLException && - e.errorCode in setOf(2627, 2601) && - e.sqlState == "23000" -> true - e is SQLException && matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true - else -> false - } - } - - override val invalidTable: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - when { - e is SQLException && e.errorCode == 208 && e.sqlState == "42S02" -> true - e is SQLException && matchesMessage(e.message, listOf("invalid object name")) -> - true - else -> false - } - } - - override val objectAlreadyExists: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - e is SQLException && hasSQLServerError(e, 2714, 2705, 1913, 15248, 15335) - } - - val failedToResumeTransaction: SqlExceptionFilter = - object : SqlExceptionFilter { - override fun matches(e: Throwable): Boolean = - isSQLServerException(e) && - e.message?.contains("The server failed to resume the transaction") == true - } - - private val DUPLICATE_KEY_KEYWORDS = - listOf("primary key constraint", "unique constraint", "integrity constraint") -} - -private fun matchesMessage(message: String?, keywords: List): Boolean { +internal fun matchesMessage(message: String?, keywords: List): Boolean { if (message == null) return false val lowerMessage = message.lowercase() return keywords.any { lowerMessage.contains(it.lowercase()) } } -private fun hasSQLServerError(e: Throwable, vararg errorNumbers: Int): Boolean { - if (!isSQLServerException(e)) return false - - return try { - val sqlServerErrorMethod = e.javaClass.getMethod("getSQLServerError") - val sqlServerError = sqlServerErrorMethod.invoke(e) - - if (sqlServerError != null) { - val getErrorNumberMethod = sqlServerError.javaClass.getMethod("getErrorNumber") - val errorNumber = getErrorNumberMethod.invoke(sqlServerError) as? Int - errorNumber != null && errorNumber in errorNumbers - } else { - false - } - } catch (_: Exception) { - false - } -} - -private fun isSQLServerException(e: Throwable): Boolean = - e.javaClass.name == "com.microsoft.sqlserver.jdbc.SQLServerException" - /** * Maps a JDBC driver to its corresponding exception filters. * @@ -329,4 +79,5 @@ internal fun filtersForDriver(driver: JdbcDriver): RdbmsExceptionFilters = JdbcDriver.HSQLDB -> HSQLDBFilters JdbcDriver.MsSqlServer -> MSSQLFilters JdbcDriver.Sqlite -> SQLiteFilters + JdbcDriver.MariaDB -> MariaDBFilters } diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt new file mode 100644 index 0000000..b3fae2b --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBAdapter.kt @@ -0,0 +1,116 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb + +import java.sql.Connection +import java.time.Duration +import java.time.Instant +import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId +import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId + +/** HSQLDB-specific adapter. */ +internal class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : + SQLVendorAdapter(driver, tableName) { + + override fun selectForUpdateOneRow( + connection: Connection, + kind: String, + key: String, + ): DBTableRowWithId? { + // HSQLDB has limited row-level locking support, so we fall back to plain SELECT. + // This matches the original Scala implementation's behavior for HSQLDB. + return selectByKey(connection, kind, key) + } + + override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { + val sql = + """ + INSERT INTO $tableName + (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) + VALUES (?, ?, ?, ?, ?, ?) + """ + + return try { + connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, row.pKey) + stmt.setString(2, row.pKind) + stmt.setBytes(3, row.payload) + stmt.setEpochMillis(4, row.scheduledAt) + stmt.setEpochMillis(5, row.scheduledAtInitially) + stmt.setEpochMillis(6, row.createdAt) + stmt.executeUpdate() > 0 + } + } catch (e: Exception) { + // If it's a duplicate key violation, return false (key already exists) + // This matches the original Scala implementation's behavior: + // insertIntoTable(...).recover { case SQLExceptionExtractors.DuplicateKey(_) => false } + if (HSQLDBFilters.duplicateKey.matches(e)) { + false + } else { + throw e + } + } + } + + override fun selectFirstAvailableWithLock( + connection: Connection, + kind: String, + now: Instant, + ): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, kind) + stmt.setEpochMillis(2, now) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun acquireManyOptimistically( + connection: Connection, + kind: String, + limit: Int, + lockUuid: String, + timeout: Duration, + now: Instant, + ): Int { + require(limit > 0) { "Limit must be > 0" } + val expireAt = now.plus(timeout) + + val sql = + """ + UPDATE $tableName + SET lockUuid = ?, + scheduledAt = ? + WHERE id IN ( + SELECT id + FROM $tableName + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + LIMIT $limit + ) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + stmt.setEpochMillis(2, expireAt) + stmt.setString(3, kind) + stmt.setEpochMillis(4, now) + stmt.executeUpdate() + } + } +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBFilters.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBFilters.kt new file mode 100644 index 0000000..02506ba --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBFilters.kt @@ -0,0 +1,37 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb + +import java.sql.SQLException +import org.funfix.delayedqueue.jvm.internals.jdbc.CommonSqlFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.SqlExceptionFilter +import org.funfix.delayedqueue.jvm.internals.jdbc.matchesMessage + +/** HSQLDB-specific exception filters. */ +internal object HSQLDBFilters : RdbmsExceptionFilters { + override val transientFailure: SqlExceptionFilter = CommonSqlFilters.transactionTransient + + override val duplicateKey: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + CommonSqlFilters.integrityConstraint.matches(e) -> true + e is SQLException && e.errorCode == -104 && e.sqlState == "23505" -> true + e is SQLException && matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true + else -> false + } + } + + override val invalidTable: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + e is SQLException && matchesMessage(e.message, listOf("invalid object name")) + } + + override val objectAlreadyExists: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = false + } + + private val DUPLICATE_KEY_KEYWORDS = + listOf("primary key constraint", "unique constraint", "integrity constraint") +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/HSQLDBMigrations.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBMigrations.kt similarity index 89% rename from delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/HSQLDBMigrations.kt rename to delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBMigrations.kt index 2f484ff..5902e0c 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/HSQLDBMigrations.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/hsqldb/HSQLDBMigrations.kt @@ -1,4 +1,6 @@ -package org.funfix.delayedqueue.jvm.internals.jdbc +package org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb + +import org.funfix.delayedqueue.jvm.internals.jdbc.Migration /** HSQLDB-specific migrations for the DelayedQueue table. */ internal object HSQLDBMigrations { @@ -10,7 +12,7 @@ internal object HSQLDBMigrations { */ fun getMigrations(tableName: String): List = listOf( - Migration.createTableIfNotExists( + Migration.Companion.createTableIfNotExists( tableName = tableName, sql = """ diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBAdapter.kt new file mode 100644 index 0000000..404e846 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBAdapter.kt @@ -0,0 +1,126 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.mariadb + +import java.sql.Connection +import java.time.Duration +import java.time.Instant +import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId +import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId + +/** MariaDB-specific adapter. */ +internal class MariaDBAdapter(driver: JdbcDriver, tableName: String) : + SQLVendorAdapter(driver, tableName) { + + override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { + val sql = + """ + INSERT IGNORE INTO $tableName + (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) + VALUES (?, ?, ?, ?, ?, ?) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, row.pKey) + stmt.setString(2, row.pKind) + stmt.setBytes(3, row.payload) + stmt.setEpochMillis(4, row.scheduledAt) + stmt.setEpochMillis(5, row.scheduledAtInitially) + stmt.setEpochMillis(6, row.createdAt) + stmt.executeUpdate() > 0 + } + } + + override fun selectForUpdateOneRow( + connection: Connection, + kind: String, + key: String, + ): DBTableRowWithId? { + val sql = + """ + SELECT id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE pKey = ? AND pKind = ? + LIMIT 1 + FOR UPDATE + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, key) + stmt.setString(2, kind) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun selectFirstAvailableWithLock( + connection: Connection, + kind: String, + now: Instant, + ): DBTableRowWithId? { + val sql = + """ + SELECT id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + LIMIT 1 + FOR UPDATE SKIP LOCKED + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, kind) + stmt.setEpochMillis(2, now) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun acquireManyOptimistically( + connection: Connection, + kind: String, + limit: Int, + lockUuid: String, + timeout: Duration, + now: Instant, + ): Int { + require(limit > 0) { "Limit must be > 0" } + val expireAt = now.plus(timeout) + + val sql = + """ + UPDATE $tableName + SET lockUuid = ?, + scheduledAt = ? + WHERE id IN ( + SELECT id FROM ( + SELECT id + FROM $tableName + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + LIMIT $limit + FOR UPDATE SKIP LOCKED + ) AS subq + ) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + stmt.setEpochMillis(2, expireAt) + stmt.setString(3, kind) + stmt.setEpochMillis(4, now) + stmt.executeUpdate() + } + } +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBFilters.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBFilters.kt new file mode 100644 index 0000000..4c93b19 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBFilters.kt @@ -0,0 +1,75 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.mariadb + +import java.sql.SQLException +import org.funfix.delayedqueue.jvm.internals.jdbc.CommonSqlFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.SqlExceptionFilter +import org.funfix.delayedqueue.jvm.internals.jdbc.matchesMessage + +/** MariaDB-specific exception filters. */ +internal object MariaDBFilters : RdbmsExceptionFilters { + override val transientFailure: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + matchesTransient(e) -> true + matchesNextExceptionChain(e) -> true + else -> { + val cause = e.cause + cause != null && cause !== e && matches(cause) + } + } + } + + override val duplicateKey: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + CommonSqlFilters.integrityConstraint.matches(e) -> true + e is SQLException && e.errorCode == 1062 && e.sqlState == "23000" -> true + e is SQLException && matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true + else -> false + } + } + + override val invalidTable: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + e is SQLException && e.errorCode == 1146 && e.sqlState == "42S02" -> true + e is SQLException && + matchesMessage(e.message, listOf("doesn't exist", "table")) -> true + else -> false + } + } + + override val objectAlreadyExists: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + e is SQLException && e.errorCode == 1050 // Table already exists + } + + private val DUPLICATE_KEY_KEYWORDS = + listOf("duplicate entry", "primary key constraint", "unique constraint") + + private fun matchesTransient(e: Throwable): Boolean = + when { + CommonSqlFilters.transactionTransient.matches(e) -> true + e is SQLException && e.errorCode == 1213 -> true // Deadlock + e is SQLException && e.errorCode == 1205 -> true // Lock wait timeout + e is SQLException && e.errorCode == 1020 -> true // Record changed since last read + else -> false + } + + private fun matchesNextExceptionChain(e: Throwable): Boolean { + val sqlException = e as? SQLException ?: return false + var next = sqlException.nextException + while (next != null && next !== e) { + if (matchesTransient(next)) { + return true + } + next = next.nextException + } + return false + } +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBMigrations.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBMigrations.kt new file mode 100644 index 0000000..555689a --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mariadb/MariaDBMigrations.kt @@ -0,0 +1,38 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.mariadb + +import org.funfix.delayedqueue.jvm.internals.jdbc.Migration + +/** MariaDB-specific migrations for the DelayedQueue table. */ +internal object MariaDBMigrations { + /** + * Gets the list of migrations for MariaDB. + * + * @param tableName The name of the delayed queue table + * @return List of migrations in order + */ + fun getMigrations(tableName: String): List = + listOf( + Migration.Companion.createTableIfNotExists( + tableName = tableName, + sql = + """ + CREATE TABLE $tableName ( + id BIGINT NOT NULL AUTO_INCREMENT, + pKey VARCHAR(200) NOT NULL, + pKind VARCHAR(100) NOT NULL, + payload MEDIUMBLOB NOT NULL, + scheduledAt BIGINT NOT NULL, + scheduledAtInitially BIGINT NOT NULL, + lockUuid VARCHAR(36) NULL, + createdAt BIGINT NOT NULL, + PRIMARY KEY (pKey, pKind), + UNIQUE KEY ${tableName}__IdUniqueIndex (id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + CREATE INDEX ${tableName}__KindPlusScheduledAtIndex ON $tableName(pKind, scheduledAt); + CREATE INDEX ${tableName}__LockUuidPlusIdIndex ON $tableName(lockUuid, id) + """ + .trimIndent(), + ) + ) +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MSSQLFilters.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MSSQLFilters.kt new file mode 100644 index 0000000..4684805 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MSSQLFilters.kt @@ -0,0 +1,84 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.mssql + +import java.sql.SQLException +import org.funfix.delayedqueue.jvm.internals.jdbc.CommonSqlFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.SqlExceptionFilter +import org.funfix.delayedqueue.jvm.internals.jdbc.matchesMessage + +/** Microsoft SQL Server-specific exception filters. */ +internal object MSSQLFilters : RdbmsExceptionFilters { + override val transientFailure: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + CommonSqlFilters.transactionTransient.matches(e) -> true + e is SQLException && hasSQLServerError(e, 1205) -> true // Deadlock + failedToResumeTransaction.matches(e) -> true + else -> false + } + } + + override val duplicateKey: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + CommonSqlFilters.integrityConstraint.matches(e) -> true + e is SQLException && hasSQLServerError(e, 2627, 2601) -> true + e is SQLException && + e.errorCode in setOf(2627, 2601) && + e.sqlState == "23000" -> true + e is SQLException && matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true + else -> false + } + } + + override val invalidTable: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + e is SQLException && e.errorCode == 208 && e.sqlState == "42S02" -> true + e is SQLException && matchesMessage(e.message, listOf("invalid object name")) -> + true + else -> false + } + } + + override val objectAlreadyExists: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + e is SQLException && hasSQLServerError(e, 2714, 2705, 1913, 15248, 15335) + } + + val failedToResumeTransaction: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + isSQLServerException(e) && + e.message?.contains("The server failed to resume the transaction") == true + } + + private val DUPLICATE_KEY_KEYWORDS = + listOf("primary key constraint", "unique constraint", "integrity constraint") +} + +private fun hasSQLServerError(e: Throwable, vararg errorNumbers: Int): Boolean { + if (!isSQLServerException(e)) return false + + return try { + val sqlServerErrorMethod = e.javaClass.getMethod("getSQLServerError") + val sqlServerError = sqlServerErrorMethod.invoke(e) + + if (sqlServerError != null) { + val getErrorNumberMethod = sqlServerError.javaClass.getMethod("getErrorNumber") + val errorNumber = getErrorNumberMethod.invoke(sqlServerError) as? Int + errorNumber != null && errorNumber in errorNumbers + } else { + false + } + } catch (_: Exception) { + false + } +} + +private fun isSQLServerException(e: Throwable): Boolean = + e.javaClass.name == "com.microsoft.sqlserver.jdbc.SQLServerException" diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt new file mode 100644 index 0000000..f2618d7 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerAdapter.kt @@ -0,0 +1,181 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.mssql + +import java.sql.Connection +import java.time.Duration +import java.time.Instant +import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId +import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId + +/** MS-SQL-specific adapter. */ +internal class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : + SQLVendorAdapter(driver, tableName) { + + override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { + val sql = + """ + IF NOT EXISTS ( + SELECT 1 FROM $tableName WHERE pKey = ? AND pKind = ? + ) + BEGIN + INSERT INTO $tableName + (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) + VALUES (?, ?, ?, ?, ?, ?) + END + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, row.pKey) + stmt.setString(2, row.pKind) + stmt.setString(3, row.pKey) + stmt.setString(4, row.pKind) + stmt.setBytes(5, row.payload) + stmt.setEpochMillis(6, row.scheduledAt) + stmt.setEpochMillis(7, row.scheduledAtInitially) + stmt.setEpochMillis(8, row.createdAt) + stmt.executeUpdate() > 0 + } + } + + override fun selectForUpdateOneRow( + connection: Connection, + kind: String, + key: String, + ): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WITH (UPDLOCK) + WHERE pKey = ? AND pKind = ? + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, key) + stmt.setString(2, kind) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun selectFirstAvailableWithLock( + connection: Connection, + kind: String, + now: Instant, + ): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WITH (UPDLOCK, READPAST) + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, kind) + stmt.setEpochMillis(2, now) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun acquireManyOptimistically( + connection: Connection, + kind: String, + limit: Int, + lockUuid: String, + timeout: Duration, + now: Instant, + ): Int { + require(limit > 0) { "Limit must be > 0" } + val expireAt = now.plus(timeout) + + val sql = + """ + UPDATE $tableName + SET lockUuid = ?, + scheduledAt = ? + WHERE id IN ( + SELECT TOP $limit id + FROM $tableName + WITH (UPDLOCK, READPAST) + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + ) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + stmt.setEpochMillis(2, expireAt) + stmt.setString(3, kind) + stmt.setEpochMillis(4, now) + stmt.executeUpdate() + } + } + + override fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE pKey = ? AND pKind = ? + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, key) + stmt.setString(2, kind) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun selectAllAvailableWithLock( + connection: Connection, + lockUuid: String, + count: Int, + offsetId: Long?, + ): List { + val offsetClause = offsetId?.let { "AND id > ?" } ?: "" + val sql = + """ + SELECT TOP $count + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE lockUuid = ? $offsetClause + ORDER BY id + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + offsetId?.let { stmt.setLong(2, it) } + stmt.executeQuery().use { rs -> + val results = mutableListOf() + while (rs.next()) { + results.add(rs.toDBTableRowWithId()) + } + results + } + } + } +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerMigrations.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerMigrations.kt similarity index 90% rename from delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerMigrations.kt rename to delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerMigrations.kt index 35acfae..1903d00 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerMigrations.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/mssql/MsSqlServerMigrations.kt @@ -1,4 +1,6 @@ -package org.funfix.delayedqueue.jvm.internals.jdbc +package org.funfix.delayedqueue.jvm.internals.jdbc.mssql + +import org.funfix.delayedqueue.jvm.internals.jdbc.Migration /** MS-SQL-specific migrations for the DelayedQueue table. */ internal object MsSqlServerMigrations { @@ -10,7 +12,7 @@ internal object MsSqlServerMigrations { */ fun getMigrations(tableName: String): List = listOf( - Migration.createTableIfNotExists( + Migration.Companion.createTableIfNotExists( tableName = tableName, sql = """ diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/publicApi.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/publicApi.kt deleted file mode 100644 index aebeb0e..0000000 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/publicApi.kt +++ /dev/null @@ -1,13 +0,0 @@ -package org.funfix.delayedqueue.jvm.internals.jdbc - -import org.funfix.delayedqueue.jvm.ResourceUnavailableException -import org.funfix.delayedqueue.jvm.internals.utils.Raise -import org.funfix.delayedqueue.jvm.internals.utils.unsafeSneakyRaises - -internal typealias PublicApiBlock = - context(Raise, Raise) - () -> T - -internal inline fun publicApiThatThrows(block: PublicApiBlock): T { - return unsafeSneakyRaises { block() } -} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SQLiteFilters.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SQLiteFilters.kt new file mode 100644 index 0000000..67a3439 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SQLiteFilters.kt @@ -0,0 +1,151 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.sqlite + +import java.sql.SQLException +import org.funfix.delayedqueue.jvm.internals.jdbc.CommonSqlFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.SqlExceptionFilter +import org.funfix.delayedqueue.jvm.internals.jdbc.matchesMessage + +/** SQLite-specific exception filters. */ +internal object SQLiteFilters : RdbmsExceptionFilters { + override val transientFailure: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + CommonSqlFilters.transactionTransient.matches(e) -> true + // SQLite BUSY (error code 5) — database is locked by another connection + e is SQLException && isSQLiteBaseCode(e, 5) -> true + // SQLite LOCKED (error code 6) — table-level lock within a connection + e is SQLException && isSQLiteBaseCode(e, 6) -> true + else -> false + } + } + + override val duplicateKey: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + when { + CommonSqlFilters.integrityConstraint.matches(e) -> true + // SQLite CONSTRAINT_PRIMARYKEY (2067) and CONSTRAINT_UNIQUE (2579) + e is SQLException && isSQLiteResultCode(e, 2067, 2579) -> true + // SQLite generic CONSTRAINT (19) when paired with duplicate-key text + e is SQLException && + isSQLiteResultCode(e, 19) && + matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true + // If the message text indicates a duplicate-key violation, accept it + e is SQLException && matchesMessage(e.message, DUPLICATE_KEY_KEYWORDS) -> true + else -> false + } + } + + override val invalidTable: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + e is SQLException && matchesMessage(e.message, listOf("no such table")) + } + + override val objectAlreadyExists: SqlExceptionFilter = + object : SqlExceptionFilter { + override fun matches(e: Throwable): Boolean = + e is SQLException && matchesMessage(e.message, listOf("already exists")) + } + + private val DUPLICATE_KEY_KEYWORDS = listOf("primary key constraint", "unique constraint") +} + +/** + * Attempts to detect SQLite-specific result codes in exceptions in a defensive way. + * + * Strategy: + * 1. Walk the cause chain searching for a throwable whose class name contains "SQLiteException"; + * 2. Try JDBC-standard accessors first: if the throwable is a SQLException, use getErrorCode(); + * 3. As a last resort, attempt reflection on well-known driver methods (getResultCode -> code) but + * be defensive and return false on any reflection failure. + */ +private fun isSQLiteResultCode(e: Throwable, vararg codes: Int): Boolean { + val code = sqliteResultCode(e) ?: return false + return code in codes +} + +/** + * Checks if a SQLite result code matches a base code (e.g. BUSY/LOCKED), including extended codes. + * SQLite extended result codes preserve the base code in the low byte. + */ +private fun isSQLiteBaseCode(e: Throwable, baseCode: Int): Boolean { + val code = sqliteResultCode(e) ?: return false + return code == baseCode || (code and 0xFF) == baseCode +} + +private fun sqliteResultCode(e: Throwable): Int? { + var t: Throwable? = e + while (t != null) { + val className = t.javaClass.name + if (className.contains("SQLiteException") || className.contains("sqlite")) { + if (t is SQLException) { + val ec = + try { + t.errorCode + } catch (_: Exception) { + null + } + if (ec != null && ec != 0) return ec + } + + val reflected = trySQLiteResultCodeReflectively(t) + if (reflected != null && reflected != 0) return reflected + + if (t is SQLException) { + val ec = + try { + t.errorCode + } catch (_: Exception) { + null + } + if (ec != null) return ec + } + } + + t = t.cause + if (t === e) break + } + + return null +} + +private fun trySQLiteResultCodeReflectively(e: Throwable): Int? { + return try { + val getResultCode = + e.javaClass.methods.firstOrNull { it.name == "getResultCode" && it.parameterCount == 0 } + if (getResultCode != null) { + val resultCode = getResultCode.invoke(e) ?: return null + val codeField = + resultCode.javaClass.declaredFields.firstOrNull { + it.name.equals("code", ignoreCase = true) + } + if (codeField != null) { + codeField.isAccessible = true + return when (val v = codeField.get(resultCode)) { + is Int -> v + is Number -> v.toInt() + else -> null + } + } + } + null + } catch (_: Exception) { + null + } +} + +private fun isSQLiteException(e: Throwable): Boolean { + var t: Throwable? = e + while (t != null) { + val className = t.javaClass.name + if (className.contains("SQLiteException") || className.contains("sqlite")) { + return true + } + t = t.cause + if (t === e) break + } + return false +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt new file mode 100644 index 0000000..12e5702 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteAdapter.kt @@ -0,0 +1,113 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc.sqlite + +import java.sql.Connection +import java.time.Duration +import java.time.Instant +import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow +import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId +import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter +import org.funfix.delayedqueue.jvm.internals.jdbc.toDBTableRowWithId + +/** + * SQLite-specific adapter. + * + * SQLite has limited concurrency support — it uses database-level locking (WAL mode helps for + * concurrent reads). There is no SELECT FOR UPDATE or row-level locking. Instead, we rely on: + * - `INSERT OR IGNORE` for conditional inserts (best performance idiom for SQLite) + * - `LIMIT` syntax (same as HSQLDB) + * - Optimistic locking via compare-and-swap UPDATE patterns + */ +internal class SqliteAdapter(driver: JdbcDriver, tableName: String) : + SQLVendorAdapter(driver, tableName) { + + override fun selectForUpdateOneRow( + connection: Connection, + kind: String, + key: String, + ): DBTableRowWithId? { + // SQLite has no row-level locking; fall back to plain SELECT like HSQLDB. + return selectByKey(connection, kind, key) + } + + override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { + // INSERT OR IGNORE is the idiomatic SQLite way to skip duplicate key inserts. + val sql = + """ + INSERT OR IGNORE INTO $tableName + (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) + VALUES (?, ?, ?, ?, ?, ?) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, row.pKey) + stmt.setString(2, row.pKind) + stmt.setBytes(3, row.payload) + stmt.setEpochMillis(4, row.scheduledAt) + stmt.setEpochMillis(5, row.scheduledAtInitially) + stmt.setEpochMillis(6, row.createdAt) + stmt.executeUpdate() > 0 + } + } + + override fun selectFirstAvailableWithLock( + connection: Connection, + kind: String, + now: Instant, + ): DBTableRowWithId? { + val sql = + """ + SELECT id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + LIMIT 1 + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, kind) + stmt.setEpochMillis(2, now) + stmt.executeQuery().use { rs -> + if (rs.next()) { + rs.toDBTableRowWithId() + } else { + null + } + } + } + } + + override fun acquireManyOptimistically( + connection: Connection, + kind: String, + limit: Int, + lockUuid: String, + timeout: Duration, + now: Instant, + ): Int { + require(limit > 0) { "Limit must be > 0" } + val expireAt = now.plus(timeout) + + val sql = + """ + UPDATE $tableName + SET lockUuid = ?, + scheduledAt = ? + WHERE id IN ( + SELECT id + FROM $tableName + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + LIMIT $limit + ) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + stmt.setEpochMillis(2, expireAt) + stmt.setString(3, kind) + stmt.setEpochMillis(4, now) + stmt.executeUpdate() + } + } +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqliteMigrations.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteMigrations.kt similarity index 88% rename from delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqliteMigrations.kt rename to delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteMigrations.kt index ee7ad67..a974b37 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqliteMigrations.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/sqlite/SqliteMigrations.kt @@ -1,4 +1,6 @@ -package org.funfix.delayedqueue.jvm.internals.jdbc +package org.funfix.delayedqueue.jvm.internals.jdbc.sqlite + +import org.funfix.delayedqueue.jvm.internals.jdbc.Migration /** SQLite-specific migrations for the DelayedQueue table. */ internal object SqliteMigrations { @@ -10,7 +12,7 @@ internal object SqliteMigrations { */ fun getMigrations(tableName: String): List = listOf( - Migration.createTableIfNotExists( + Migration.Companion.createTableIfNotExists( tableName = tableName, sql = """ diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbAdvancedTest.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbAdvancedTest.java new file mode 100644 index 0000000..eab79d3 --- /dev/null +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbAdvancedTest.java @@ -0,0 +1,63 @@ +package org.funfix.delayedqueue.api; + +import org.funfix.delayedqueue.jvm.*; +public class DelayedQueueJDBCMariaDbAdvancedTest extends DelayedQueueJDBCAdvancedTestBase { + @Override + protected DelayedQueueJDBC createQueue(String tableName, MutableClock clock) throws Exception { + var container = MariaDbTestContainer.container(); + var dbConfig = new JdbcConnectionConfig( + container.getJdbcUrl(), + JdbcDriver.MariaDB, + container.getUsername(), + container.getPassword(), + null + ); + + var queueConfig = new DelayedQueueJDBCConfig( + dbConfig, + tableName, + DelayedQueueTimeConfig.DEFAULT, + "advanced-mariadb-test-queue" + ); + + DelayedQueueJDBC.runMigrations(queueConfig); + + return DelayedQueueJDBC.create( + MessageSerializer.forStrings(), + queueConfig, + clock + ); + } + + @Override + protected DelayedQueueJDBC createQueueOnSameDB(String url, String tableName, MutableClock clock) throws Exception { + var container = MariaDbTestContainer.container(); + var dbConfig = new JdbcConnectionConfig( + url, + JdbcDriver.MariaDB, + container.getUsername(), + container.getPassword(), + null + ); + + var queueConfig = new DelayedQueueJDBCConfig( + dbConfig, + tableName, + DelayedQueueTimeConfig.DEFAULT, + "shared-mariadb-test-queue-" + tableName + ); + + DelayedQueueJDBC.runMigrations(queueConfig); + + return DelayedQueueJDBC.create( + MessageSerializer.forStrings(), + queueConfig, + clock + ); + } + + @Override + protected String databaseUrl() { + return MariaDbTestContainer.container().getJdbcUrl(); + } +} diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbConcurrencyTest.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbConcurrencyTest.java new file mode 100644 index 0000000..4fe4601 --- /dev/null +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbConcurrencyTest.java @@ -0,0 +1,25 @@ +package org.funfix.delayedqueue.api; + +import org.funfix.delayedqueue.jvm.*; +public class DelayedQueueJDBCMariaDbConcurrencyTest extends DelayedQueueJDBCConcurrencyTestBase { + @Override + protected DelayedQueueJDBC createQueue() throws Exception { + var container = MariaDbTestContainer.container(); + var dbConfig = new JdbcConnectionConfig( + container.getJdbcUrl(), + JdbcDriver.MariaDB, + container.getUsername(), + container.getPassword(), + null + ); + + var queueConfig = DelayedQueueJDBCConfig.create(dbConfig, "delayed_queue_test", "concurrency-mariadb-queue"); + + DelayedQueueJDBC.runMigrations(queueConfig); + + return DelayedQueueJDBC.create( + MessageSerializer.forStrings(), + queueConfig + ); + } +} diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbTest.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbTest.java new file mode 100644 index 0000000..b6d48cc --- /dev/null +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/DelayedQueueJDBCMariaDbTest.java @@ -0,0 +1,72 @@ +package org.funfix.delayedqueue.api; + +import org.funfix.delayedqueue.jvm.*; +public class DelayedQueueJDBCMariaDbTest extends DelayedQueueJDBCContractTestBase { + @Override + protected DelayedQueueJDBC createQueue() throws Exception { + var container = MariaDbTestContainer.container(); + var dbConfig = new JdbcConnectionConfig( + container.getJdbcUrl(), + JdbcDriver.MariaDB, + container.getUsername(), + container.getPassword(), + null + ); + + var queueConfig = DelayedQueueJDBCConfig.create(dbConfig, "delayed_queue_test", "jdbc-mariadb-queue"); + + DelayedQueueJDBC.runMigrations(queueConfig); + + return DelayedQueueJDBC.create( + MessageSerializer.forStrings(), + queueConfig + ); + } + + @Override + protected DelayedQueueJDBC createQueueWithClock(MutableClock clock) throws Exception { + var container = MariaDbTestContainer.container(); + var dbConfig = new JdbcConnectionConfig( + container.getJdbcUrl(), + JdbcDriver.MariaDB, + container.getUsername(), + container.getPassword(), + null + ); + + var queueConfig = DelayedQueueJDBCConfig.create(dbConfig, "delayed_queue_test", "jdbc-mariadb-queue"); + + DelayedQueueJDBC.runMigrations(queueConfig); + + return DelayedQueueJDBC.create( + MessageSerializer.forStrings(), + queueConfig, + clock + ); + } + + @Override + protected DelayedQueueJDBC createQueueWithClock( + MutableClock clock, + DelayedQueueTimeConfig timeConfig + ) throws Exception { + var container = MariaDbTestContainer.container(); + var dbConfig = new JdbcConnectionConfig( + container.getJdbcUrl(), + JdbcDriver.MariaDB, + container.getUsername(), + container.getPassword(), + null + ); + + var queueConfig = new DelayedQueueJDBCConfig(dbConfig, "delayed_queue_test", timeConfig, "jdbc-mariadb-queue"); + + DelayedQueueJDBC.runMigrations(queueConfig); + + return DelayedQueueJDBC.create( + MessageSerializer.forStrings(), + queueConfig, + clock + ); + } +} diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/JdbcDriverTest.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/JdbcDriverTest.java index 14d5c5e..4a14150 100644 --- a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/JdbcDriverTest.java +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/JdbcDriverTest.java @@ -93,6 +93,7 @@ private String switchOnDriver(JdbcDriver driver) { case HSQLDB -> "hsqldb"; case MsSqlServer -> "mssqlserver"; case Sqlite -> "sqlite"; + case MariaDB -> "mariadb"; }; } @@ -132,6 +133,7 @@ void testSwitchExpressionCoverage() { case HSQLDB -> "hsqldb"; case MsSqlServer -> "mssql"; case Sqlite -> "sqlite"; + case MariaDB -> "mariadb"; }; assertEquals("hsqldb", result); @@ -141,6 +143,7 @@ void testSwitchExpressionCoverage() { case HSQLDB -> "hsqldb"; //noinspection DataFlowIssue case MsSqlServer -> "mssql"; + case MariaDB -> "mariadb"; }; assertEquals("mssql", result); } diff --git a/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/MariaDbTestContainer.java b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/MariaDbTestContainer.java new file mode 100644 index 0000000..a466da9 --- /dev/null +++ b/delayedqueue-jvm/src/test/java/org/funfix/delayedqueue/api/MariaDbTestContainer.java @@ -0,0 +1,36 @@ +package org.funfix.delayedqueue.api; + +import org.junit.jupiter.api.Assumptions; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.MariaDBContainer; + +final class MariaDbTestContainer { + private static final String IMAGE = "mariadb:11.7"; + + private static volatile MariaDBContainer container; + + private MariaDbTestContainer() {} + + static MariaDBContainer container() { + assumeDockerAvailable(); + if (container == null) { + synchronized (MariaDbTestContainer.class) { + if (container == null) { + assumeDockerAvailable(); + container = + new MariaDBContainer<>(IMAGE) + .withDatabaseName("testdb") + .withUsername("test") + .withPassword("test"); + container.start(); + } + } + } + return container; + } + + private static void assumeDockerAvailable() { + boolean dockerAvailable = DockerClientFactory.instance().isDockerAvailable(); + Assumptions.assumeTrue(dockerAvailable, "Docker is not available; skipping MariaDB tests"); + } +} diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MariaDBAdapterSqlTests.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MariaDBAdapterSqlTests.kt new file mode 100644 index 0000000..19c06b3 --- /dev/null +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MariaDBAdapterSqlTests.kt @@ -0,0 +1,191 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc + +import java.lang.reflect.InvocationHandler +import java.lang.reflect.Proxy +import java.sql.Connection +import java.sql.PreparedStatement +import java.sql.ResultSet +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant +import org.funfix.delayedqueue.jvm.JdbcDriver +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +class MariaDBAdapterSqlTests { + private val tableName = "dq_table" + + @Test + fun `insert uses INSERT IGNORE pattern`() { + val capture = SqlCapture() + val connection = capture.connection(executeUpdateResult = 1) + val adapter = SQLVendorAdapter.create(JdbcDriver.MariaDB, tableName) + val row = + DBTableRow( + pKey = "key-1", + pKind = "kind-1", + payload = "payload".toByteArray(), + scheduledAt = Instant.parse("2024-01-01T00:00:00Z"), + scheduledAtInitially = Instant.parse("2024-01-01T00:00:00Z"), + lockUuid = null, + createdAt = Instant.parse("2024-01-01T00:00:00Z"), + ) + + val inserted = adapter.insertOneRow(connection, row) + + assertTrue(inserted) + val sql = capture.normalizedLast() + assertTrue(sql.contains("INSERT IGNORE INTO $tableName")) + } + + @Test + fun `selectForUpdateOneRow uses FOR UPDATE and LIMIT 1`() { + val capture = SqlCapture() + val connection = capture.connection() + val adapter = SQLVendorAdapter.create(JdbcDriver.MariaDB, tableName) + + adapter.selectForUpdateOneRow(connection, "kind-1", "key-1") + + val sql = capture.normalizedLast() + assertTrue(sql.contains("LIMIT 1")) + assertTrue(sql.contains("FOR UPDATE")) + assertTrue(sql.contains("WHERE pKey = ? AND pKind = ?")) + } + + @Test + fun `selectFirstAvailableWithLock uses FOR UPDATE SKIP LOCKED and LIMIT 1`() { + val capture = SqlCapture() + val connection = capture.connection() + val adapter = SQLVendorAdapter.create(JdbcDriver.MariaDB, tableName) + + adapter.selectFirstAvailableWithLock(connection, "kind-1", Instant.EPOCH) + + val sql = capture.normalizedLast() + assertTrue(sql.contains("LIMIT 1")) + assertTrue(sql.contains("FOR UPDATE SKIP LOCKED")) + assertTrue(sql.contains("ORDER BY scheduledAt")) + } + + @Test + fun `acquireManyOptimistically uses LIMIT and FOR UPDATE SKIP LOCKED`() { + val capture = SqlCapture() + val connection = capture.connection() + val adapter = SQLVendorAdapter.create(JdbcDriver.MariaDB, tableName) + + adapter.acquireManyOptimistically( + connection = connection, + kind = "kind-1", + limit = 5, + lockUuid = "lock-uuid", + timeout = Duration.ofSeconds(30), + now = Instant.EPOCH, + ) + + val sql = capture.normalizedLast() + assertTrue(sql.contains("UPDATE $tableName")) + assertTrue(sql.contains("LIMIT 5")) + assertTrue(sql.contains("FOR UPDATE SKIP LOCKED")) + } + + @Test + fun `selectByKey uses LIMIT 1`() { + val capture = SqlCapture() + val connection = capture.connection() + val adapter = SQLVendorAdapter.create(JdbcDriver.MariaDB, tableName) + + adapter.selectByKey(connection, "kind-1", "key-1") + + val sql = capture.normalizedLast() + assertTrue(sql.contains("LIMIT 1")) + assertFalse(sql.contains("TOP")) + } + + @Test + fun `selectAllAvailableWithLock uses LIMIT`() { + val capture = SqlCapture() + val connection = capture.connection() + val adapter = SQLVendorAdapter.create(JdbcDriver.MariaDB, tableName) + + adapter.selectAllAvailableWithLock(connection, "lock-uuid", 10, null) + + val sql = capture.normalizedLast() + assertTrue(sql.contains("LIMIT 10")) + assertTrue(sql.contains("ORDER BY id")) + assertFalse(sql.contains("TOP")) + } + + private class SqlCapture { + private val statements = mutableListOf() + + fun connection(executeUpdateResult: Int = 0): Connection { + val resultSet = + Proxy.newProxyInstance( + ResultSet::class.java.classLoader, + arrayOf(ResultSet::class.java), + InvocationHandler { _, method, _ -> + when (method.name) { + "next" -> false + "close" -> null + else -> defaultReturn(method.returnType) + } + }, + ) as ResultSet + + val preparedStatement = + Proxy.newProxyInstance( + PreparedStatement::class.java.classLoader, + arrayOf(PreparedStatement::class.java), + InvocationHandler { _, method, _ -> + when (method.name) { + "executeUpdate" -> executeUpdateResult + "executeQuery" -> resultSet + "executeBatch" -> IntArray(0) + "addBatch" -> null + "setString" -> null + "setBytes" -> null + "setTimestamp" -> null + "setLong" -> null + "setNull" -> null + "close" -> null + else -> defaultReturn(method.returnType) + } + }, + ) as PreparedStatement + + return Proxy.newProxyInstance( + Connection::class.java.classLoader, + arrayOf(Connection::class.java), + InvocationHandler { _, method, args -> + when (method.name) { + "prepareStatement" -> { + statements.add(args?.get(0) as String) + preparedStatement + } + "close" -> null + else -> defaultReturn(method.returnType) + } + }, + ) as Connection + } + + fun normalizedLast(): String = normalizeSql(statements.last()) + + private fun normalizeSql(sql: String): String = sql.trim().replace(Regex("\\s+"), " ") + + private fun defaultReturn(returnType: Class<*>): Any? { + return when { + returnType == Boolean::class.javaPrimitiveType -> false + returnType == Int::class.javaPrimitiveType -> 0 + returnType == Long::class.javaPrimitiveType -> 0L + returnType == Double::class.javaPrimitiveType -> 0.0 + returnType == Float::class.javaPrimitiveType -> 0f + returnType == Short::class.javaPrimitiveType -> 0.toShort() + returnType == Byte::class.javaPrimitiveType -> 0.toByte() + returnType == Void.TYPE -> null + returnType == Timestamp::class.java -> Timestamp(0L) + else -> null + } + } + } +} diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFiltersTest.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFiltersTest.kt index 7e0f7db..a685b8d 100644 --- a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFiltersTest.kt +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SqlExceptionFiltersTest.kt @@ -1,10 +1,15 @@ package org.funfix.delayedqueue.jvm.internals.jdbc +import java.sql.BatchUpdateException import java.sql.SQLException import java.sql.SQLIntegrityConstraintViolationException import java.sql.SQLTransactionRollbackException import java.sql.SQLTransientConnectionException import org.funfix.delayedqueue.jvm.JdbcDriver +import org.funfix.delayedqueue.jvm.internals.jdbc.hsqldb.HSQLDBFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.mariadb.MariaDBFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.mssql.MSSQLFilters +import org.funfix.delayedqueue.jvm.internals.jdbc.sqlite.SQLiteFilters import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Nested @@ -262,6 +267,84 @@ class SqlExceptionFiltersTest { } } + @Nested + inner class MariaDBFiltersTest { + @Test + fun `transientFailure should match transient exceptions`() { + val ex = SQLTransactionRollbackException("rollback") + assertTrue(MariaDBFilters.transientFailure.matches(ex)) + } + + @Test + fun `transientFailure should match MariaDB deadlock error code`() { + val ex = SQLException("Deadlock found", "40001", 1213) + assertTrue(MariaDBFilters.transientFailure.matches(ex)) + } + + @Test + fun `transientFailure should match MariaDB lock wait timeout error code`() { + val ex = SQLException("Lock wait timeout exceeded", "HY000", 1205) + assertTrue(MariaDBFilters.transientFailure.matches(ex)) + } + + @Test + fun `transientFailure should match MariaDB record changed error code`() { + val ex = SQLException("Record has changed since last read", "HY000", 1020) + assertTrue(MariaDBFilters.transientFailure.matches(ex)) + } + + @Test + fun `transientFailure should match record changed in cause chain`() { + val cause = SQLException("Record has changed since last read", "HY000", 1020) + val wrapper = RuntimeException("batch failed", cause) + assertTrue(MariaDBFilters.transientFailure.matches(wrapper)) + } + + @Test + fun `transientFailure should match record changed in nextException chain`() { + val next = SQLException("Record has changed since last read", "HY000", 1020) + val batch = BatchUpdateException("batch failed", IntArray(0)) + batch.nextException = next + assertTrue(MariaDBFilters.transientFailure.matches(batch)) + } + + @Test + fun `duplicateKey should match MariaDB error code 1062`() { + val ex = SQLException("Duplicate entry 'key1' for key 'PRIMARY'", "23000", 1062) + assertTrue(MariaDBFilters.duplicateKey.matches(ex)) + } + + @Test + fun `duplicateKey should match SQLIntegrityConstraintViolationException`() { + val ex = SQLIntegrityConstraintViolationException("constraint violation") + assertTrue(MariaDBFilters.duplicateKey.matches(ex)) + } + + @Test + fun `duplicateKey should match duplicate entry message`() { + val ex = SQLException("Duplicate entry '123' for key 'PRIMARY'") + assertTrue(MariaDBFilters.duplicateKey.matches(ex)) + } + + @Test + fun `duplicateKey should not match unrelated SQLException`() { + val ex = SQLException("Some other error") + assertFalse(MariaDBFilters.duplicateKey.matches(ex)) + } + + @Test + fun `invalidTable should match MariaDB error code 1146`() { + val ex = SQLException("Table 'testdb.my_table' doesn't exist", "42S02", 1146) + assertTrue(MariaDBFilters.invalidTable.matches(ex)) + } + + @Test + fun `objectAlreadyExists should match MariaDB error code 1050`() { + val ex = SQLException("Table 'my_table' already exists", "42S01", 1050) + assertTrue(MariaDBFilters.objectAlreadyExists.matches(ex)) + } + } + @Nested inner class FiltersForDriverTest { @Test @@ -281,5 +364,11 @@ class SqlExceptionFiltersTest { val filters = filtersForDriver(JdbcDriver.Sqlite) assertTrue(filters === SQLiteFilters) } + + @Test + fun `should return MariaDBFilters for MariaDB driver`() { + val filters = filtersForDriver(JdbcDriver.MariaDB) + assertTrue(filters === MariaDBFilters) + } } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f0451e7..7add54a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -24,6 +24,7 @@ logback-classic = { module = "ch.qos.logback:logback-classic", version = "1.5.27 jdbc-hsqldb = { module = "org.hsqldb:hsqldb", version = "2.7.4" } jdbc-sqlite = { module = "org.xerial:sqlite-jdbc", version = "3.51.1.0" } jdbc-mssql = { module = "com.microsoft.sqlserver:mssql-jdbc", version = "12.4.2.jre11" } +jdbc-mariadb = { module = "org.mariadb.jdbc:mariadb-java-client", version = "3.5.3" } junit-bom = { module = "org.junit:junit-bom", version = "6.0.2" } junit-jupiter = { module = "org.junit.jupiter:junit-jupiter" } junit-platform-launcher = { module = "org.junit.platform:junit-platform-launcher" } @@ -31,3 +32,4 @@ junit-platform-launcher = { module = "org.junit.platform:junit-platform-launcher testcontainers-bom = { module = "org.testcontainers:testcontainers-bom", version = "2.0.3" } testcontainers-junit-jupiter = { module = "org.testcontainers:junit-jupiter", version = "1.21.4" } testcontainers-mssqlserver = { module = "org.testcontainers:testcontainers-mssqlserver" } +testcontainers-mariadb = { module = "org.testcontainers:testcontainers-mariadb" }