diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt index a4d740c..33adfa0 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/DelayedQueueJDBC.kt @@ -15,6 +15,7 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId import org.funfix.delayedqueue.jvm.internals.jdbc.HSQLDBMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.MigrationRunner +import org.funfix.delayedqueue.jvm.internals.jdbc.MsSqlServerMigrations import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter import org.funfix.delayedqueue.jvm.internals.jdbc.filtersForDriver @@ -608,7 +609,8 @@ private constructor( val migrations = when (config.db.driver) { JdbcDriver.HSQLDB -> HSQLDBMigrations.getMigrations(config.tableName) - JdbcDriver.MsSqlServer, + JdbcDriver.MsSqlServer -> + MsSqlServerMigrations.getMigrations(config.tableName) JdbcDriver.Sqlite -> throw UnsupportedOperationException( "Database ${config.db.driver} not yet supported" diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerMigrations.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerMigrations.kt new file mode 100644 index 0000000..cd0e972 --- /dev/null +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerMigrations.kt @@ -0,0 +1,55 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc + +/** + * MS-SQL Server-specific migrations for the DelayedQueue table. + * + * Uses MS-SQL data types: + * - `NVARCHAR` for Unicode string columns + * - `DATETIMEOFFSET` for timezone-aware timestamps + * - `BIGINT IDENTITY(1,1)` for auto-increment + * - Composite primary key on `(pKey, pKind)` with a separate unique index on `id` + */ +internal object MsSqlServerMigrations { + /** + * Gets the list of migrations for MS-SQL Server. + * + * @param tableName The name of the delayed queue table + * @return List of migrations in order + */ + fun getMigrations(tableName: String): List = + listOf( + Migration.createTableIfNotExists( + tableName = tableName, + sql = + """ + CREATE TABLE $tableName ( + id BIGINT IDENTITY(1,1) NOT NULL, + pKey NVARCHAR(200) NOT NULL, + pKind NVARCHAR(100) NOT NULL, + payload NVARCHAR(MAX) NOT NULL, + scheduledAt DATETIMEOFFSET NOT NULL, + scheduledAtInitially DATETIMEOFFSET NOT NULL, + lockUuid VARCHAR(36) NULL, + createdAt DATETIMEOFFSET NOT NULL, + PRIMARY KEY (pKey, pKind) + ); + + CREATE UNIQUE INDEX ${tableName}__IdUniqueIndex + ON $tableName (id); + + CREATE INDEX ${tableName}__ScheduledAtIndex + ON $tableName (scheduledAt); + + CREATE INDEX ${tableName}__KindPlusScheduledAtIndex + ON $tableName (pKind, scheduledAt); + + CREATE INDEX ${tableName}__CreatedAtIndex + ON $tableName (createdAt); + + CREATE INDEX ${tableName}__LockUuidPlusIdIndex + ON $tableName (lockUuid, id); + """ + .trimIndent(), + ) + ) +} diff --git a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt index 88723b0..53145ae 100644 --- a/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt +++ b/delayedqueue-jvm/src/main/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/SQLVendorAdapter.kt @@ -123,7 +123,7 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab } /** Selects one row by its key. */ - fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? { + open fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? { val sql = """ SELECT id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt @@ -311,7 +311,7 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab ): DBTableRowWithId? /** Selects all messages with a specific lock UUID. */ - fun selectAllAvailableWithLock( + open fun selectAllAvailableWithLock( connection: Connection, lockUuid: String, count: Int, @@ -383,8 +383,8 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab fun create(driver: JdbcDriver, tableName: String): SQLVendorAdapter = when (driver) { JdbcDriver.HSQLDB -> HSQLDBAdapter(driver, tableName) - JdbcDriver.MsSqlServer, - JdbcDriver.Sqlite -> TODO("MS-SQL and SQLite support not yet implemented") + JdbcDriver.MsSqlServer -> MsSqlServerAdapter(driver, tableName) + JdbcDriver.Sqlite -> TODO("SQLite support not yet implemented") } } } @@ -495,6 +495,169 @@ private class HSQLDBAdapter(driver: JdbcDriver, tableName: String) : } } +/** + * MS-SQL Server-specific adapter. + * + * Key differences from HSQLDB: + * - Uses `TOP N` instead of `LIMIT N` for row limiting + * - Uses `WITH (UPDLOCK)` for row-level locking during SELECT FOR UPDATE + * - Uses `WITH (UPDLOCK, READPAST)` for concurrent polling (skips locked rows) + * - Uses `IF NOT EXISTS ... INSERT` pattern for conditional insert + */ +private class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) : + SQLVendorAdapter(driver, tableName) { + + override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean { + val sql = + """ + IF NOT EXISTS ( + SELECT 1 FROM $tableName WHERE pKey = ? AND pKind = ? + ) + BEGIN + INSERT INTO $tableName + (pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt) + VALUES (?, ?, ?, ?, ?, ?) + END + """ + + return connection.prepareStatement(sql).use { stmt -> + // Parameters for EXISTS check + stmt.setString(1, row.pKey) + stmt.setString(2, row.pKind) + // Parameters for INSERT + stmt.setString(3, row.pKey) + stmt.setString(4, row.pKind) + stmt.setString(5, row.payload) + stmt.setTimestamp(6, java.sql.Timestamp.from(row.scheduledAt)) + stmt.setTimestamp(7, java.sql.Timestamp.from(row.scheduledAtInitially)) + stmt.setTimestamp(8, java.sql.Timestamp.from(row.createdAt)) + stmt.executeUpdate() > 0 + } + } + + override fun selectForUpdateOneRow( + connection: Connection, + kind: String, + key: String, + ): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WITH (UPDLOCK) + WHERE pKey = ? AND pKind = ? + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, key) + stmt.setString(2, kind) + stmt.executeQuery().use { rs -> if (rs.next()) rs.toDBTableRowWithId() else null } + } + } + + override fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE pKey = ? AND pKind = ? + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, key) + stmt.setString(2, kind) + stmt.executeQuery().use { rs -> if (rs.next()) rs.toDBTableRowWithId() else null } + } + } + + override fun selectFirstAvailableWithLock( + connection: Connection, + kind: String, + now: Instant, + ): DBTableRowWithId? { + val sql = + """ + SELECT TOP 1 + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WITH (UPDLOCK, READPAST) + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, kind) + stmt.setTimestamp(2, java.sql.Timestamp.from(now)) + stmt.executeQuery().use { rs -> if (rs.next()) rs.toDBTableRowWithId() else null } + } + } + + override fun selectAllAvailableWithLock( + connection: Connection, + lockUuid: String, + count: Int, + offsetId: Long?, + ): List { + val offsetClause = offsetId?.let { "AND id > ?" } ?: "" + val sql = + """ + SELECT TOP $count + id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt + FROM $tableName + WHERE lockUuid = ? $offsetClause + ORDER BY id + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + offsetId?.let { stmt.setLong(2, it) } + stmt.executeQuery().use { rs -> + val results = mutableListOf() + while (rs.next()) { + results.add(rs.toDBTableRowWithId()) + } + results + } + } + } + + override fun acquireManyOptimistically( + connection: Connection, + kind: String, + limit: Int, + lockUuid: String, + timeout: Duration, + now: Instant, + ): Int { + require(limit > 0) { "Limit must be > 0" } + val expireAt = now.plus(timeout) + + val sql = + """ + UPDATE $tableName + SET lockUuid = ?, + scheduledAt = ? + WHERE id IN ( + SELECT TOP $limit id + FROM $tableName + WITH (UPDLOCK, READPAST) + WHERE pKind = ? AND scheduledAt <= ? + ORDER BY scheduledAt + ) + """ + + return connection.prepareStatement(sql).use { stmt -> + stmt.setString(1, lockUuid) + stmt.setTimestamp(2, java.sql.Timestamp.from(expireAt)) + stmt.setString(3, kind) + stmt.setTimestamp(4, java.sql.Timestamp.from(now)) + stmt.executeUpdate() + } + } +} + /** Extension function to convert ResultSet to DBTableRowWithId. */ private fun ResultSet.toDBTableRowWithId(): DBTableRowWithId = DBTableRowWithId( diff --git a/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerAdapterTest.kt b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerAdapterTest.kt new file mode 100644 index 0000000..43ca230 --- /dev/null +++ b/delayedqueue-jvm/src/test/kotlin/org/funfix/delayedqueue/jvm/internals/jdbc/MsSqlServerAdapterTest.kt @@ -0,0 +1,175 @@ +package org.funfix.delayedqueue.jvm.internals.jdbc + +import org.funfix.delayedqueue.jvm.JdbcDriver +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Nested +import org.junit.jupiter.api.Test + +class MsSqlServerAdapterTest { + + @Test + fun `create returns MsSqlServerAdapter for MsSqlServer driver`() { + val adapter = SQLVendorAdapter.create(JdbcDriver.MsSqlServer, "test_table") + assertNotNull(adapter) + assertEquals(JdbcDriver.MsSqlServer, adapter.driver) + } + + @Test + fun `create returns HSQLDBAdapter for HSQLDB driver`() { + val adapter = SQLVendorAdapter.create(JdbcDriver.HSQLDB, "test_table") + assertNotNull(adapter) + assertEquals(JdbcDriver.HSQLDB, adapter.driver) + } + + @Test + fun `create throws for Sqlite driver`() { + val ex = + org.junit.jupiter.api.assertThrows { + SQLVendorAdapter.create(JdbcDriver.Sqlite, "test_table") + } + assertTrue(ex.message!!.contains("SQLite")) + } +} + +class MsSqlServerMigrationsTest { + + @Test + fun `getMigrations returns non-empty list`() { + val migrations = MsSqlServerMigrations.getMigrations("delayed_queue") + assertTrue(migrations.isNotEmpty()) + } + + @Test + fun `getMigrations returns single table creation migration`() { + val migrations = MsSqlServerMigrations.getMigrations("delayed_queue") + assertEquals(1, migrations.size) + } + + @Nested + inner class MigrationSqlTest { + private val tableName = "my_queue" + private val sql = MsSqlServerMigrations.getMigrations(tableName).first().sql + + @Test + fun `sql contains CREATE TABLE`() { + assertTrue(sql.contains("CREATE TABLE $tableName"), "Expected CREATE TABLE in SQL") + } + + @Test + fun `sql uses BIGINT IDENTITY for id column`() { + assertTrue( + sql.contains("BIGINT IDENTITY(1,1)"), + "Expected BIGINT IDENTITY(1,1) for auto-increment", + ) + } + + @Test + fun `sql uses NVARCHAR for pKey`() { + assertTrue(sql.contains("pKey NVARCHAR(200)"), "Expected NVARCHAR(200) for pKey") + } + + @Test + fun `sql uses NVARCHAR for pKind`() { + assertTrue(sql.contains("pKind NVARCHAR(100)"), "Expected NVARCHAR(100) for pKind") + } + + @Test + fun `sql uses NVARCHAR MAX for payload`() { + assertTrue(sql.contains("payload NVARCHAR(MAX)"), "Expected NVARCHAR(MAX) for payload") + } + + @Test + fun `sql uses DATETIMEOFFSET for scheduledAt`() { + assertTrue( + sql.contains("scheduledAt DATETIMEOFFSET"), + "Expected DATETIMEOFFSET for scheduledAt", + ) + } + + @Test + fun `sql uses DATETIMEOFFSET for scheduledAtInitially`() { + assertTrue( + sql.contains("scheduledAtInitially DATETIMEOFFSET"), + "Expected DATETIMEOFFSET for scheduledAtInitially", + ) + } + + @Test + fun `sql uses DATETIMEOFFSET for createdAt`() { + assertTrue( + sql.contains("createdAt DATETIMEOFFSET"), + "Expected DATETIMEOFFSET for createdAt", + ) + } + + @Test + fun `sql uses VARCHAR 36 for lockUuid`() { + assertTrue(sql.contains("lockUuid VARCHAR(36)"), "Expected VARCHAR(36) for lockUuid") + } + + @Test + fun `sql has composite primary key on pKey and pKind`() { + assertTrue( + sql.contains("PRIMARY KEY (pKey, pKind)"), + "Expected composite PRIMARY KEY (pKey, pKind)", + ) + } + + @Test + fun `sql creates unique index on id`() { + assertTrue( + sql.contains("CREATE UNIQUE INDEX ${tableName}__IdUniqueIndex"), + "Expected unique index on id", + ) + } + + @Test + fun `sql creates index on scheduledAt`() { + assertTrue( + sql.contains("CREATE INDEX ${tableName}__ScheduledAtIndex"), + "Expected index on scheduledAt", + ) + } + + @Test + fun `sql creates composite index on pKind and scheduledAt`() { + assertTrue( + sql.contains("CREATE INDEX ${tableName}__KindPlusScheduledAtIndex"), + "Expected composite index on pKind, scheduledAt", + ) + } + + @Test + fun `sql creates index on createdAt`() { + assertTrue( + sql.contains("CREATE INDEX ${tableName}__CreatedAtIndex"), + "Expected index on createdAt", + ) + } + + @Test + fun `sql creates composite index on lockUuid and id`() { + assertTrue( + sql.contains("CREATE INDEX ${tableName}__LockUuidPlusIdIndex"), + "Expected composite index on lockUuid, id", + ) + } + + @Test + fun `sql uses correct table name throughout`() { + val differentTableName = "custom_delayed_queue" + val customSql = MsSqlServerMigrations.getMigrations(differentTableName).first().sql + + assertTrue( + customSql.contains("CREATE TABLE $differentTableName"), + "Expected custom table name in CREATE TABLE", + ) + assertTrue( + customSql.contains("${differentTableName}__IdUniqueIndex"), + "Expected custom table name in index names", + ) + } + } +}