Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRow
import org.funfix.delayedqueue.jvm.internals.jdbc.DBTableRowWithId
import org.funfix.delayedqueue.jvm.internals.jdbc.HSQLDBMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.MigrationRunner
import org.funfix.delayedqueue.jvm.internals.jdbc.MsSqlServerMigrations
import org.funfix.delayedqueue.jvm.internals.jdbc.RdbmsExceptionFilters
import org.funfix.delayedqueue.jvm.internals.jdbc.SQLVendorAdapter
import org.funfix.delayedqueue.jvm.internals.jdbc.filtersForDriver
Expand Down Expand Up @@ -608,7 +609,8 @@ private constructor(
val migrations =
when (config.db.driver) {
JdbcDriver.HSQLDB -> HSQLDBMigrations.getMigrations(config.tableName)
JdbcDriver.MsSqlServer,
JdbcDriver.MsSqlServer ->
MsSqlServerMigrations.getMigrations(config.tableName)
JdbcDriver.Sqlite ->
throw UnsupportedOperationException(
"Database ${config.db.driver} not yet supported"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.funfix.delayedqueue.jvm.internals.jdbc

/**
* MS-SQL Server-specific migrations for the DelayedQueue table.
*
* Uses MS-SQL data types:
* - `NVARCHAR` for Unicode string columns
* - `DATETIMEOFFSET` for timezone-aware timestamps
* - `BIGINT IDENTITY(1,1)` for auto-increment
* - Composite primary key on `(pKey, pKind)` with a separate unique index on `id`
*/
internal object MsSqlServerMigrations {
/**
* Gets the list of migrations for MS-SQL Server.
*
* @param tableName The name of the delayed queue table
* @return List of migrations in order
*/
fun getMigrations(tableName: String): List<Migration> =
listOf(
Migration.createTableIfNotExists(
tableName = tableName,
sql =
"""
CREATE TABLE $tableName (
id BIGINT IDENTITY(1,1) NOT NULL,
pKey NVARCHAR(200) NOT NULL,
pKind NVARCHAR(100) NOT NULL,
payload NVARCHAR(MAX) NOT NULL,
scheduledAt DATETIMEOFFSET NOT NULL,
scheduledAtInitially DATETIMEOFFSET NOT NULL,
lockUuid VARCHAR(36) NULL,
createdAt DATETIMEOFFSET NOT NULL,
PRIMARY KEY (pKey, pKind)
);

CREATE UNIQUE INDEX ${tableName}__IdUniqueIndex
ON $tableName (id);

CREATE INDEX ${tableName}__ScheduledAtIndex
ON $tableName (scheduledAt);

CREATE INDEX ${tableName}__KindPlusScheduledAtIndex
ON $tableName (pKind, scheduledAt);

CREATE INDEX ${tableName}__CreatedAtIndex
ON $tableName (createdAt);

CREATE INDEX ${tableName}__LockUuidPlusIdIndex
ON $tableName (lockUuid, id);
"""
.trimIndent(),
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab
}

/** Selects one row by its key. */
fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? {
open fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? {
val sql =
"""
SELECT id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt
Expand Down Expand Up @@ -311,7 +311,7 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab
): DBTableRowWithId?

/** Selects all messages with a specific lock UUID. */
fun selectAllAvailableWithLock(
open fun selectAllAvailableWithLock(
connection: Connection,
lockUuid: String,
count: Int,
Expand Down Expand Up @@ -383,8 +383,8 @@ internal sealed class SQLVendorAdapter(val driver: JdbcDriver, protected val tab
fun create(driver: JdbcDriver, tableName: String): SQLVendorAdapter =
when (driver) {
JdbcDriver.HSQLDB -> HSQLDBAdapter(driver, tableName)
JdbcDriver.MsSqlServer,
JdbcDriver.Sqlite -> TODO("MS-SQL and SQLite support not yet implemented")
JdbcDriver.MsSqlServer -> MsSqlServerAdapter(driver, tableName)
JdbcDriver.Sqlite -> TODO("SQLite support not yet implemented")
}
}
}
Expand Down Expand Up @@ -495,6 +495,169 @@ private class HSQLDBAdapter(driver: JdbcDriver, tableName: String) :
}
}

/**
* MS-SQL Server-specific adapter.
*
* Key differences from HSQLDB:
* - Uses `TOP N` instead of `LIMIT N` for row limiting
* - Uses `WITH (UPDLOCK)` for row-level locking during SELECT FOR UPDATE
* - Uses `WITH (UPDLOCK, READPAST)` for concurrent polling (skips locked rows)
* - Uses `IF NOT EXISTS ... INSERT` pattern for conditional insert
*/
private class MsSqlServerAdapter(driver: JdbcDriver, tableName: String) :
SQLVendorAdapter(driver, tableName) {

override fun insertOneRow(connection: Connection, row: DBTableRow): Boolean {
val sql =
"""
IF NOT EXISTS (
SELECT 1 FROM $tableName WHERE pKey = ? AND pKind = ?
)
BEGIN
INSERT INTO $tableName
(pKey, pKind, payload, scheduledAt, scheduledAtInitially, createdAt)
VALUES (?, ?, ?, ?, ?, ?)
END
"""

return connection.prepareStatement(sql).use { stmt ->
// Parameters for EXISTS check
stmt.setString(1, row.pKey)
stmt.setString(2, row.pKind)
// Parameters for INSERT
stmt.setString(3, row.pKey)
stmt.setString(4, row.pKind)
stmt.setString(5, row.payload)
stmt.setTimestamp(6, java.sql.Timestamp.from(row.scheduledAt))
stmt.setTimestamp(7, java.sql.Timestamp.from(row.scheduledAtInitially))
stmt.setTimestamp(8, java.sql.Timestamp.from(row.createdAt))
stmt.executeUpdate() > 0
}
}

override fun selectForUpdateOneRow(
connection: Connection,
kind: String,
key: String,
): DBTableRowWithId? {
val sql =
"""
SELECT TOP 1
id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt
FROM $tableName
WITH (UPDLOCK)
WHERE pKey = ? AND pKind = ?
"""

return connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, key)
stmt.setString(2, kind)
stmt.executeQuery().use { rs -> if (rs.next()) rs.toDBTableRowWithId() else null }
}
}

override fun selectByKey(connection: Connection, kind: String, key: String): DBTableRowWithId? {
val sql =
"""
SELECT TOP 1
id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt
FROM $tableName
WHERE pKey = ? AND pKind = ?
"""

return connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, key)
stmt.setString(2, kind)
stmt.executeQuery().use { rs -> if (rs.next()) rs.toDBTableRowWithId() else null }
}
}

override fun selectFirstAvailableWithLock(
connection: Connection,
kind: String,
now: Instant,
): DBTableRowWithId? {
val sql =
"""
SELECT TOP 1
id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt
FROM $tableName
WITH (UPDLOCK, READPAST)
WHERE pKind = ? AND scheduledAt <= ?
ORDER BY scheduledAt
"""

return connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, kind)
stmt.setTimestamp(2, java.sql.Timestamp.from(now))
stmt.executeQuery().use { rs -> if (rs.next()) rs.toDBTableRowWithId() else null }
}
}

override fun selectAllAvailableWithLock(
connection: Connection,
lockUuid: String,
count: Int,
offsetId: Long?,
): List<DBTableRowWithId> {
val offsetClause = offsetId?.let { "AND id > ?" } ?: ""
val sql =
"""
SELECT TOP $count
id, pKey, pKind, payload, scheduledAt, scheduledAtInitially, lockUuid, createdAt
FROM $tableName
WHERE lockUuid = ? $offsetClause
ORDER BY id
"""

return connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, lockUuid)
offsetId?.let { stmt.setLong(2, it) }
stmt.executeQuery().use { rs ->
val results = mutableListOf<DBTableRowWithId>()
while (rs.next()) {
results.add(rs.toDBTableRowWithId())
}
results
}
}
}

override fun acquireManyOptimistically(
connection: Connection,
kind: String,
limit: Int,
lockUuid: String,
timeout: Duration,
now: Instant,
): Int {
require(limit > 0) { "Limit must be > 0" }
val expireAt = now.plus(timeout)

val sql =
"""
UPDATE $tableName
SET lockUuid = ?,
scheduledAt = ?
WHERE id IN (
SELECT TOP $limit id
FROM $tableName
WITH (UPDLOCK, READPAST)
WHERE pKind = ? AND scheduledAt <= ?
ORDER BY scheduledAt
)
"""

return connection.prepareStatement(sql).use { stmt ->
stmt.setString(1, lockUuid)
stmt.setTimestamp(2, java.sql.Timestamp.from(expireAt))
stmt.setString(3, kind)
stmt.setTimestamp(4, java.sql.Timestamp.from(now))
stmt.executeUpdate()
}
}
}

/** Extension function to convert ResultSet to DBTableRowWithId. */
private fun ResultSet.toDBTableRowWithId(): DBTableRowWithId =
DBTableRowWithId(
Expand Down
Loading