Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/bot-runner/tests/bot-runner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ module('timeline handler', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

queuePublisher = {
Expand Down
7 changes: 7 additions & 0 deletions packages/bot-runner/tests/command-runner-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -245,6 +246,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -467,6 +469,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -563,6 +566,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -702,6 +706,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -916,6 +921,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down Expand Up @@ -1037,6 +1043,7 @@ module('command runner', () => {
close: async () => {},
getColumnNames: async () => [],
withWriteLock: async (_url, fn) => fn(undefined),
withUserCostLock: async (_userId, fn) => fn(),
} as DBAdapter;

let commandRunner = makeRunner(dbAdapter, queuePublisher, githubClient);
Expand Down
10 changes: 10 additions & 0 deletions packages/host/app/lib/sqlite-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ export default class SQLiteAdapter implements DBAdapter {
return await fn(undefined);
}

// SQLite has no cross-connection concurrency to coordinate, so the
// per-user cost-barrier lock is a passthrough. The PG implementation
// provides the real serialization across replicas.
async withUserCostLock<T>(
_matrixUserId: string,
fn: () => Promise<T>,
): Promise<T> {
return await fn();
}

private async internalExecute(sql: string, opts?: ExecuteOptions) {
sql = this.adjustSQL(sql);
return await this.query(sql, opts);
Expand Down
107 changes: 106 additions & 1 deletion packages/postgres/pg-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ export function hashRealmUrlForAdvisoryLock(url: string): string {
return digest.readBigInt64BE(0).toString();
}

// Lock key for the per-matrix-user "next request waits for prior cost to land"
// barrier (see PgAdapter.withUserCostLock). Namespaced so the user-cost lock
// space cannot collide with the realm-write lock space — a matrix user id is
// extremely unlikely to hash-collide with a realm URL even without the
// namespace (input shapes are disjoint), but the prefix makes the partition
// explicit at the key-derivation site rather than implicit in input formats.
export function hashUserIdForCostLock(matrixUserId: string): string {
const digest = createHash('sha256')
.update('cost-barrier:')
.update(matrixUserId)
.digest();
return digest.readBigInt64BE(0).toString();
}

const log = logger('pg-adapter');

type MigrationNameFixes = {
Expand Down Expand Up @@ -110,6 +124,15 @@ export class PgAdapter implements DBAdapter {
#notificationClient?: Client;
#notificationClientStarting?: Promise<Client>;
#channels = new Map<string, ChannelState>();
// In-process coalescer for the per-user cost-barrier. Multiple concurrent
// same-user callers in this process chain on the same in-memory promise so
// only ONE of them is actively waiting on the cross-replica advisory lock
// (and therefore pinning a pool connection) at a time. The advisory lock
// itself serializes the holder across replicas; the in-process map keeps
// the per-replica pool footprint bounded to one connection per active
// same-user user, not one per concurrent same-user request. See
// withUserCostLock for the full rationale.
#userCostQueue = new Map<string, Promise<void>>();

constructor(opts?: { autoMigrate?: boolean; migrationLogging?: boolean }) {
if (opts?.autoMigrate) {
Expand Down Expand Up @@ -422,6 +445,88 @@ export class PgAdapter implements DBAdapter {
fn: (txQuerier: Querier | undefined) => Promise<T>,
): Promise<T> {
const lockKey = hashRealmUrlForAdvisoryLock(realmUrl);
return await this.#runWithAdvisoryXactLock(lockKey, realmUrl, fn);
}

// Per-matrix-user serialization barrier for billable upstream proxy calls.
// Two concurrent requests from the same matrix user — including across
// replicas with no stickiness — must not both kick off an upstream call
// before the prior request's cost row has landed in the credits ledger.
//
// Two coordination layers compose:
//
// 1. In-process: `#userCostQueue` chains same-user callers within this
// process on an in-memory promise. Only the head of the chain is
// actively waiting on the DB lock; later callers wait in memory.
// 2. Cross-replica: `pg_advisory_xact_lock` on a namespaced hash of the
// matrix user id serializes holders across replicas.
//
// Pool-pressure budget: this is the realm-server's main pool (also used
// by indexing / federated-search), and the critical section spans the
// upstream LLM call (potentially tens of seconds on streaming). Without
// the in-process queue, N concurrent same-user requests landing on one
// replica would each pin a pool client while blocked on the advisory
// lock — that scales badly against the 40-client default and the
// indexer's 20-client baseline. With the queue, per-replica pool
// footprint is bounded to *one* pinned client per active same-user
// user, not per concurrent request. Across N replicas a single user's
// requests fan out to at most N pinned clients cluster-wide; per-replica
// count is invariant to per-user concurrency.
//
// Failure semantics: a prior caller's rejection does NOT cascade — the
// next caller's `await previous.catch(...)` swallows it so the chain
// marches on. Each caller's own error is surfaced via the returned
// promise. The advisory lock's own rollback/release semantics are the
// same as withWriteLock (xact-lock released on transaction abort, no
// stale-lock risk).
//
// The callback does NOT receive a `txQuerier` — the barrier only needs
// serialization, not transactional grouping of the work inside it.
// Inner DB calls (validateCredits, saveUsageCost) run via the shared
// dbAdapter on separate pool connections as today.
async withUserCostLock<T>(
matrixUserId: string,
fn: () => Promise<T>,
): Promise<T> {
const previous = this.#userCostQueue.get(matrixUserId) ?? Promise.resolve();
const lockKey = hashUserIdForCostLock(matrixUserId);
const myWork = (async (): Promise<T> => {
try {
await previous;
} catch {
// A prior caller's failure must not cascade — the next request
// in the queue should still get its turn at the lock.
}
return await this.#runWithAdvisoryXactLock(
lockKey,
`user-cost:${matrixUserId}`,
() => fn(),
);
})();
// What the NEXT same-user caller waits on: outcome-erased so its
// own try/catch isn't sensitive to ours. Tee'd off myWork to keep a
// single source of truth for completion timing.
const myCompletion: Promise<void> = myWork.then(
() => undefined,
() => undefined,
);
this.#userCostQueue.set(matrixUserId, myCompletion);
// Compact the map once the chain is idle. Only delete if no later
// caller has overwritten the tail — otherwise we'd unlink the chain
// and let a same-user race past it.
void myCompletion.finally(() => {
if (this.#userCostQueue.get(matrixUserId) === myCompletion) {
this.#userCostQueue.delete(matrixUserId);
}
});
return await myWork;
}

async #runWithAdvisoryXactLock<T>(
lockKey: string,
contextLabel: string,
fn: (txQuerier: Querier) => Promise<T>,
): Promise<T> {
return await this.withConnection(async (queryFn) => {
await queryFn(['BEGIN']);
try {
Expand All @@ -442,7 +547,7 @@ export class PgAdapter implements DBAdapter {
// client release), so we don't have a stale-lock problem. Log
// for visibility and rethrow the original error.
log.warn(
`ROLLBACK after withWriteLock error for ${realmUrl} failed: ${String(rollbackErr)}`,
`ROLLBACK after advisory-lock error for ${contextLabel} failed: ${String(rollbackErr)}`,
);
}
throw err;
Expand Down
111 changes: 52 additions & 59 deletions packages/realm-server/handlers/handle-openrouter-passthrough.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ import { logger, SupportedMimeType } from '@cardstack/runtime-common';
import * as Sentry from '@sentry/node';

import { AllowedProxyDestinations } from '../lib/allowed-proxy-destinations';
import {
awaitPendingCost,
handleStreamingRequest,
trackCostDeduction,
} from '../lib/proxy-forward';
import { handleStreamingRequest } from '../lib/proxy-forward';
import {
fetchRequestFromContext,
sendResponseForBadRequest,
Expand Down Expand Up @@ -95,77 +91,74 @@ export default function handleOpenRouterPassthrough({
return;
}

try {
await awaitPendingCost(matrixUserId);
} catch (e) {
log.error('Error waiting for pending cost:', e);
await sendResponseForSystemError(
if (isStreaming && !destinationConfig.supportsStreaming) {
await sendResponseForBadRequest(
ctxt,
'There was an error saving your Boxel credits usage. Try again or contact support if the problem persists.',
'Streaming is not supported for the OpenRouter passthrough',
);
return;
}

const creditValidation =
await destinationConfig.creditStrategy.validateCredits(
dbAdapter,
matrixUserId,
);
if (!creditValidation.hasEnoughCredits) {
await sendResponseForForbiddenRequest(
ctxt,
creditValidation.errorMessage || 'Insufficient credits',
);
return;
}
// Serialize concurrent requests from the same matrix user across
// replicas: the next request can't kick off another billable upstream
// call before the previous request's cost row has landed in the
// credits ledger. The lock is held through validate-credits → upstream
// call → save-cost; on streaming, save-cost happens inside
// handleStreamingRequest after the `[DONE]` marker.
await dbAdapter.withUserCostLock(matrixUserId, async () => {
const creditValidation =
await destinationConfig.creditStrategy.validateCredits(
dbAdapter,
matrixUserId,
);
if (!creditValidation.hasEnoughCredits) {
await sendResponseForForbiddenRequest(
ctxt,
creditValidation.errorMessage || 'Insufficient credits',
);
return;
}

const headers: Record<string, string> = {
'Content-Type': 'application/json',
Authorization: `Bearer ${destinationConfig.apiKey}`,
};
const finalBody = JSON.stringify(openAIBody);
const headers: Record<string, string> = {
'Content-Type': 'application/json',
Authorization: `Bearer ${destinationConfig.apiKey}`,
};
const finalBody = JSON.stringify(openAIBody);

if (isStreaming) {
if (!destinationConfig.supportsStreaming) {
await sendResponseForBadRequest(
if (isStreaming) {
await handleStreamingRequest(
ctxt,
'Streaming is not supported for the OpenRouter passthrough',
OPENROUTER_CHAT_URL,
'POST',
headers,
finalBody,
destinationConfig,
dbAdapter,
matrixUserId,
);
return;
}
await handleStreamingRequest(
ctxt,
OPENROUTER_CHAT_URL,
'POST',

const externalResponse = await globalThis.fetch(OPENROUTER_CHAT_URL, {
method: 'POST',
headers,
finalBody,
destinationConfig,
body: finalBody,
});
const responseData = await externalResponse.json();

await destinationConfig.creditStrategy.saveUsageCost(
dbAdapter,
matrixUserId,
responseData,
);
return;
}

const externalResponse = await globalThis.fetch(OPENROUTER_CHAT_URL, {
method: 'POST',
headers,
body: finalBody,
});
const responseData = await externalResponse.json();

trackCostDeduction(
destinationConfig,
dbAdapter,
matrixUserId,
responseData,
);

const response = new Response(JSON.stringify(responseData), {
status: externalResponse.status,
statusText: externalResponse.statusText,
headers: { 'content-type': SupportedMimeType.JSON },
const response = new Response(JSON.stringify(responseData), {
status: externalResponse.status,
statusText: externalResponse.statusText,
headers: { 'content-type': SupportedMimeType.JSON },
});
await setContextResponse(ctxt, response);
});
await setContextResponse(ctxt, response);
} catch (error) {
log.error('Error in openrouter-passthrough handler:', error);
Sentry.captureException(error);
Expand Down
Loading
Loading