
MSSQL Adapter

SametGoktepe edited this page Jun 18, 2026 · 1 revision

SQL Server (Azure SQL DB / Managed Instance / on-prem 2016 SP1+) outbox store and migration SQL. Mirrors the @eventferry/postgres and @eventferry/mysql adapters — same OutboxStore contract, T-SQL-flavored claim query and reaper, with the locking hints SQL Server actually needs under RCSI.


Install

npm i @eventferry/mssql @eventferry/core mssql

mssql (^10 || ^11 || ^12) is an optional peer dependency. The store is engine-aware but driver-agnostic at the type level — you wire your own pool.

Node.js 18+.


Why SQL Server 2016 SP1+ specifically

The whole design is sized to one engine baseline: SQL Server 2016 SP1 at compatibility level 130, or any current Azure SQL Database / Managed Instance. The adapter assumes all five features below. OPENJSON, ISJSON and compatibility level 130 are what 2016 adds. Filtered indexes, DATETIME2 and SYSUTCDATETIME() date back to SQL Server 2008 but are just as load-bearing:

  • OPENJSON: markDone ships a single batched UPDATE against a JSON array of ids parsed by OPENJSON. Pre-2016 engines have no OPENJSON, and table-valued parameters are uglier.
  • Filtered indexes: the claim index (IX_<table>_claim_ready) is a filtered index on status IN (0, 3), so the planner only walks open rows. Without the filter the index covers the whole table, and the claim cost grows linearly with retained done rows.
  • ISJSON CHECK constraints: payload and headers are NVARCHAR(MAX) CHECK (ISJSON(col) = 1). The engine rejects malformed JSON at write time, not at relay-publish time. A bad call site therefore fails the business transaction (loud) instead of poisoning the outbox (quiet).
  • DATETIME2 + SYSUTCDATETIME(): claimed_at, next_retry_at, created_at and processed_at are all DATETIME2(3), and created_at defaults to SYSUTCDATETIME(). The reaper uses DATEADD(MILLISECOND, …, SYSUTCDATETIME()), so the cutoff is computed server-side in UTC and application clock skew is not in the loop.
  • Compatibility level 130: required for OPENJSON parsing. It also selects the cardinality estimator the claim CTE was tuned against.

Older engines aren't blocked at install time, but they don't run the migration as-shipped. See Running on an older engine below for the documented fallback.


Migration

import { createMigrationSql } from "@eventferry/mssql";

await pool.request().batch(createMigrationSql("outbox"));

Signature:

createMigrationSql(
  table?: string,   // default "outbox"
  opts?: { schema?: string; useNativeJson?: boolean },
): string;

Emits a single batch in which every object (the table and each of its three indexes) is created inside its own guard:

  1. IF OBJECT_ID(N'[schema].[table]', N'U') IS NULL → CREATE TABLE …
  2. IF NOT EXISTS (SELECT 1 FROM sys.indexes WHERE name = N'IX_<table>_agg_id_id' …) → CREATE NONCLUSTERED INDEX …
  3. The same guarded CREATE for IX_<table>_claim_ready (filtered) and IX_<table>_done_processed_at.

Idempotency story. Each object is guarded independently. A partial deployment (table created by a DBA script, indexes missed) is therefore repaired by re-running the migration: no Msg 2714 "object already exists" errors, no dropped data, no manual reconciliation.

schema option (default "dbo")

Both the OBJECT_ID guard and the CREATE TABLE/CREATE INDEX statements are fully [schema].[table]-qualified. Without schema qualification, a non-dbo default schema (Azure AD logins, contained DB users, AG per-app schemas) causes the guard to inspect dbo.outbox while CREATE lands in the connection's default schema — duplicate tables across schemas, or a 2714 on the second run. Matches Postgres's public default semantically.

schema and table are validated by assertIdent (/^[a-zA-Z_][a-zA-Z0-9_]{0,99}$/) before interpolation into brackets. The 100-char length cap exists so embedded constraint names like CK_<table>_payload_json fit within SQL Server's 128-char object name limit.
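The identifier rule and the length budget can be checked in a few lines. This is a minimal sketch that assumes nothing beyond the regex quoted above. assertIdent is re-implemented here for illustration, and its error type and message are guesses. qualifiedName and longestDerivedName are hypothetical helpers, not exports of @eventferry/mssql. The derived names are the ones this page mentions.

```typescript
// Regex quoted from this page: a letter or underscore, then up to 99 more
// word characters, so at most 100 characters in total.
const IDENT = /^[a-zA-Z_][a-zA-Z0-9_]{0,99}$/;
const SQL_SERVER_MAX_NAME = 128; // sysname limit for object names

// Illustrative re-implementation; the adapter's own error text may differ.
function assertIdent(name: string, what: string): string {
  if (!IDENT.test(name)) {
    throw new TypeError(`invalid ${what} identifier: ${JSON.stringify(name)}`);
  }
  return name;
}

// Hypothetical helper: brackets are safe to add only because the regex
// has already rejected "]" and every other special character.
function qualifiedName(schema: string, table: string): string {
  return `[${assertIdent(schema, "schema")}].[${assertIdent(table, "table")}]`;
}

// Hypothetical helper: length of the longest object name derived from
// `table`, using the constraint and index names mentioned on this page.
function longestDerivedName(table: string): number {
  assertIdent(table, "table");
  const derived = [
    `CK_${table}_payload_json`,
    `IX_${table}_agg_id_id`,
    `IX_${table}_claim_ready`,
    `IX_${table}_done_processed_at`,
  ];
  return Math.max(...derived.map((n) => n.length));
}

const worstCase = longestDerivedName("t".repeat(100)); // 121
const fits = worstCase <= SQL_SERVER_MAX_NAME;          // true
```

For example, qualifiedName("messaging", "outbox") yields [messaging].[outbox]. A value such as "outbox]; DROP TABLE x" throws before any SQL is built.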

// Default — table "outbox" in schema "dbo".
createMigrationSql();

// Non-default schema, NVARCHAR(MAX) + ISJSON CHECK columns (the default).
createMigrationSql("outbox", { schema: "messaging" });

// SQL Server 2025+ native json column type, opt-in.
createMigrationSql("outbox", { schema: "messaging", useNativeJson: true });

useNativeJson opt-in (default false)

| Flag | Payload / headers type | CHECK constraint | Engine support |
|---|---|---|---|
| false (default) | NVARCHAR(MAX) | CHECK (ISJSON(col) = 1) | SQL Server 2016 SP1+, Azure SQL DB, MI |
| true | json | omitted (rejected on json) | SQL Server 2025+, current Azure SQL DB |

NVARCHAR(MAX) + ISJSON covers every engine in the support matrix and validation is engine-enforced. The wire behaviour is identical to native json — TDS still serialises as NVARCHAR(MAX) on read, so the JS code path is JSON.stringify in, JSON.parse out either way. Native json is a storage-format and engine-internal-parser optimisation, not a wire change.
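The JS side of that path is the same two calls whichever column type you pick. This is a minimal sketch that assumes payload and headers are plain JSON-serializable values. encodeJsonColumn and decodeJsonColumn are hypothetical helpers, not the store's API. The encoder fails fast on values JSON.stringify cannot turn into text, so the error surfaces in the calling code rather than at the database.

```typescript
// Hypothetical helpers: the value crosses the TDS wire as text for both
// NVARCHAR(MAX) and native json columns.
function encodeJsonColumn(value: unknown): string {
  // JSON.stringify returns undefined for undefined, functions and symbols,
  // and throws a TypeError for BigInt values.
  const text = JSON.stringify(value);
  if (typeof text !== "string") {
    throw new TypeError("value has no JSON representation");
  }
  return text;
}

function decodeJsonColumn<T = unknown>(text: string): T {
  return JSON.parse(text) as T;
}

const stored = encodeJsonColumn({ orderId: "o-1", total: 42 });
// stored === '{"orderId":"o-1","total":42}'
const roundTripped = decodeJsonColumn<{ orderId: string; total: number }>(stored);
```

On engines without ISJSON (see Running on an older engine), a check like this is the application-boundary substitute for the CHECK constraint, with one gap. It proves the value serializes, but not that text produced elsewhere is valid JSON.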

Enable useNativeJson: true only when all of your environments (dev, CI, staging, prod, DR) are SQL Server 2025+ or current Azure SQL DB. Migrating the column type later requires downtime.


What the table looks like

| Column | SQL type | Nullable | Purpose |
|---|---|---|---|
| id | BIGINT IDENTITY(1,1) PRIMARY KEY CLUSTERED | no | Monotonic per-row id. Returned to JS as a string (see below). |
| message_id | NVARCHAR(64) NOT NULL | no | Idempotency key for the broker side. Read as a normal JS string. |
| topic | NVARCHAR(255) NOT NULL | no | Destination topic. |
| aggregate_type | NVARCHAR(255) NOT NULL | no | Aggregate kind, e.g. "order". |
| aggregate_id | NVARCHAR(255) NOT NULL | no | Strict ordering key. Claims are head-of-aggregate. |
| partition_key | NVARCHAR(255) | yes | Broker-side partitioner input. Defaults to aggregate_id in the publisher. |
| payload | NVARCHAR(MAX) or json | no | Event body. ISJSON = 1 CHECK when NVARCHAR(MAX). |
| headers | NVARCHAR(MAX) or json | no | Wire headers. Same CHECK story. |
| trace_id | NVARCHAR(255) | yes | W3C traceparent id captured at enqueue time. |
| status | TINYINT NOT NULL DEFAULT 0 | no | 0 = pending, 1 = claimed, 2 = done, 3 = failed, 4 = dead. |
| attempts | INT NOT NULL DEFAULT 0 | no | Retry counter. Compared against maxAttempts. |
| claimed_at | DATETIME2(3) | yes | Visibility-timeout anchor. Set on claim; reaper compares against SYSUTCDATETIME(). |
| next_retry_at | DATETIME2(3) | yes | Re-claim eligibility for failed rows. NULL with status = 3 is a TypeError at the store boundary. |
| created_at | DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME() | no | Enqueue timestamp (UTC, server clock). |
| processed_at | DATETIME2(3) | yes | Set when status transitions to done or dead. Drives the retention index. |

Indexes:

  • IX_<table>_agg_id_id on (aggregate_id, id) — head-of-aggregate NOT EXISTS probe.
  • IX_<table>_claim_ready filtered on WHERE status IN (0, 3) — claim scan, only over open rows.
  • IX_<table>_done_processed_at on (status, processed_at) — retention scan in purgeDone.

The clustered B-tree PK is part of the contract. Do not replace it with a clustered columnstore — see below.


MssqlStore

import { MssqlStore } from "@eventferry/mssql";

const store = new MssqlStore({
  pool,                       // mssql ConnectionPool, already connected
  table: "outbox",
  schema: "dbo",
  claimTimeoutMs: 60_000,
  claimFailedOnly: false,
});

The pool must be a connected mssql.ConnectionPool. The store does not call .connect() for you, and it does not own the pool lifecycle. Two preconditions you must satisfy before constructing the store:

1. Attach pool.on("error", …) first

// CRITICAL: attach BEFORE constructing the store.
pool.on("error", (err) => {
  // wire to your logger / metrics
  console.error("[mssql pool]", err);
});

const store = new MssqlStore({ pool });

mssql emits an error event for connection-level failures: TDS resets, transient Azure SQL drops, mid-query disconnects. Without a listener, Node throws the error from emit(). Raised from inside the driver, it goes uncaught and crashes the whole process. The store deliberately does not attach this listener, because the pool lifecycle belongs to you. Attaching one from a library would silently swallow errors your monitoring stack expects to see.

This is the #1 cause of "the relay process keeps dying overnight" reports. Attach the listener.
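The failure mode is easy to reproduce without a database. This sketch uses a plain node:events EventEmitter as a stand-in for the pool, relying on the fact that mssql.ConnectionPool is itself an EventEmitter. emitConnectionDrop is a hypothetical helper.

```typescript
import { EventEmitter } from "node:events";

// Stand-in for an mssql.ConnectionPool (also an EventEmitter).
const pool = new EventEmitter();

// Hypothetical helper: simulate the driver reporting a dropped connection.
function emitConnectionDrop(emitter: EventEmitter): "thrown" | "handled" {
  try {
    // With no "error" listener, emit() throws the error. Inside a real driver
    // callback nothing catches it, so the process exits.
    emitter.emit("error", new Error("simulated TDS reset"));
    return "handled";
  } catch {
    return "thrown";
  }
}

const withoutListener = emitConnectionDrop(pool); // "thrown"

const logged: string[] = [];
pool.on("error", (err: Error) => {
  logged.push(err.message); // wire to your logger / metrics instead
});

const withListener = emitConnectionDrop(pool); // "handled"
```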

2. Set requestTimeout above your expected claim latency

const pool = await new sql.ConnectionPool({
  // …connection fields…
  requestTimeout: 30_000,   // 2x your worst claim p99
}).connect();

The mssql default requestTimeout is 15 seconds. The claim batch runs a multi-statement BEGIN TRY / BEGIN TRAN / … / COMMIT block with a TABLE variable, the head-of-aggregate NOT EXISTS probe, and an OUTPUT INTO @claimed write. Under contention on large outboxes this can exceed 15s.

If requestTimeout fires mid-claim:

  • The transaction is rolled back, so there is no torn state. A client-side timeout reaches the server as an attention signal, which TRY/CATCH does not trap. This guarantee therefore rests on the batch running with SET XACT_ABORT ON rather than on the CATCH block.
  • The client receives an abort error.
  • The relay re-claims on the next tick, and you pay a real latency hit.

Rule of thumb: requestTimeout >= 2 × observed claim p99. For the default claimBatchSize, 30–60s is a good starting point.
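If you want the rule of thumb in code, a small helper is enough. recommendedRequestTimeoutMs is hypothetical and not part of @eventferry/mssql. The 30s floor is an assumption taken from the low end of the suggested starting range.

```typescript
// Hypothetical helper: requestTimeout >= 2 x observed claim p99, with a
// floor at the low end of the suggested 30-60s starting range.
const REQUEST_TIMEOUT_FLOOR_MS = 30_000;

function recommendedRequestTimeoutMs(claimP99Ms: number): number {
  if (!Number.isFinite(claimP99Ms) || claimP99Ms < 0) {
    throw new RangeError("claimP99Ms must be a non-negative, finite number");
  }
  return Math.max(REQUEST_TIMEOUT_FLOOR_MS, Math.ceil(2 * claimP99Ms));
}

// A fast claim path still gets the floor; a slow one gets 2x headroom.
const quiet = recommendedRequestTimeoutMs(4_000);  // 30000
const busy = recommendedRequestTimeoutMs(25_000);  // 50000
```

Pass the result as requestTimeout in the ConnectionPool config shown above, and revisit it when the claim p99 moves.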


Enqueue inside your transaction

import * as sql from "mssql";

const tx = new sql.Transaction(pool);
await tx.begin();
try {
  // Business changes run on `tx`, the same transaction as the outbox insert.
  await new sql.Request(tx)
    .input("id", sql.NVarChar(64), order.id)
    .input("total", sql.Decimal(18, 2), order.total) // column type assumed
    .query("INSERT INTO orders (id, total) VALUES (@id, @total)");

  await store.enqueue(tx, {
    topic: "orders.created",
    aggregateType: "order",
    aggregateId: order.id,
    payload: { orderId: order.id, total: order.total },
    headers: { "x-tenant": tenantId },
    key: order.customerId,
  });

  await tx.commit();
} catch (err) {
  // rollback() can itself reject (e.g. if the server already aborted the
  // transaction); don't let that mask the original error.
  await tx.rollback().catch(() => {});
  throw err;
}

The tx you pass to enqueue is your own mssql.Transaction. The outbox row commits or rolls back atomically with the business change — that's the whole point of the outbox pattern, and the store enforces it by refusing to take a raw ConnectionPool or Request at the enqueue boundary.


The claim query

WITH cte AS (
  SELECT TOP (@batch) o.id
  FROM   [schema].[outbox] o WITH (READCOMMITTEDLOCK, READPAST, UPDLOCK, ROWLOCK)
  WHERE  ( o.status = 0
        OR (o.status = 3 AND o.next_retry_at <= SYSUTCDATETIME()) )
    AND  ( o.claimed_at IS NULL
        OR o.claimed_at < DATEADD(MILLISECOND, -@timeoutMs, SYSUTCDATETIME()) )
    AND NOT EXISTS (
      SELECT 1
      FROM   [schema].[outbox] e WITH (READCOMMITTEDLOCK)
      WHERE  e.aggregate_id = o.aggregate_id
        AND  e.id < o.id
        AND  e.status IN (1, 3)
    )
  ORDER BY o.aggregate_id, o.id
)
UPDATE o
SET    o.status = 1, o.claimed_at = SYSUTCDATETIME()
OUTPUT inserted.id, inserted.message_id, … INTO @claimed
FROM   [schema].[outbox] o
JOIN   cte ON cte.id = o.id;

Four locking hints on the outer scan, one on the inner probe. Each one is load-bearing:

| Hint | Why it's there |
|---|---|
| READCOMMITTEDLOCK | On an RCSI database (READ_COMMITTED_SNAPSHOT ON, the Azure SQL DB default), READ COMMITTED is snapshot-based and READPAST/UPDLOCK are silently no-ops. This hint re-asserts locking semantics for this statement. |
| READPAST | Skip rows another claimer currently holds an X/U lock on. Required so concurrent relays don't serialize. |
| UPDLOCK | Take a U lock at scan time, converted to X at update time. Closes the lost-update race between the CTE's SELECT phase and the UPDATE phase. |
| ROWLOCK | Take row-granularity locks instead of page locks. Mandatory with READPAST: page locks make READPAST silently degrade and concurrent relays serialize. ROWLOCK does not disable lock escalation. SQL Server attempts to escalate once a single statement holds roughly 5,000 locks on one table reference, so keep claim batches well below that. |

Why the inner NOT EXISTS deliberately does NOT carry READPAST. If it did, sibling claimers would skip each other's locked earlier rows, and the probe would report no blocking row. A later row of the same aggregate would then get claimed while an earlier row was still in flight, breaking the head-of-aggregate invariant the design promises. Plain READCOMMITTEDLOCK makes the inner probe briefly block on a competitor's lock and see the committed status afterwards. What blocks the probe is the X lock taken by the competitor's UPDATE; a U lock alone is compatible with the probe's shared lock. That's the correct behaviour.
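The predicate can be modelled in memory to reason about which rows a claim can pick up. This sketch ignores locking (READPAST, UPDLOCK and friends) completely. It uses small numeric ids for readability and compares aggregate ids ordinally rather than by SQL collation. claimable is a hypothetical function, not part of the adapter.

```typescript
// Status codes from the table description: pending, claimed, done, failed, dead.
type Status = 0 | 1 | 2 | 3 | 4;

interface Row {
  id: number;                 // small ids only; real BIGINT ids are strings
  aggregateId: string;
  status: Status;
  claimedAt: number | null;   // epoch ms
  nextRetryAt: number | null; // epoch ms
}

// Hypothetical model of the claim CTE's WHERE / ORDER BY / TOP clauses.
function claimable(rows: Row[], now: number, timeoutMs: number, batch: number): Row[] {
  const cutoff = now - timeoutMs;
  const ready = rows.filter(
    (o) =>
      // SQL three-valued logic: NULL <= now is not true, so such rows are skipped.
      (o.status === 0 ||
        (o.status === 3 && o.nextRetryAt !== null && o.nextRetryAt <= now)) &&
      (o.claimedAt === null || o.claimedAt < cutoff) &&
      // Head-of-aggregate probe: no earlier row of the same aggregate is
      // claimed (1) or failed-awaiting-retry (3).
      !rows.some(
        (e) =>
          e.aggregateId === o.aggregateId &&
          e.id < o.id &&
          (e.status === 1 || e.status === 3),
      ),
  );
  // ORDER BY aggregate_id, id
  ready.sort((a, b) =>
    a.aggregateId !== b.aggregateId
      ? (a.aggregateId < b.aggregateId ? -1 : 1)
      : a.id - b.id,
  );
  return ready.slice(0, batch); // TOP (@batch)
}

const now = Date.UTC(2026, 0, 1);
const rows: Row[] = [
  { id: 1, aggregateId: "a", status: 1, claimedAt: now - 1_000, nextRetryAt: null },
  { id: 2, aggregateId: "a", status: 0, claimedAt: null, nextRetryAt: null },         // waits for id 1
  { id: 3, aggregateId: "b", status: 0, claimedAt: null, nextRetryAt: null },
  { id: 4, aggregateId: "c", status: 3, claimedAt: null, nextRetryAt: now + 60_000 }, // not due yet
  { id: 5, aggregateId: "c", status: 0, claimedAt: null, nextRetryAt: null },         // waits for id 4
  { id: 6, aggregateId: "d", status: 4, claimedAt: null, nextRetryAt: null },         // dead: no block
  { id: 7, aggregateId: "d", status: 0, claimedAt: null, nextRetryAt: null },
  { id: 8, aggregateId: "e", status: 3, claimedAt: null, nextRetryAt: now - 1 },      // retry due
];

const picked = claimable(rows, now, 60_000, 10).map((r) => r.id); // [3, 7, 8]
```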

Why server-side DATEADD(MILLISECOND, …, SYSUTCDATETIME()). The reaper cutoff is computed by the database, in UTC, against the database's own clock. If the cutoff were computed in TypeScript and bound as a parameter, application server clock skew (NTP drift, container time drift, multi-region replicas) would either stall the reaper or trigger it too eagerly. The cutoff lives where the clock lives.
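A small worked example makes the skew argument concrete. All values are illustrative epoch milliseconds. isStale is a hypothetical stand-in for the claimed_at < DATEADD(MILLISECOND, -@timeoutMs, SYSUTCDATETIME()) comparison.

```typescript
const TIMEOUT_MS = 60_000;

// Mirrors: claimed_at < DATEADD(MILLISECOND, -@timeoutMs, SYSUTCDATETIME())
function isStale(claimedAt: number, now: number, timeoutMs: number): boolean {
  return claimedAt < now - timeoutMs;
}

const serverNow = Date.UTC(2026, 0, 1, 12, 0, 0);
const liveClaim = serverNow - 30_000; // claimed 30s ago, relay still working
const deadClaim = serverNow - 90_000; // claimed 90s ago, relay crashed

// Database clock (same clock that stamped claimed_at): both verdicts correct.
const serverSaysLive = isStale(liveClaim, serverNow, TIMEOUT_MS); // false
const serverSaysDead = isStale(deadClaim, serverNow, TIMEOUT_MS); // true

// App server clock 2 minutes fast: the live claim is stolen early.
const fastClock = serverNow + 120_000;
const fastSaysLive = isStale(liveClaim, fastClock, TIMEOUT_MS);   // true (wrong)

// App server clock 2 minutes slow: the dead claim is not reaped.
const slowClock = serverNow - 120_000;
const slowSaysDead = isStale(deadClaim, slowClock, TIMEOUT_MS);   // false (wrong)
```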

Do NOT use clustered columnstore on the outbox table

ROWLOCK is not compatible with clustered columnstore indexes — CCI is delta-store + row-group based and ignores row-granularity hints. Put a CCI on the outbox table and the claim CTE's locking degrades to row-group level, READPAST no longer skips correctly, and concurrent relays serialize or double-claim. The default PRIMARY KEY CLUSTERED (id) B-tree the migration emits is exactly what this design requires. Don't change it.

Do NOT override session isolation

Do not set SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or REPEATABLE READ at the session level for connections the store uses, and don't install a setup hook that does. The READCOMMITTEDLOCK table hint is precisely tuned to give the right locking semantics on both locking-RC and RCSI databases. Stronger session isolation layers key-range locks on top, serializing the claim path and defeating READPAST.

Default RC / RCSI is what you want; the hint handles the rest.


BIGINT id is a string — never Number(record.id)

tedious (the driver underneath mssql) returns SQL BIGINT columns as JavaScript strings. See tedious/lib/value-parser.js: value.toString(). This is intentional. A JS number is an IEEE-754 double with a safe-integer ceiling of 2^53 - 1, while BIGINT goes up to 2^63 - 1. Once an outbox's identity passes 2^53 (about nine quadrillion), Number(row.id) silently corrupts ids.

MssqlStore keeps OutboxRecord.id as string end-to-end. Two rules:

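  • Never Number(record.id) in hooks, logs, metrics labels, anywhere. Compare ids as strings for equality. For ordering, compare BigInt(record.id) values, because string comparison is lexicographic ("10" sorts before "9").
  • Never bind it as sql.Int, which is signed 32-bit. Bind as sql.NVarChar(64) or sql.BigInt.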

If you need it as a number for a UI, parse at the very edge and accept the truncation risk explicitly. The message_id column is NVARCHAR(64) and is read as a normal JS string — only BIGINT columns get the string treatment.
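Both rules can be demonstrated with built-in JavaScript alone. idFromDriver is a made-up value just past 2^53, formatted the way tedious hands BIGINT values back. sameId and compareIds are hypothetical helpers, not adapter exports.

```typescript
// A BIGINT id just past 2^53, as a string (tedious returns BIGINT as text).
const idFromDriver = "9007199254740993"; // 2^53 + 1

// Number() rounds to the nearest double; 2^53 + 1 is not representable.
const viaNumber = String(Number(idFromDriver)); // "9007199254740992"

// Equality: compare the strings directly.
function sameId(a: string, b: string): boolean {
  return a === b;
}

// Ordering: string comparison is lexicographic ("10" < "9"), so convert to
// BigInt, which is exact for every BIGINT value.
function compareIds(a: string, b: string): number {
  const x = BigInt(a);
  const y = BigInt(b);
  return x < y ? -1 : x > y ? 1 : 0;
}

const lexicographic = ["10", "9", idFromDriver].sort();
// ["10", "9", "9007199254740993"]
const numeric = ["10", "9", idFromDriver].sort(compareIds);
// ["9", "10", "9007199254740993"]
```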


claimTimeoutMs — default 60s, 24h ceiling

Visibility timeout for claimed rows. After this many milliseconds without an ack, the reaper considers the row stuck (relay crashed, connection died mid-publish) and reclaims it.

  • Default: 60_000 ms.
  • Ceiling: 86_400_000 ms (24h). The constructor throws if you exceed it.

Two reasons for the cap:

  1. Any value higher than 24h is almost always misconfiguration. There is no real 25-hour publish — what exists is a copy-pasted claimTimeoutMs: 30 * 24 * 60 * 60 * 1000.
  2. The parameter binds as sql.Int, which is signed 32-bit (max 2^31 - 1 = 2,147,483,647 ms ≈ 24.85 days). The 24h cap leaves a safe buffer before overflow, and it fails with a clear error message instead of an arithmetic surprise.
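The constructor-side check amounts to a range test. This is a hypothetical sketch. resolveClaimTimeoutMs, the RangeError type and the rejection of non-positive or fractional values are assumptions for illustration. The default, the 24h ceiling and the sql.Int bound come from this page.

```typescript
const DEFAULT_CLAIM_TIMEOUT_MS = 60_000;
const MAX_CLAIM_TIMEOUT_MS = 86_400_000; // 24h
const SQL_INT_MAX = 2 ** 31 - 1;         // 2147483647, sql.Int upper bound

// Hypothetical validation; the adapter's real error type/message may differ.
function resolveClaimTimeoutMs(value?: number): number {
  const ms = value ?? DEFAULT_CLAIM_TIMEOUT_MS;
  if (!Number.isInteger(ms) || ms <= 0) {
    throw new RangeError(`claimTimeoutMs must be a positive integer, got ${ms}`);
  }
  if (ms > MAX_CLAIM_TIMEOUT_MS) {
    throw new RangeError(
      `claimTimeoutMs ${ms} exceeds the 24h ceiling (${MAX_CLAIM_TIMEOUT_MS} ms)`,
    );
  }
  return ms;
}

// How far the 24h cap sits below sql.Int overflow, in days.
const intCeilingDays = SQL_INT_MAX / 86_400_000; // ≈ 24.855
```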

markFailed(id, null, "failed") — runtime TypeError

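The store throws TypeError on markFailed(id, null, "failed"). That combination would set status = 3 (failed) with next_retry_at = NULL. If the claim predicate treats a NULL retry time as due now, the result is an instant-redrive hot loop that hammers the broker until the row hits maxAttempts. The claim query above instead uses a strict o.next_retry_at <= SYSUTCDATETIME() comparison, and NULL never compares true there, so the row is never re-claimed at all. The head-of-aggregate probe also counts status = 3 rows, so every later event for that aggregate stalls behind it. Neither outcome is useful.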

The cross-adapter contract (Postgres/MySQL) currently allows this combination but it's still a footgun there. The MSSQL adapter enforces it at runtime to prevent the hot loop outright.

markFailed(id, null, "dead") is allowed: dead (4) rows are not picked up by the claim predicate, so a NULL next_retry_at has no instant-redrive hazard.
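The guard itself is small. This is a hypothetical sketch that assumes the second argument is the next retry time as a Date and the third is the target status. assertMarkFailedArgs, nextRetryAfter and the message text are illustrative, not the store's code.

```typescript
type FailTarget = "failed" | "dead";

// Hypothetical guard: a "failed" row must carry a retry time; "dead" need not.
function assertMarkFailedArgs(
  id: string,
  nextRetryAt: Date | null,
  target: FailTarget,
): void {
  if (target === "failed" && nextRetryAt === null) {
    throw new TypeError(
      `markFailed(${id}, null, "failed"): pass a next retry time, or use "dead"`,
    );
  }
}

// Hypothetical caller-side helper: schedule the retry explicitly.
function nextRetryAfter(delayMs: number, now: number = Date.now()): Date {
  return new Date(now + delayMs);
}
```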


Waker: polling only in v1

The polling claim loop has no native low-latency waker in v1. The two viable SQL Server primitives both have hard problems:

  • Service Broker WAITFOR (RECEIVE …) ties up a pool connection indefinitely while it waits for a message. Pool sizing becomes mandatory, and the Node bindings for Service Broker are unmaintained. This is the intended v2 path, but it needs a dedicated-connection pool concept that doesn't exist in mssql today.
  • CDC streaming is the right log-based source but requires SQL Server Agent (so unavailable on Azure SQL Database — see next section).

For v1, tune pollIntervalMs down (e.g. 100) for sub-second latency, or use a different adapter (@eventferry/postgres ships LISTEN/NOTIFY; @eventferry/mysql ships a binlog relay).


CDC streaming relay: deferred to v2 package

A SQL Server CDC streaming relay (the equivalent of MysqlBinlogRelay or PostgresStreamingRelay) is deferred to a separate @eventferry/mssql-cdc-relay package. Two reasons it's not bundled:

  1. CDC is unavailable on Azure SQL Database — it requires SQL Server Agent, which Azure SQL Database does not provide. Bundling it would force every consumer of @eventferry/mssql to install a dep they cannot use on a major target deployment.
  2. CDC capture needs cdc.fn_cdc_get_all_changes_<capture_instance>, change-table polling, LSN bookkeeping, and a position-persistence contract that's substantively different from the polling claim loop. It's its own package.

If you need log-based streaming on SQL Server today: on-prem / SQL MI → wait for @eventferry/mssql-cdc-relay, or tune the polling interval down on @eventferry/mssql.


purgeDone retention

// 7 days
await store.purgeDone({ olderThanMs: 7 * 24 * 60 * 60 * 1000 });

// 30 days, soft-capped at 100k rows per invocation
await store.purgeDone({
  olderThanMs: 30 * 24 * 60 * 60 * 1000,
  batchSize: 1_000,
  maxRows: 100_000,
});

The cutoff is computed in TypeScript as new Date(Date.now() - opts.olderThanMs) and bound as sql.DateTime2(3). This deliberately sidesteps sql.Int + DATEADD(MILLISECOND, …). sql.Int is signed 32-bit and overflows at ~24.85 days, which covers exactly the 30/60/90-day retention configurations every team writes first. Unlike the reaper cutoff, an application clock that is off by a few seconds is harmless at day-scale retention.

maxRows is a soft cap: the loop terminates after the iteration that crosses it, so actual deletion may exceed maxRows by up to batchSize - 1 (parity with the Postgres and MySQL adapters).
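A synchronous model can check both the soft cap and the overflow arithmetic. purgeLoop is a hypothetical stand-in for the adapter's loop, and deleteBatch stands in for one batched delete round-trip. Neither is the adapter's code, and the early exit when a batch comes back short is an assumption.

```typescript
// Hypothetical model of the purge loop. deleteBatch performs one batched
// delete of at most `batchSize` rows and returns how many it removed.
function purgeLoop(
  deleteBatch: (batchSize: number) => number,
  batchSize: number,
  maxRows: number,
): number {
  let total = 0;
  while (total < maxRows) {
    const removed = deleteBatch(batchSize);
    total += removed;
    if (removed < batchSize) break; // backlog drained
  }
  return total; // may exceed maxRows by up to batchSize - 1
}

// A backlog that never runs dry, to expose the overshoot.
const endless = (batchSize: number): number => batchSize;

const exact = purgeLoop(endless, 1_000, 100_000); // 100000
const worst = purgeLoop(endless, 1_000, 100_001); // 101000 (overshoot 999)

// Why the cutoff is a DATETIME2 value, not an Int millisecond offset.
const SQL_INT_MAX = 2 ** 31 - 1;                      // 2147483647
const thirtyDaysMs = 30 * 24 * 60 * 60 * 1000;        // 2592000000
const wouldOverflowInt = thirtyDaysMs > SQL_INT_MAX;  // true
const cutoff = new Date(Date.now() - thirtyDaysMs);   // bound as sql.DateTime2(3)
```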

Schedule from cron / a sidecar.


Running on an older engine

SQL Server 2014 / 2012 has the locking primitives (READPAST, UPDLOCK, ROWLOCK, READCOMMITTEDLOCK) but no ISJSON and no OPENJSON. The shipped migration and markDone path won't run as-is. Two surgical changes get you running:

  1. Hand-roll the migration. Take createMigrationSql() output and remove the CHECK (ISJSON(col) = 1) clauses on payload and headers. You give up engine-side JSON validation — push that check into your application boundary instead. Everything else (the table shape, the filtered index WHERE clause, DATETIME2(3), SYSUTCDATETIME()) was already available in 2012.
  2. Rewrite markDone without OPENJSON. The shipped implementation parses a JSON array of ids in one UPDATE. Without OPENJSON, replace it with either:
    • a T-SQL loop that issues a single-id UPDATE per row in the batch (simple, slightly chattier), or
    • a single UPDATE joined to a derived (VALUES (id1), (id2), …) table built from the batch (fewer round-trips, but you stitch the SQL string per call; bind the ids as parameters rather than splicing literals into the text).

Both approaches preserve atomicity within the relay's per-batch contract. The locking and claim semantics are unchanged — you're only changing the shape of the ack write.
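The VALUES variant can be generated with bound parameters rather than spliced literals. This is a hypothetical sketch, not the adapter's code, and buildMarkDoneStatement is an illustrative name. The SET clause assumes markDone moves rows to status 2 (done) and stamps processed_at, as the table description says; check it against the shipped implementation before relying on it. The 2,100 limit is SQL Server's cap on parameters per request.

```typescript
const MAX_PARAMS_PER_REQUEST = 2100; // SQL Server parameter limit per request

interface MarkDoneStatement {
  text: string;
  params: Array<{ name: string; value: string }>; // ids stay strings
}

// Hypothetical builder for the pre-2016 markDone fallback. `qualifiedTable`
// must come from validated identifiers (the assertIdent rule), never user input.
// Assumed semantics: done = status 2, processed_at stamped server-side.
function buildMarkDoneStatement(qualifiedTable: string, ids: string[]): MarkDoneStatement {
  if (ids.length === 0) throw new RangeError("ids must not be empty");
  if (ids.length > MAX_PARAMS_PER_REQUEST) {
    throw new RangeError(`at most ${MAX_PARAMS_PER_REQUEST} ids per statement`);
  }
  const params = ids.map((value, i) => ({ name: `id${i}`, value }));
  const rows = params.map((p) => `(@${p.name})`).join(", ");
  const text =
    `UPDATE o SET o.status = 2, o.processed_at = SYSUTCDATETIME() ` +
    `FROM ${qualifiedTable} o ` +
    `JOIN (VALUES ${rows}) AS v(id) ON v.id = o.id;`;
  return { text, params };
}

const stmt = buildMarkDoneStatement("[dbo].[outbox]", ["1", "2"]);
```

Bind each entry with request.input(p.name, sql.BigInt, p.value) or sql.NVarChar(64), per the BIGINT section, then run stmt.text on the relay's transaction.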

If you're stuck on older than 2012, the right move is to upgrade rather than patch the adapter further.


Observability

Same Logger + hook interface as the other adapters:

const relay = new Relay({
  store,
  publisher,
  logger,
  hooks: {
    onBatchClaimed: (n) => metrics.gauge("outbox.claim_size", n),
    onFailed: (record, err, willRetry) =>
      metrics.counter("outbox.failed", 1, { willRetry }),
  },
});

Full coverage → Observability.


What's next
