diff --git a/packages/bot-runner/tests/bot-runner-test.ts b/packages/bot-runner/tests/bot-runner-test.ts index a9804e8608..4acd4e3165 100644 --- a/packages/bot-runner/tests/bot-runner-test.ts +++ b/packages/bot-runner/tests/bot-runner-test.ts @@ -138,6 +138,7 @@ module('timeline handler', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; queuePublisher = { diff --git a/packages/bot-runner/tests/command-runner-test.ts b/packages/bot-runner/tests/command-runner-test.ts index b99178be42..eb918c50cf 100644 --- a/packages/bot-runner/tests/command-runner-test.ts +++ b/packages/bot-runner/tests/command-runner-test.ts @@ -99,6 +99,7 @@ module('command runner', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -245,6 +246,7 @@ module('command runner', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -467,6 +469,7 @@ module('command runner', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -563,6 +566,7 @@ module('command runner', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -702,6 +706,7 @@ module('command runner', () => { close: async () => 
{}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -916,6 +921,7 @@ module('command runner', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); @@ -1037,6 +1043,7 @@ module('command runner', () => { close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), } as DBAdapter; let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient); diff --git a/packages/host/app/lib/sqlite-adapter.ts b/packages/host/app/lib/sqlite-adapter.ts index 26857a2106..6809187bd6 100644 --- a/packages/host/app/lib/sqlite-adapter.ts +++ b/packages/host/app/lib/sqlite-adapter.ts @@ -72,6 +72,16 @@ export default class SQLiteAdapter implements DBAdapter { return await fn(undefined); } + // SQLite has no cross-connection concurrency to coordinate, so the + // per-user cost-barrier lock is a passthrough. The PG implementation + // provides the real serialization across replicas. 
+ async withUserCostLock( + _matrixUserId: string, + fn: () => Promise, + ): Promise { + return await fn(); + } + private async internalExecute(sql: string, opts?: ExecuteOptions) { sql = this.adjustSQL(sql); return await this.query(sql, opts); diff --git a/packages/postgres/pg-adapter.ts b/packages/postgres/pg-adapter.ts index c8feb7dfd5..f551ecd179 100644 --- a/packages/postgres/pg-adapter.ts +++ b/packages/postgres/pg-adapter.ts @@ -31,6 +31,20 @@ export function hashRealmUrlForAdvisoryLock(url: string): string { return digest.readBigInt64BE(0).toString(); } +// Lock key for the per-matrix-user "next request waits for prior cost to land" +// barrier (see PgAdapter.withUserCostLock). Namespaced so the user-cost lock +// space cannot collide with the realm-write lock space — a matrix user id is +// extremely unlikely to hash-collide with a realm URL even without the +// namespace (input shapes are disjoint), but the prefix makes the partition +// explicit at the key-derivation site rather than implicit in input formats. +export function hashUserIdForCostLock(matrixUserId: string): string { + const digest = createHash('sha256') + .update('cost-barrier:') + .update(matrixUserId) + .digest(); + return digest.readBigInt64BE(0).toString(); +} + const log = logger('pg-adapter'); type MigrationNameFixes = { @@ -110,6 +124,15 @@ export class PgAdapter implements DBAdapter { #notificationClient?: Client; #notificationClientStarting?: Promise; #channels = new Map(); + // In-process coalescer for the per-user cost-barrier. Multiple concurrent + // same-user callers in this process chain on the same in-memory promise so + // only ONE of them is actively waiting on the cross-replica advisory lock + // (and therefore pinning a pool connection) at a time. 
The advisory lock + // itself serializes the holder across replicas; the in-process map keeps + // the per-replica pool footprint bounded to one connection per active + // user, not one per concurrent same-user request. See + // withUserCostLock for the full rationale. + #userCostQueue = new Map>(); constructor(opts?: { autoMigrate?: boolean; migrationLogging?: boolean }) { if (opts?.autoMigrate) { @@ -422,6 +445,88 @@ export class PgAdapter implements DBAdapter { fn: (txQuerier: Querier | undefined) => Promise, ): Promise { const lockKey = hashRealmUrlForAdvisoryLock(realmUrl); + return await this.#runWithAdvisoryXactLock(lockKey, realmUrl, fn); + } + + // Per-matrix-user serialization barrier for billable upstream proxy calls. + // Two concurrent requests from the same matrix user — including across + // replicas with no stickiness — must not both kick off an upstream call + // before the prior request's cost row has landed in the credits ledger. + // + // Two coordination layers compose: + // + // 1. In-process: `#userCostQueue` chains same-user callers within this + // process on an in-memory promise. Only the head of the chain is + // actively waiting on the DB lock; later callers wait in memory. + // 2. Cross-replica: `pg_advisory_xact_lock` on a namespaced hash of the + // matrix user id serializes holders across replicas. + // + // Pool-pressure budget: this is the realm-server's main pool (also used + // by indexing / federated-search), and the critical section spans the + // upstream LLM call (potentially tens of seconds on streaming). Without + // the in-process queue, N concurrent same-user requests landing on one + // replica would each pin a pool client while blocked on the advisory + // lock — that scales badly against the 40-client default and the + // indexer's 20-client baseline. With the queue, per-replica pool + // footprint is bounded to *one* pinned client per active + // user, not per concurrent request. 
Across N replicas a single user's + // requests fan out to at most N pinned clients cluster-wide; per-replica + // count is invariant to per-user concurrency. + // + // Failure semantics: a prior caller's rejection does NOT cascade — the + // next caller's `await previous.catch(...)` swallows it so the chain + // marches on. Each caller's own error is surfaced via the returned + // promise. The advisory lock's own rollback/release semantics are the + // same as withWriteLock (xact-lock released on transaction abort, no + // stale-lock risk). + // + // The callback does NOT receive a `txQuerier` — the barrier only needs + // serialization, not transactional grouping of the work inside it. + // Inner DB calls (validateCredits, saveUsageCost) run via the shared + // dbAdapter on separate pool connections as today. + async withUserCostLock( + matrixUserId: string, + fn: () => Promise, + ): Promise { + const previous = this.#userCostQueue.get(matrixUserId) ?? Promise.resolve(); + const lockKey = hashUserIdForCostLock(matrixUserId); + const myWork = (async (): Promise => { + try { + await previous; + } catch { + // A prior caller's failure must not cascade — the next request + // in the queue should still get its turn at the lock. + } + return await this.#runWithAdvisoryXactLock( + lockKey, + `user-cost:${matrixUserId}`, + () => fn(), + ); + })(); + // What the NEXT same-user caller waits on: outcome-erased so its + // own try/catch isn't sensitive to ours. Tee'd off myWork to keep a + // single source of truth for completion timing. + const myCompletion: Promise = myWork.then( + () => undefined, + () => undefined, + ); + this.#userCostQueue.set(matrixUserId, myCompletion); + // Compact the map once the chain is idle. Only delete if no later + // caller has overwritten the tail — otherwise we'd unlink the chain + // and let a same-user race past it. 
+ void myCompletion.finally(() => { + if (this.#userCostQueue.get(matrixUserId) === myCompletion) { + this.#userCostQueue.delete(matrixUserId); + } + }); + return await myWork; + } + + async #runWithAdvisoryXactLock( + lockKey: string, + contextLabel: string, + fn: (txQuerier: Querier) => Promise, + ): Promise { return await this.withConnection(async (queryFn) => { await queryFn(['BEGIN']); try { @@ -442,7 +547,7 @@ export class PgAdapter implements DBAdapter { // client release), so we don't have a stale-lock problem. Log // for visibility and rethrow the original error. log.warn( - `ROLLBACK after withWriteLock error for ${realmUrl} failed: ${String(rollbackErr)}`, + `ROLLBACK after advisory-lock error for ${contextLabel} failed: ${String(rollbackErr)}`, ); } throw err; diff --git a/packages/realm-server/handlers/handle-openrouter-passthrough.ts b/packages/realm-server/handlers/handle-openrouter-passthrough.ts index d3267e2a99..623ebb72f2 100644 --- a/packages/realm-server/handlers/handle-openrouter-passthrough.ts +++ b/packages/realm-server/handlers/handle-openrouter-passthrough.ts @@ -4,11 +4,7 @@ import { logger, SupportedMimeType } from '@cardstack/runtime-common'; import * as Sentry from '@sentry/node'; import { AllowedProxyDestinations } from '../lib/allowed-proxy-destinations'; -import { - awaitPendingCost, - handleStreamingRequest, - trackCostDeduction, -} from '../lib/proxy-forward'; +import { handleStreamingRequest } from '../lib/proxy-forward'; import { fetchRequestFromContext, sendResponseForBadRequest, @@ -95,77 +91,74 @@ export default function handleOpenRouterPassthrough({ return; } - try { - await awaitPendingCost(matrixUserId); - } catch (e) { - log.error('Error waiting for pending cost:', e); - await sendResponseForSystemError( + if (isStreaming && !destinationConfig.supportsStreaming) { + await sendResponseForBadRequest( ctxt, - 'There was an error saving your Boxel credits usage. 
Try again or contact support if the problem persists.', + 'Streaming is not supported for the OpenRouter passthrough', ); return; } - const creditValidation = - await destinationConfig.creditStrategy.validateCredits( - dbAdapter, - matrixUserId, - ); - if (!creditValidation.hasEnoughCredits) { - await sendResponseForForbiddenRequest( - ctxt, - creditValidation.errorMessage || 'Insufficient credits', - ); - return; - } + // Serialize concurrent requests from the same matrix user across + // replicas: the next request can't kick off another billable upstream + // call before the previous request's cost row has landed in the + // credits ledger. The lock is held through validate-credits → upstream + // call → save-cost; on streaming, save-cost happens inside + // handleStreamingRequest after the `[DONE]` marker. + await dbAdapter.withUserCostLock(matrixUserId, async () => { + const creditValidation = + await destinationConfig.creditStrategy.validateCredits( + dbAdapter, + matrixUserId, + ); + if (!creditValidation.hasEnoughCredits) { + await sendResponseForForbiddenRequest( + ctxt, + creditValidation.errorMessage || 'Insufficient credits', + ); + return; + } - const headers: Record = { - 'Content-Type': 'application/json', - Authorization: `Bearer ${destinationConfig.apiKey}`, - }; - const finalBody = JSON.stringify(openAIBody); + const headers: Record = { + 'Content-Type': 'application/json', + Authorization: `Bearer ${destinationConfig.apiKey}`, + }; + const finalBody = JSON.stringify(openAIBody); - if (isStreaming) { - if (!destinationConfig.supportsStreaming) { - await sendResponseForBadRequest( + if (isStreaming) { + await handleStreamingRequest( ctxt, - 'Streaming is not supported for the OpenRouter passthrough', + OPENROUTER_CHAT_URL, + 'POST', + headers, + finalBody, + destinationConfig, + dbAdapter, + matrixUserId, ); return; } - await handleStreamingRequest( - ctxt, - OPENROUTER_CHAT_URL, - 'POST', + + const externalResponse = await 
globalThis.fetch(OPENROUTER_CHAT_URL, { + method: 'POST', headers, - finalBody, - destinationConfig, + body: finalBody, + }); + const responseData = await externalResponse.json(); + + await destinationConfig.creditStrategy.saveUsageCost( dbAdapter, matrixUserId, + responseData, ); - return; - } - - const externalResponse = await globalThis.fetch(OPENROUTER_CHAT_URL, { - method: 'POST', - headers, - body: finalBody, - }); - const responseData = await externalResponse.json(); - - trackCostDeduction( - destinationConfig, - dbAdapter, - matrixUserId, - responseData, - ); - const response = new Response(JSON.stringify(responseData), { - status: externalResponse.status, - statusText: externalResponse.statusText, - headers: { 'content-type': SupportedMimeType.JSON }, + const response = new Response(JSON.stringify(responseData), { + status: externalResponse.status, + statusText: externalResponse.statusText, + headers: { 'content-type': SupportedMimeType.JSON }, + }); + await setContextResponse(ctxt, response); }); - await setContextResponse(ctxt, response); } catch (error) { log.error('Error in openrouter-passthrough handler:', error); Sentry.captureException(error); diff --git a/packages/realm-server/handlers/handle-request-forward.ts b/packages/realm-server/handlers/handle-request-forward.ts index 054b7a3c44..c3a9ddef25 100644 --- a/packages/realm-server/handlers/handle-request-forward.ts +++ b/packages/realm-server/handlers/handle-request-forward.ts @@ -9,11 +9,7 @@ import { fetchRequestFromContext, } from '../middleware'; import { AllowedProxyDestinations } from '../lib/allowed-proxy-destinations'; -import { - awaitPendingCost, - handleStreamingRequest, - trackCostDeduction, -} from '../lib/proxy-forward'; +import { handleStreamingRequest } from '../lib/proxy-forward'; import * as Sentry from '@sentry/node'; const log = logger('request-forward'); @@ -183,34 +179,7 @@ export default function handleRequestForward({ return; } - // 4. 
Wait for any pending cost from a previous request to be recorded - try { - await awaitPendingCost(matrixUserId); - } catch (e) { - log.error('Error waiting for pending cost:', e); - await sendResponseForSystemError( - ctxt, - 'There was an error saving your Boxel credits usage. Try again or contact support if the problem persists.', - ); - return; - } - - // 5. Check user has sufficient credits using credit strategy - const creditValidation = - await destinationConfig.creditStrategy.validateCredits( - dbAdapter, - matrixUserId, - ); - - if (!creditValidation.hasEnoughCredits) { - await sendResponseForForbiddenRequest( - ctxt, - creditValidation.errorMessage || 'Insufficient credits', - ); - return; - } - - // 5. Forward request to external endpoint + // 4. Forward request to external endpoint let parsedRequestBody: unknown; if (json.requestBody) { try { @@ -292,68 +261,88 @@ export default function handleRequestForward({ setContentTypeHeader(headers, 'application/json'); } - // Handle streaming requests - if (json.stream) { - if (!(await destinationsConfig.supportsStreaming(json.url))) { - await sendResponseForBadRequest( + if ( + json.stream && + !(await destinationsConfig.supportsStreaming(json.url)) + ) { + await sendResponseForBadRequest( + ctxt, + `Streaming is not supported for endpoint ${json.url}`, + ); + return; + } + + // 5. Serialize concurrent requests from the same matrix user across + // replicas: the next request can't kick off another billable upstream + // call before the previous request's cost row has landed in the + // credits ledger. The lock is held through validate-credits → + // upstream call → save-cost; on streaming, save-cost happens inside + // handleStreamingRequest after the `[DONE]` marker. 
+ await dbAdapter.withUserCostLock(matrixUserId, async () => { + const creditValidation = + await destinationConfig.creditStrategy.validateCredits( + dbAdapter, + matrixUserId, + ); + + if (!creditValidation.hasEnoughCredits) { + await sendResponseForForbiddenRequest( ctxt, - `Streaming is not supported for endpoint ${json.url}`, + creditValidation.errorMessage || 'Insufficient credits', ); return; } - await handleStreamingRequest( - ctxt, - finalUrl, - json.method, + if (json.stream) { + await handleStreamingRequest( + ctxt, + finalUrl, + json.method, + headers, + finalBody, + destinationConfig, + dbAdapter, + matrixUserId, + ); + return; + } + + const fetchOptions: RequestInit = { + method: json.method, headers, - finalBody, - destinationConfig, + }; + + // Only add body for non-GET requests or when requestBody is provided + if (json.method !== 'GET' && finalBody !== undefined) { + fetchOptions.body = finalBody; + } + + // FIXME undici or something is swallowing the errors, making them useless: + /* + Error in request forward handler: TypeError: fetch failed + at node:internal/deps/undici/undici:13510:13 + at processTicksAndRejections (node:internal/process/task_queues:105:5) + */ + const externalResponse = await globalThis.fetch(finalUrl, fetchOptions); + + const responseData = await externalResponse.json(); + + await destinationConfig.creditStrategy.saveUsageCost( dbAdapter, matrixUserId, + responseData, ); - return; - } - // Handle non-streaming requests - const fetchOptions: RequestInit = { - method: json.method, - headers, - }; - - // Only add body for non-GET requests or when requestBody is provided - if (json.method !== 'GET' && finalBody !== undefined) { - fetchOptions.body = finalBody; - } + const response = new Response(JSON.stringify(responseData), { + status: externalResponse.status, + statusText: externalResponse.statusText, + headers: { + 'content-type': SupportedMimeType.JSON, + }, + }); - // FIXME undici or something is swallowing the errors, 
making them useless: - /* - Error in request forward handler: TypeError: fetch failed - at node:internal/deps/undici/undici:13510:13 - at processTicksAndRejections (node:internal/process/task_queues:105:5) - */ - const externalResponse = await globalThis.fetch(finalUrl, fetchOptions); - - const responseData = await externalResponse.json(); - - // 6. Deduct credits in the background using the cost from the response. - trackCostDeduction( - destinationConfig, - dbAdapter, - matrixUserId, - responseData, - ); - - // 7. Return response - const response = new Response(JSON.stringify(responseData), { - status: externalResponse.status, - statusText: externalResponse.statusText, - headers: { - 'content-type': SupportedMimeType.JSON, - }, + await setContextResponse(ctxt, response); }); - - await setContextResponse(ctxt, response); } catch (error) { log.error('Error in request forward handler:', error); Sentry.captureException(error); diff --git a/packages/realm-server/lib/proxy-forward.ts b/packages/realm-server/lib/proxy-forward.ts index 854a26b145..31d4b5b4b3 100644 --- a/packages/realm-server/lib/proxy-forward.ts +++ b/packages/realm-server/lib/proxy-forward.ts @@ -7,52 +7,18 @@ import type { AllowedProxyDestination } from './allowed-proxy-destinations'; const log = logger('proxy-forward'); -/** - * Per-user barrier ensuring the previous request's billable cost has been - * recorded before a new request starts. Shared across every handler that - * forwards through a credit-bearing destination so the same user can't race - * concurrent requests through different endpoints (e.g. `_request-forward` - * and `/_openrouter/chat/completions`). - */ -const pendingCostPromises = new Map>(); - const KEEP_ALIVE_INTERVAL_MS = 15000; -export async function awaitPendingCost(matrixUserId: string): Promise { - let pending = pendingCostPromises.get(matrixUserId); - if (pending) { - await pending; - } -} - -/** Schedule cost deduction in the background, chained after any prior pending. 
*/ -export function trackCostDeduction( - destinationConfig: AllowedProxyDestination, - dbAdapter: DBAdapter, - matrixUserId: string, - responseData: unknown, -): void { - const previous = pendingCostPromises.get(matrixUserId) ?? Promise.resolve(); - const cost = previous - .then(() => - destinationConfig.creditStrategy.saveUsageCost( - dbAdapter, - matrixUserId, - responseData, - ), - ) - .finally(() => { - if (pendingCostPromises.get(matrixUserId) === cost) { - pendingCostPromises.delete(matrixUserId); - } - }); - pendingCostPromises.set(matrixUserId, cost); -} - /** * Stream the upstream `text/event-stream` response back to the client, parsing * each `data:` line so we can capture the OpenRouter generation id / inline - * cost and schedule a credit deduction at `[DONE]`. + * cost and save the credit deduction at `[DONE]`. + * + * Cost-save is awaited inline (not fire-and-forget). Callers run this inside + * `dbAdapter.withUserCostLock(matrixUserId, ...)`, which serializes concurrent + * same-user requests across replicas; the lock must be held until the cost + * row commits so the next request can't kick off another billable upstream + * call before the previous request's debit lands in the ledger. 
*/ export async function handleStreamingRequest( ctxt: Koa.Context, @@ -101,23 +67,23 @@ export async function handleStreamingRequest( reader, async (data) => { if (data === '[DONE]') { + ctxt.res.write(`data: [DONE]\n\n`); if ( generationId != null || (typeof costInUsd === 'number' && Number.isFinite(costInUsd) && costInUsd > 0) ) { - trackCostDeduction(endpointConfig, dbAdapter, matrixUserId, { - id: generationId, - usage: { cost: costInUsd }, - }); + await endpointConfig.creditStrategy.saveUsageCost( + dbAdapter, + matrixUserId, + { id: generationId, usage: { cost: costInUsd } }, + ); } else { log.warn( `Streaming response for user ${matrixUserId} contained no generation ID or usage cost, skipping credit deduction`, ); } - - ctxt.res.write(`data: [DONE]\n\n`); return 'stop'; } diff --git a/packages/realm-server/tests/indexing-event-sink-test.ts b/packages/realm-server/tests/indexing-event-sink-test.ts index c9b6895d30..e783518990 100644 --- a/packages/realm-server/tests/indexing-event-sink-test.ts +++ b/packages/realm-server/tests/indexing-event-sink-test.ts @@ -33,6 +33,9 @@ function makeRecordingAdapter(): { async withWriteLock(_url, fn) { return await fn(undefined); }, + async withUserCostLock(_userId, fn) { + return await fn(); + }, }, }; } @@ -382,6 +385,9 @@ module(basename(__filename), function () { async withWriteLock(_url, fn) { return await fn(undefined); }, + async withUserCostLock(_userId, fn) { + return await fn(); + }, }; let sink = new IndexingEventSink({ flushIntervalMs: 10 }); sink.setAdapter(slowAdapter); diff --git a/packages/realm-server/tests/prerender-proxy-test.ts b/packages/realm-server/tests/prerender-proxy-test.ts index 482a75392c..0abed42720 100644 --- a/packages/realm-server/tests/prerender-proxy-test.ts +++ b/packages/realm-server/tests/prerender-proxy-test.ts @@ -32,6 +32,9 @@ module(basename(__filename), function () { async withWriteLock(_url, fn) { return await fn(undefined); }, + async withUserCostLock(_userId, fn) { + return 
await fn(); + }, }; } diff --git a/packages/realm-server/tests/realm-advisory-locks-test.ts b/packages/realm-server/tests/realm-advisory-locks-test.ts index e9035d4ce1..63ec0e2523 100644 --- a/packages/realm-server/tests/realm-advisory-locks-test.ts +++ b/packages/realm-server/tests/realm-advisory-locks-test.ts @@ -2,6 +2,7 @@ import { module, test } from 'qunit'; import { basename } from 'path'; import { hashRealmUrlForAdvisoryLock, + hashUserIdForCostLock, type PgAdapter, } from '@cardstack/postgres'; import { setupDB } from './helpers'; @@ -133,4 +134,205 @@ module(basename(__filename), function () { assert.strictEqual(result, 'ok', 'lock released after prior failure'); }); }); + + module('hashUserIdForCostLock', function () { + test('is deterministic', function (assert) { + const userId = '@alice:localhost'; + assert.strictEqual( + hashUserIdForCostLock(userId), + hashUserIdForCostLock(userId), + ); + }); + + test('yields different keys for different user ids', function (assert) { + assert.notStrictEqual( + hashUserIdForCostLock('@alice:localhost'), + hashUserIdForCostLock('@bob:localhost'), + ); + }); + + test('is namespaced away from the realm-write lock space', function (assert) { + // A user id and a realm URL string that happened to be equal would + // still derive different lock keys, so user-cost contention can never + // serialize on a realm-write lock and vice versa. 
+ const shared = '@alice:localhost'; + assert.notStrictEqual( + hashUserIdForCostLock(shared), + hashRealmUrlForAdvisoryLock(shared), + ); + }); + + test('returns a string parseable as a signed 64-bit integer', function (assert) { + const key = hashUserIdForCostLock('@alice:localhost'); + assert.ok( + /^-?\d+$/.test(key), + `key is a decimal integer string (got ${key})`, + ); + const asBigInt = BigInt(key); + const MAX = 2n ** 63n - 1n; + const MIN = -(2n ** 63n); + assert.ok(asBigInt <= MAX, 'within int64 upper bound'); + assert.ok(asBigInt >= MIN, 'within int64 lower bound'); + }); + }); + + module('PgAdapter.withUserCostLock', function (hooks) { + let dbAdapter: PgAdapter; + setupDB(hooks, { + beforeEach: async (adapter) => { + dbAdapter = adapter; + }, + }); + + test('runs the callback and returns its value', async function (assert) { + const result = await dbAdapter.withUserCostLock( + '@alice:localhost', + async () => 42, + ); + assert.strictEqual(result, 42); + }); + + test('serializes two concurrent callers for the same user id', async function (assert) { + const userId = '@alice:localhost'; + const events: string[] = []; + + const p1 = dbAdapter.withUserCostLock(userId, async () => { + events.push('1-start'); + await new Promise((r) => setTimeout(r, 150)); + events.push('1-end'); + }); + // Give p1 a head start so it actually holds the lock first. 
+ await new Promise((r) => setTimeout(r, 20)); + const p2 = dbAdapter.withUserCostLock(userId, async () => { + events.push('2-start'); + events.push('2-end'); + }); + + await Promise.all([p1, p2]); + + assert.deepEqual( + events, + ['1-start', '1-end', '2-start', '2-end'], + 'second caller runs only after first releases the lock', + ); + }); + + test('runs concurrent callers for different user ids in parallel', async function (assert) { + const events: string[] = []; + + const p1 = dbAdapter.withUserCostLock('@alice:localhost', async () => { + events.push('a-start'); + await new Promise((r) => setTimeout(r, 150)); + events.push('a-end'); + }); + const p2 = dbAdapter.withUserCostLock('@bob:localhost', async () => { + events.push('b-start'); + await new Promise((r) => setTimeout(r, 20)); + events.push('b-end'); + }); + + await Promise.all([p1, p2]); + + // B should complete before A-end because they run in parallel and + // B's critical section is much shorter. + const aEndIdx = events.indexOf('a-end'); + const bEndIdx = events.indexOf('b-end'); + assert.ok( + bEndIdx < aEndIdx, + `b-end (${bEndIdx}) should come before a-end (${aEndIdx}) under parallel execution; events: ${events.join(',')}`, + ); + }); + + test('releases the lock when the callback throws', async function (assert) { + const userId = '@alice:localhost'; + await assert.rejects( + dbAdapter.withUserCostLock(userId, async () => { + throw new Error('deliberate failure'); + }), + /deliberate failure/, + ); + const result = await dbAdapter.withUserCostLock(userId, async () => 'ok'); + assert.strictEqual(result, 'ok', 'lock released after prior failure'); + }); + + test('many concurrent same-user callers serialize without overlapping critical sections', async function (assert) { + // The pool-footprint argument behind the in-process coalescer: N + // concurrent same-user callers in one process should only ever have + // ONE of them inside the advisory-lock-held critical section at a + // time. 
We assert this by pushing per-caller start/end events: if + // serialized, the sequence is [a,a,b,b,c,c,...]; if N requests + // were piling up against the lock (each pinning its own pool + // client), starts would interleave across callers. + const userId = '@coalesce:localhost'; + const N = 8; + const order: number[] = []; + const work = (i: number) => + dbAdapter.withUserCostLock(userId, async () => { + order.push(i); + await new Promise((r) => setTimeout(r, 25)); + order.push(i); + return i; + }); + const results = await Promise.all( + Array.from({ length: N }, (_, i) => work(i)), + ); + assert.deepEqual( + results, + Array.from({ length: N }, (_, i) => i), + 'each caller observes its own result', + ); + for (let i = 0; i < order.length; i += 2) { + assert.strictEqual( + order[i], + order[i + 1], + `start/end events for one caller are adjacent at ${i}/${i + 1}; full order: ${order.join(',')}`, + ); + } + }); + + test('a prior caller failing does not poison the in-process chain', async function (assert) { + // The chain marches on after a failure — the next same-user caller + // takes its turn instead of inheriting the rejection. + const userId = '@chain-resilience:localhost'; + await assert.rejects( + dbAdapter.withUserCostLock(userId, async () => { + throw new Error('first caller exploded'); + }), + /first caller exploded/, + ); + const result = await dbAdapter.withUserCostLock(userId, async () => 'ok'); + assert.strictEqual(result, 'ok'); + }); + + test('does not serialize against the realm-write lock', async function (assert) { + // Even if a user id string equals a realm URL string (it never does + // in practice, but the namespacing guarantees it), the two lock + // spaces are disjoint. We assert that by holding a withWriteLock and + // a withUserCostLock on the same string concurrently — they must run + // in parallel, not serialize. 
+ const shared = 'http://localhost:4201/shared/'; + const events: string[] = []; + + const writePromise = dbAdapter.withWriteLock(shared, async () => { + events.push('write-start'); + await new Promise((r) => setTimeout(r, 100)); + events.push('write-end'); + }); + await new Promise((r) => setTimeout(r, 10)); + const costPromise = dbAdapter.withUserCostLock(shared, async () => { + events.push('cost-start'); + events.push('cost-end'); + }); + + await Promise.all([writePromise, costPromise]); + + // cost-* should slot in between write-start and write-end because + // the locks are in different namespaces. + assert.deepEqual( + events, + ['write-start', 'cost-start', 'cost-end', 'write-end'], + 'realm-write and user-cost lock spaces are disjoint', + ); + }); + }); }); diff --git a/packages/realm-server/tests/request-forward-test.ts b/packages/realm-server/tests/request-forward-test.ts index 5653f4fe64..cdb09bedd4 100644 --- a/packages/realm-server/tests/request-forward-test.ts +++ b/packages/realm-server/tests/request-forward-test.ts @@ -920,6 +920,142 @@ module(basename(__filename), function () { } }); + test('serializes concurrent same-user OpenRouter calls via the cost-barrier lock', async function (assert) { + // Two concurrent requests from the same matrix user must not both + // forward to OpenRouter against the same credit balance — the second + // must wait until the first's cost row has landed. Before the + // db-coordinated barrier the second forwarded immediately because + // its credit check raced the first's pending deduction. + const originalFetch = global.fetch; + const mockFetch = sinon.stub(global, 'fetch'); + + // Each forward should see the prior cost already debited. We don't + // assert exact debit ordering against the upstream calls (the lock + // serializes both ends of the work), only that across both + // completions the ledger reflects both costs. 
+ const inflightChatCalls: Array<{ + release: () => void; + started: Promise; + }> = []; + + mockFetch.callsFake(async (input: string | URL | Request) => { + const url = typeof input === 'string' ? input : input.toString(); + if (!url.includes('/chat/completions')) { + return new Response(JSON.stringify({ error: 'Not found' }), { + status: 404, + }); + } + // Suspend the upstream call until the test releases it, so two + // concurrent same-user requests would actually overlap upstream + // without the lock. With the lock the second call won't even + // reach this point until the first completes. + let releaseFn!: () => void; + let startedFn!: () => void; + const startedSignal = new Promise((res) => (startedFn = res)); + const gate = new Promise((res) => (releaseFn = res)); + inflightChatCalls.push({ release: releaseFn, started: startedSignal }); + startedFn(); + await gate; + return new Response( + JSON.stringify({ + id: `gen-${inflightChatCalls.length}`, + choices: [{ text: 'ok' }], + usage: { total_tokens: 10, cost: 0.002 }, + }), + { status: 200, headers: { 'content-type': 'application/json' } }, + ); + }); + + try { + const jwt = createRealmServerJWT( + { user: '@testuser:localhost', sessionRoom: 'test-session-room' }, + realmSecretSeed, + ); + + // supertest's Test is a thenable, not an eagerly-evaluating + // Promise — it fires the HTTP request when `.then` is called. + // Wrapping in `.then((r) => r)` forces the request to start now + // so the test can wait on inflight stub activity below; otherwise + // `const p1 = send()` would never actually hit the server until + // we awaited it. 
+ const send = () => + request + .post('/_request-forward') + .set('Accept', 'application/json') + .set('Content-Type', 'application/json') + .set('Authorization', `Bearer ${jwt}`) + .send({ + url: 'https://openrouter.ai/api/v1/chat/completions', + method: 'POST', + requestBody: JSON.stringify({ + model: 'openai/gpt-3.5-turbo', + messages: [{ role: 'user', content: 'Hi' }], + }), + }) + .then((r) => r); + + const p1 = send(); + // Wait until the first forward has hit the upstream stub so we + // know it owns the lock. waitUntil defaults to a 1s budget, which + // isn't enough on a cold realm server (JWT → body parse → DB + // destination-config lookup → advisory-lock acquire → validate- + // Credits before fetch fires); give it real headroom. + await waitUntil(async () => inflightChatCalls.length >= 1, { + timeout: 15000, + timeoutMessage: 'first upstream call should reach the fetch stub', + }); + const p2 = send(); + + // With the lock held, the second request must NOT reach upstream + // until the first releases. Give it generous wall time then assert. + await new Promise((r) => setTimeout(r, 500)); + assert.strictEqual( + inflightChatCalls.length, + 1, + 'second concurrent same-user request blocks on the cost-barrier lock', + ); + + // Release the first; the second should then proceed and reach + // upstream by itself. + inflightChatCalls[0].release(); + await p1; + + await waitUntil(async () => inflightChatCalls.length >= 2, { + timeout: 15000, + timeoutMessage: + 'second upstream call should proceed once lock releases', + }); + inflightChatCalls[1].release(); + await p2; + + // Both costs (0.002 USD × 1000 = 2 credits each) should have + // landed, leaving 50 - 4 = 46. 
+ const user = await getUserByMatrixUserId( + dbAdapter, + '@testuser:localhost', + ); + await waitUntil( + async () => { + const credits = await sumUpCreditsLedger(dbAdapter, { + creditType: ['extra_credit', 'extra_credit_used'], + userId: user!.id, + }); + return credits === 46; + }, + { + timeoutMessage: + 'both serialized costs should be debited (50 - 4 = 46)', + }, + ); + } finally { + // If a test assertion failed mid-flight, leftover gated upstream + // calls would hang the test process — release any survivors. + for (const c of inflightChatCalls) c.release(); + mockFetch.restore(); + global.fetch = originalFetch; + } + }); + test('should return a 400 when multipart payload is not an object', async function (assert) { const jwt = createRealmServerJWT( { user: '@testuser:localhost', sessionRoom: 'test-session-room' }, diff --git a/packages/realm-server/tests/screenshot-card-test.ts b/packages/realm-server/tests/screenshot-card-test.ts index 605a92ab24..283b65d462 100644 --- a/packages/realm-server/tests/screenshot-card-test.ts +++ b/packages/realm-server/tests/screenshot-card-test.ts @@ -36,6 +36,9 @@ module(basename(__filename), function () { async withWriteLock(_url, fn) { return await fn(undefined); }, + async withUserCostLock(_userId, fn) { + return await fn(); + }, }; } diff --git a/packages/runtime-common/db.ts b/packages/runtime-common/db.ts index f1de9901a5..8a87695350 100644 --- a/packages/runtime-common/db.ts +++ b/packages/runtime-common/db.ts @@ -37,4 +37,15 @@ export interface DBAdapter { realmUrl: string, fn: (txQuerier: Querier | undefined) => Promise, ) => Promise; + // Per-matrix-user cost-barrier primitive: serializes concurrent billable + // upstream proxy calls for the same user across replicas so the next + // request can't kick off another upstream call before the previous + // request's cost row has landed in the credits ledger. 
PgAdapter + // implements with `pg_advisory_xact_lock` on a namespaced hash of the + // matrix user id; SQLite is a passthrough. See PgAdapter.withUserCostLock + // for design notes (pool pressure, re-entrancy). + withUserCostLock: ( + matrixUserId: string, + fn: () => Promise, + ) => Promise; } diff --git a/packages/runtime-common/tests/run-command-task-shared-tests.ts b/packages/runtime-common/tests/run-command-task-shared-tests.ts index 11a36873a8..cdbab5dc57 100644 --- a/packages/runtime-common/tests/run-command-task-shared-tests.ts +++ b/packages/runtime-common/tests/run-command-task-shared-tests.ts @@ -24,6 +24,7 @@ function makeDBAdapter( close: async () => {}, getColumnNames: async () => [], withWriteLock: async (_url, fn) => fn(undefined), + withUserCostLock: async (_userId, fn) => fn(), }; }