From 8e921f9ad74a780ea3a69e964ea44fe493d01648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Sun, 19 Oct 2025 21:31:28 +0200 Subject: [PATCH 1/4] fix: read-after-write issues --- apps/api/src/controllers/ai.controller.ts | 4 +- .../api/src/controllers/webhook.controller.ts | 24 +- apps/api/src/index.ts | 15 +- apps/start/src/hooks/use-session-extension.ts | 4 +- packages/auth/src/session.ts | 8 +- packages/db/index.ts | 1 + packages/db/src/logger.ts | 3 + packages/db/src/prisma-client.ts | 24 +- .../db/src/services/organization.service.ts | 2 +- packages/db/src/session-consistency.ts | 216 ++++++++++++++++++ packages/db/src/session-context.ts | 12 + packages/redis/cachable.ts | 4 + packages/trpc/src/access.ts | 107 ++++----- packages/trpc/src/routers/auth.ts | 2 + packages/trpc/src/routers/chart.ts | 4 +- packages/trpc/src/routers/event.ts | 4 +- packages/trpc/src/routers/integration.ts | 6 +- packages/trpc/src/trpc.ts | 81 ++++--- 18 files changed, 397 insertions(+), 124 deletions(-) create mode 100644 packages/db/src/logger.ts create mode 100644 packages/db/src/session-consistency.ts create mode 100644 packages/db/src/session-context.ts diff --git a/apps/api/src/controllers/ai.controller.ts b/apps/api/src/controllers/ai.controller.ts index 6162d0362..02ede1cee 100644 --- a/apps/api/src/controllers/ai.controller.ts +++ b/apps/api/src/controllers/ai.controller.ts @@ -9,7 +9,7 @@ import { } from '@/utils/ai-tools'; import { HttpError } from '@/utils/errors'; import { db, getOrganizationByProjectIdCached } from '@openpanel/db'; -import { getProjectAccessCached } from '@openpanel/trpc/src/access'; +import { getProjectAccess } from '@openpanel/trpc/src/access'; import { type Message, appendResponseMessages, streamText } from 'ai'; import type { FastifyReply, FastifyRequest } from 'fastify'; @@ -37,7 +37,7 @@ export async function chat( } const organization = await getOrganizationByProjectIdCached(projectId); - const access = await getProjectAccessCached({ + const access = await getProjectAccess({ projectId, userId: session.userId, }); diff --git a/apps/api/src/controllers/webhook.controller.ts b/apps/api/src/controllers/webhook.controller.ts index c2e1c4321..b164c9c3e 100644 --- a/apps/api/src/controllers/webhook.controller.ts +++ b/apps/api/src/controllers/webhook.controller.ts @@ -113,6 +113,17 @@ export async function slackWebhook( } } +async function clearOrganizationCache(organizationId: string) { + const projects = await db.project.findMany({ + where: { + organizationId, + }, + }); + for (const project of projects) { + await getOrganizationByProjectIdCached.clear(project.id); + } +} + export async function polarWebhook( request: FastifyRequest<{ Querystring: unknown; @@ -141,8 +152,11 @@ export async function polarWebhook( }, data: { subscriptionPeriodEventsCount: 0, + subscriptionPeriodEventsCountExceededAt: null, }, }); + + await clearOrganizationCache(metadata.organizationId); } break; } @@ -205,15 +219,7 @@ export async function polarWebhook( }, }); - const projects = await db.project.findMany({ - where: { - organizationId: metadata.organizationId, - }, - }); - - for (const project of projects) { - await getOrganizationByProjectIdCached.clear(project.id); - } + await clearOrganizationCache(metadata.organizationId); await publishEvent('organization', 'subscription_updated', { organizationId: metadata.organizationId, diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index cbf28ee32..a6a483c3d 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -8,14 +8,18 @@ import Fastify from 'fastify'; import metricsPlugin from 'fastify-metrics'; import { generateId } from '@openpanel/common'; -import type { IServiceClientWithProject } from '@openpanel/db'; -import { getRedisPub } from '@openpanel/redis'; +import { + type IServiceClientWithProject, + runWithAlsSession, +} from '@openpanel/db'; +import { getCache, getRedisPub } from '@openpanel/redis'; import type { AppRouter } from '@openpanel/trpc'; import { appRouter, createContext } from '@openpanel/trpc'; import { EMPTY_SESSION, type SessionValidationResult, + decodeSessionToken, validateSessionToken, } from '@openpanel/auth'; import sourceMapSupport from 'source-map-support'; @@ -140,7 +144,12 @@ const startServer = async () => { instance.addHook('onRequest', async (req) => { if (req.cookies?.session) { try { - const session = await validateSessionToken(req.cookies.session); + const sessionId = await decodeSessionToken(req.cookies.session); + const session = await runWithAlsSession(sessionId, () => + getCache(`validateSession:${sessionId}`, 60 * 5, async () => + validateSessionToken(req.cookies.session), + ), + ); if (session.session) { req.session = session; } diff --git a/apps/start/src/hooks/use-session-extension.ts b/apps/start/src/hooks/use-session-extension.ts index fa33e48b8..c27163afb 100644 --- a/apps/start/src/hooks/use-session-extension.ts +++ b/apps/start/src/hooks/use-session-extension.ts @@ -24,9 +24,11 @@ export function useSessionExtension() { 1000 * 60 * 5, ); - extendSessionFn(); + // Delay initial call a bit to prioritize other requests + const timer = setTimeout(() => extendSessionFn(), 5000); return () => { + clearTimeout(timer); if (intervalRef.current) { clearInterval(intervalRef.current); } diff --git a/packages/auth/src/session.ts b/packages/auth/src/session.ts index 359b3f996..d31b4a476 100644 --- a/packages/auth/src/session.ts +++ b/packages/auth/src/session.ts @@ -59,8 +59,12 @@ export async function createDemoSession( }; } +export const decodeSessionToken = (token: string): string => { + return encodeHexLowerCase(sha256(new TextEncoder().encode(token))); +}; + export async function validateSessionToken( - token: string | null, + token: string | null | undefined, ): Promise { if (process.env.DEMO_USER_ID) { return createDemoSession(process.env.DEMO_USER_ID); @@ -69,7 +73,7 @@ export async function validateSessionToken( if (!token) { return EMPTY_SESSION; } - const sessionId = encodeHexLowerCase(sha256(new TextEncoder().encode(token))); + const sessionId = decodeSessionToken(token); const result = await db.$primary().session.findUnique({ where: { id: sessionId, diff --git a/packages/db/index.ts b/packages/db/index.ts index 623eb6ee1..522f12998 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -23,3 +23,4 @@ export * from './src/buffers'; export * from './src/types'; export * from './src/clickhouse/query-builder'; export * from './src/services/overview.service'; +export * from './src/session-context'; diff --git a/packages/db/src/logger.ts b/packages/db/src/logger.ts new file mode 100644 index 000000000..6e023bf6a --- /dev/null +++ b/packages/db/src/logger.ts @@ -0,0 +1,3 @@ +import { createLogger } from '@openpanel/logger'; + +export const logger = createLogger({ name: 'db:prisma' }); diff --git a/packages/db/src/prisma-client.ts b/packages/db/src/prisma-client.ts index 6943e4ec3..97a2136cd 100644 --- a/packages/db/src/prisma-client.ts +++ b/packages/db/src/prisma-client.ts @@ -1,11 +1,15 @@ import { createLogger } from '@openpanel/logger'; import { readReplicas } from '@prisma/extension-read-replicas'; -import { type Organization, PrismaClient } from './generated/prisma/client'; +import { + type Organization, + Prisma, + PrismaClient, +} from './generated/prisma/client'; +import { logger } from './logger'; +import { sessionConsistency } from './session-consistency'; export * from './generated/prisma/client'; -const logger = createLogger({ name: 'db' }); - const isWillBeCanceled = ( organization: Pick< Organization, @@ -30,11 +34,6 @@ const getPrismaClient = () => { const prisma = new PrismaClient({ log: ['error'], }) - .$extends( - readReplicas({ - url: process.env.DATABASE_URL_REPLICA ?? process.env.DATABASE_URL!, - }), - ) .$extends({ query: { async $allOperations({ operation, model, args, query }) { @@ -53,6 +52,8 @@ const getPrismaClient = () => { }, }, }) + + .$extends(sessionConsistency()) .$extends({ result: { organization: { @@ -258,7 +259,12 @@ const getPrismaClient = () => { }, }, }, - }); + }) + .$extends( + readReplicas({ + url: process.env.DATABASE_URL_REPLICA ?? process.env.DATABASE_URL!, + }), + ); return prisma; }; diff --git a/packages/db/src/services/organization.service.ts b/packages/db/src/services/organization.service.ts index e20c6185b..5a3732eef 100644 --- a/packages/db/src/services/organization.service.ts +++ b/packages/db/src/services/organization.service.ts @@ -61,7 +61,7 @@ export async function getOrganizationByProjectId(projectId: string) { export const getOrganizationByProjectIdCached = cacheable( getOrganizationByProjectId, - 60 * 60 * 24, + 60 * 5, ); export async function getInvites(organizationId: string) { diff --git a/packages/db/src/session-consistency.ts b/packages/db/src/session-consistency.ts new file mode 100644 index 000000000..5a3c4467c --- /dev/null +++ b/packages/db/src/session-consistency.ts @@ -0,0 +1,216 @@ +import { getRedisCache } from '@openpanel/redis'; +import type { Operation } from '@prisma/client/runtime/client'; +import { Prisma, type PrismaClient } from './generated/prisma/client'; +import { logger } from './logger'; +import { getAlsSessionId } from './session-context'; + +// WAL LSN tracking for read-after-write consistency +const LSN_CACHE_PREFIX = 'db:wal_lsn:'; +const LSN_CACHE_TTL = 5; +const MAX_RETRY_ATTEMPTS = 5; +const INITIAL_RETRY_DELAY_MS = 10; + +const READ_OPERATIONS: Operation[] = [ + 'findUnique', + 'findUniqueOrThrow', + 'findFirst', + 'findFirstOrThrow', + 'findMany', + 'aggregate', + 'groupBy', + 'count', +]; + +const WRITE_OPERATIONS: Operation[] = [ + 'create', + 'update', + 'delete', + 'createMany', + 'createManyAndReturn', + 'updateMany', + 'deleteMany', + 'upsert', +]; + +const isWriteOperation = (operation: string) => + WRITE_OPERATIONS.includes(operation as Operation); + +const isReadOperation = (operation: string) => + READ_OPERATIONS.includes(operation as Operation); + +async function getCurrentWalLsn( + prismaClient: PrismaClient, +): Promise { + try { + const result = await prismaClient.$queryRaw<[{ lsn: string }]>` + SELECT pg_current_wal_lsn()::text AS lsn + `; + return result[0]?.lsn || null; + } catch (error) { + logger.error('Failed to get WAL LSN', { error }); + return null; + } +} + +async function cacheWalLsnForSession( + sessionId: string, + lsn: string, +): Promise { + try { + const redis = getRedisCache(); + await redis.setex(`${LSN_CACHE_PREFIX}${sessionId}`, LSN_CACHE_TTL, lsn); + } catch (error) { + logger.error('Failed to cache WAL LSN', { error, sessionId }); + } +} + +async function getCachedWalLsn(sessionId: string): Promise { + try { + const redis = getRedisCache(); + return await redis.get(`${LSN_CACHE_PREFIX}${sessionId}`); + } catch (error) { + logger.error('Failed to get cached WAL LSN', { error, sessionId }); + return null; + } +} + +function compareWalLsn(lsn1: string, lsn2: string): number { + const [x1, y1] = lsn1.split('/').map((x) => BigInt(`0x${x}`)); + const [x2, y2] = lsn2.split('/').map((x) => BigInt(`0x${x}`)); + + const v1 = ((x1 ?? 0n) << 32n) + (y1 ?? 0n); + const v2 = ((x2 ?? 0n) << 32n) + (y2 ?? 0n); + + if (v1 < v2) return -1; + if (v1 > v2) return 1; + return 0; +} + +async function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function waitForReplicaCatchup( + prismaClient: PrismaClient, + sessionId: string, +): Promise { + const expectedLsn = await getCachedWalLsn(sessionId); + if (!expectedLsn) { + return true; + } + + for (let attempt = 0; attempt < MAX_RETRY_ATTEMPTS; attempt++) { + const currentLsn = await getCurrentWalLsn(prismaClient); + if (!currentLsn) { + return true; + } + + // Check if replica has caught up (current >= expected) + if (compareWalLsn(currentLsn, expectedLsn) >= 0) { + logger.debug('Replica caught up', { + attempt: attempt + 1, + currentLsn, + expectedLsn, + sessionId, + }); + return true; + } + + // Exponential backoff + if (attempt < MAX_RETRY_ATTEMPTS - 1) { + const delayMs = INITIAL_RETRY_DELAY_MS * 2 ** attempt; + logger.debug('Waiting for replica to catch up', { + attempt: attempt + 1, + delayMs, + currentLsn, + expectedLsn, + sessionId, + }); + await sleep(delayMs); + } + } + + logger.warn( + 'Replica did not catch up after max retries, falling back to primary', + { + sessionId, + expectedLsn, + }, + ); + return false; +} + +/** + * Prisma extension for session-based read-after-write consistency. + * + * This extension tracks WAL LSN positions after writes and ensures that + * subsequent reads within the same session see those writes, even when + * using read replicas. + * + * How it works: + * 1. After any write operation with a session ID, it captures the WAL LSN + * 2. Before read operations with a session ID, it checks if the replica has caught up + * 3. If the replica hasn't caught up after retries, it forces the read to the primary + * + */ +export function sessionConsistency() { + return Prisma.defineExtension({ + name: 'session-consistency', + query: { + $allOperations: async ({ + operation, + model, + args, + query, + // This is a hack to force reads to primary when replica hasn't caught up. + // The readReplicas extension routes queries to primary when in a transaction, + // so we set __internalParams.transaction = true to achieve this. + // @ts-expect-error - __internalParams is not in the types + __internalParams, + }) => { + const sessionId = getAlsSessionId(); + + // For write operations with session: cache WAL LSN after write + if (isWriteOperation(operation)) { + logger.info('Prisma operation', { + operation, + args, + model, + }); + + const result = await query(args); + + if (sessionId) { + // Get current WAL LSN and cache it for this session + // @ts-expect-error - 'this' refers to the Prisma client + const lsn = await getCurrentWalLsn(this); + if (lsn) { + await cacheWalLsnForSession(sessionId, lsn); + logger.debug('Cached WAL LSN after write', { + sessionId, + lsn, + operation, + model, + }); + } + } + + return result; + } + + // For read operations with session: try replica first, fallback to primary + if (isReadOperation(operation) && sessionId) { + // @ts-expect-error - 'this' refers to the Prisma client + const replicaCaughtUp = await waitForReplicaCatchup(this, sessionId); + + if (!replicaCaughtUp) { + // This will force readReplicas extension to use primary + __internalParams.transaction = true; + } + } + + return query(args); + }, + }, + }); +} diff --git a/packages/db/src/session-context.ts b/packages/db/src/session-context.ts new file mode 100644 index 000000000..275287bb3 --- /dev/null +++ b/packages/db/src/session-context.ts @@ -0,0 +1,12 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; + +type Ctx = { sessionId: string | null }; + +export const als = new AsyncLocalStorage(); + +export const runWithAlsSession = ( + sid: string | null | undefined, + fn: () => Promise, +) => als.run({ sessionId: sid || null }, fn); + +export const getAlsSessionId = () => als.getStore()?.sessionId ?? null; diff --git a/packages/redis/cachable.ts b/packages/redis/cachable.ts index ae59750da..55221d33f 100644 --- a/packages/redis/cachable.ts +++ b/packages/redis/cachable.ts @@ -1,5 +1,9 @@ import { getRedisCache } from './redis'; +export const deleteCache = async (key: string) => { + return getRedisCache().del(key); +}; + export async function getCache( key: string, expireInSec: number, diff --git a/packages/trpc/src/access.ts b/packages/trpc/src/access.ts index 0cbfefe19..64d9249ed 100644 --- a/packages/trpc/src/access.ts +++ b/packages/trpc/src/access.ts @@ -1,64 +1,67 @@ import { db, getProjectById } from '@openpanel/db'; import { cacheable } from '@openpanel/redis'; -export const getProjectAccessCached = cacheable(getProjectAccess, 60 * 5); -export async function getProjectAccess({ - userId, - projectId, -}: { - userId: string; - projectId: string; -}) { - try { - // Check if user has access to the project - const project = await getProjectById(projectId); - if (!project?.organizationId) { - return false; - } +export const getProjectAccess = cacheable( + 'getProjectAccess', + async ({ + userId, + projectId, + }: { + userId: string; + projectId: string; + }) => { + try { + // Check if user has access to the project + const project = await getProjectById(projectId); + if (!project?.organizationId) { + return false; + } - const [projectAccess, member] = await Promise.all([ - db.projectAccess.findMany({ - where: { - userId, - organizationId: project.organizationId, - }, - }), - db.member.findFirst({ - where: { - organizationId: project.organizationId, - userId, - }, - }), - ]); + const [projectAccess, member] = await Promise.all([ + db.$primary().projectAccess.findMany({ + where: { + userId, + organizationId: project.organizationId, + }, + }), + db.$primary().member.findFirst({ + where: { + organizationId: project.organizationId, + userId, + }, + }), + ]); - if (projectAccess.length === 0 && member) { - return true; - } + if (projectAccess.length === 0 && member) { + return true; + } - return projectAccess.find((item) => item.projectId === projectId); - } catch (err) { - return false; - } -} + return projectAccess.find((item) => item.projectId === projectId); + } catch (err) { + return false; + } + }, + 60 * 5, +); -export const getOrganizationAccessCached = cacheable( - getOrganizationAccess, +export const getOrganizationAccess = cacheable( + 'getOrganizationAccess', + async ({ + userId, + organizationId, + }: { + userId: string; + organizationId: string; + }) => { + return db.$primary().member.findFirst({ + where: { + userId, + organizationId, + }, + }); + }, 60 * 5, ); -export async function getOrganizationAccess({ - userId, - organizationId, -}: { - userId: string; - organizationId: string; -}) { - return db.member.findFirst({ - where: { - userId, - organizationId, - }, - }); -} export const getClientAccessCached = cacheable(getClientAccess, 60 * 5); export async function getClientAccess({ diff --git a/packages/trpc/src/routers/auth.ts b/packages/trpc/src/routers/auth.ts index 5d139d3e4..be264d0d3 100644 --- a/packages/trpc/src/routers/auth.ts +++ b/packages/trpc/src/routers/auth.ts @@ -20,6 +20,7 @@ import { getUserAccount, } from '@openpanel/db'; import { sendEmail } from '@openpanel/email'; +import { deleteCache } from '@openpanel/redis'; import { zRequestResetPassword, zResetPassword, @@ -333,6 +334,7 @@ export const authRouter = createTRPCRouter({ const session = await validateSessionToken(token); if (session.session) { + await deleteCache(`validateSession:${session.session.id}`); // Re-set the cookie with updated expiration setSessionTokenCookie(ctx.setCookie, token, session.session.expiresAt); return { diff --git a/packages/trpc/src/routers/chart.ts b/packages/trpc/src/routers/chart.ts index c4438ac3b..bd4afa33e 100644 --- a/packages/trpc/src/routers/chart.ts +++ b/packages/trpc/src/routers/chart.ts @@ -32,7 +32,7 @@ import { differenceInWeeks, formatISO, } from 'date-fns'; -import { getProjectAccessCached } from '../access'; +import { getProjectAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { cacheMiddleware, @@ -367,7 +367,7 @@ export const chartRouter = createTRPCRouter({ .input(zChartInput) .query(async ({ input, ctx }) => { if (ctx.session.userId) { - const access = await getProjectAccessCached({ + const access = await getProjectAccess({ projectId: input.projectId, userId: ctx.session.userId, }); diff --git a/packages/trpc/src/routers/event.ts b/packages/trpc/src/routers/event.ts index d9f299bec..0506fe6c7 100644 --- a/packages/trpc/src/routers/event.ts +++ b/packages/trpc/src/routers/event.ts @@ -27,7 +27,7 @@ import { } from '@openpanel/validation'; import { clone } from 'ramda'; -import { getProjectAccessCached } from '../access'; +import { getProjectAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { createTRPCRouter, protectedProcedure, publicProcedure } from '../trpc'; @@ -266,7 +266,7 @@ export const eventRouter = createTRPCRouter({ ) .query(async ({ input: { projectId, cursor, limit }, ctx }) => { if (ctx.session.userId) { - const access = await getProjectAccessCached({ + const access = await getProjectAccess({ projectId, userId: ctx.session.userId, }); diff --git a/packages/trpc/src/routers/integration.ts b/packages/trpc/src/routers/integration.ts index d97be17bf..a25fc586a 100644 --- a/packages/trpc/src/routers/integration.ts +++ b/packages/trpc/src/routers/integration.ts @@ -9,7 +9,7 @@ import { zCreateSlackIntegration, zCreateWebhookIntegration, } from '@openpanel/validation'; -import { getOrganizationAccessCached } from '../access'; +import { getOrganizationAccess } from '../access'; import { TRPCAccessError } from '../errors'; import { createTRPCRouter, protectedProcedure } from '../trpc'; @@ -23,7 +23,7 @@ export const integrationRouter = createTRPCRouter({ }, }); - const access = await getOrganizationAccessCached({ + const access = await getOrganizationAccess({ userId: ctx.session.userId, organizationId: integration.organizationId, }); @@ -122,7 +122,7 @@ export const integrationRouter = createTRPCRouter({ }, }); - const access = await getOrganizationAccessCached({ + const access = await getOrganizationAccess({ userId: ctx.session.userId, organizationId: integration.organizationId, }); diff --git a/packages/trpc/src/trpc.ts b/packages/trpc/src/trpc.ts index a2080d68e..1ffd28364 100644 --- a/packages/trpc/src/trpc.ts +++ b/packages/trpc/src/trpc.ts @@ -4,18 +4,15 @@ import { has } from 'ramda'; import superjson from 'superjson'; import { ZodError } from 'zod'; -import { - COOKIE_OPTIONS, - EMPTY_SESSION, - validateSessionToken, -} from '@openpanel/auth'; -import { getCache, getRedisCache } from '@openpanel/redis'; +import { COOKIE_OPTIONS, type SessionValidationResult } from '@openpanel/auth'; +import { runWithAlsSession } from '@openpanel/db'; +import { getRedisCache } from '@openpanel/redis'; import type { ISetCookie } from '@openpanel/validation'; import { createTrpcRedisLimiter, defaultFingerPrint, } from '@trpc-limiter/redis'; -import { getOrganizationAccessCached, getProjectAccessCached } from './access'; +import { getOrganizationAccess, getProjectAccess } from './access'; import { TRPCAccessError } from './errors'; export const rateLimitMiddleware = ({ @@ -44,10 +41,6 @@ export async function createContext({ req, res }: CreateFastifyContextOptions) { }); }; - const session = cookies?.session - ? await validateSessionToken(cookies.session!) - : EMPTY_SESSION; - if (process.env.NODE_ENV !== 'production') { await new Promise((res) => setTimeout(() => res(1), Math.min(Math.random() * 500, 200)), @@ -57,7 +50,7 @@ export async function createContext({ req, res }: CreateFastifyContextOptions) { return { req, res, - session, + session: (req as any).session as SessionValidationResult, // we do not get types for `setCookie` from fastify // so define it here and be safe in routers setCookie, @@ -102,37 +95,39 @@ const enforceUserIsAuthed = t.middleware(async ({ ctx, next }) => { // Only used on protected routes const enforceAccess = t.middleware(async ({ ctx, next, type, getRawInput }) => { - const rawInput = await getRawInput(); - if (type === 'mutation' && process.env.DEMO_USER_ID) { - throw new TRPCError({ - code: 'UNAUTHORIZED', - message: 'You are not allowed to do this in demo mode', - }); - } + return runWithAlsSession(ctx.session.session?.id, async () => { + const rawInput = await getRawInput(); + if (type === 'mutation' && process.env.DEMO_USER_ID) { + throw new TRPCError({ + code: 'UNAUTHORIZED', + message: 'You are not allowed to do this in demo mode', + }); + } - if (has('projectId', rawInput)) { - const access = await getProjectAccessCached({ - userId: ctx.session.userId!, - projectId: rawInput.projectId as string, - }); + if (has('projectId', rawInput)) { + const access = await getProjectAccess({ + userId: ctx.session.userId!, + projectId: rawInput.projectId as string, + }); - if (!access) { - throw TRPCAccessError('You do not have access to this project'); + if (!access) { + throw TRPCAccessError('You do not have access to this project'); + } } - } - if (has('organizationId', rawInput)) { - const access = await getOrganizationAccessCached({ - userId: ctx.session.userId!, - organizationId: rawInput.organizationId as string, - }); + if (has('organizationId', rawInput)) { + const access = await getOrganizationAccess({ + userId: ctx.session.userId!, + organizationId: rawInput.organizationId as string, + }); - if (!access) { - throw TRPCAccessError('You do not have access to this organization'); + if (!access) { + throw TRPCAccessError('You do not have access to this organization'); + } } - } - return next(); + return next(); + }); }); export const createTRPCRouter = t.router; @@ -157,11 +152,21 @@ const loggerMiddleware = t.middleware( }, ); -export const publicProcedure = t.procedure.use(loggerMiddleware); +const sessionScopeMiddleware = t.middleware(async ({ ctx, next }) => { + const sessionId = ctx.session.session?.id ?? null; + return runWithAlsSession(sessionId, async () => { + return next(); + }); +}); + +export const publicProcedure = t.procedure + .use(loggerMiddleware) + .use(sessionScopeMiddleware); export const protectedProcedure = t.procedure .use(enforceUserIsAuthed) .use(enforceAccess) - .use(loggerMiddleware); + .use(loggerMiddleware) + .use(sessionScopeMiddleware); const middlewareMarker = 'middlewareMarker' as 'middlewareMarker' & { __brand: 'middlewareMarker'; From e473772a110fabeb488b3cbaa308a9225f5a0e51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Mon, 20 Oct 2025 09:56:23 +0200 Subject: [PATCH 2/4] fix: coderabbit comments --- apps/api/src/index.ts | 10 ++- packages/auth/src/session.ts | 9 +- packages/db/src/session-consistency.ts | 116 +++++++++++++------------ packages/trpc/src/routers/auth.ts | 1 + 4 files changed, 76 insertions(+), 60 deletions(-) diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index a6a483c3d..7592c9dea 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -144,11 +144,13 @@ const startServer = async () => { instance.addHook('onRequest', async (req) => { if (req.cookies?.session) { try { - const sessionId = await decodeSessionToken(req.cookies.session); + const sessionId = decodeSessionToken(req.cookies.session); const session = await runWithAlsSession(sessionId, () => - getCache(`validateSession:${sessionId}`, 60 * 5, async () => - validateSessionToken(req.cookies.session), - ), + sessionId + ? getCache(`validateSession:${sessionId}`, 60 * 5, async () => + validateSessionToken(req.cookies.session), + ) + : validateSessionToken(req.cookies.session), ); if (session.session) { req.session = session; diff --git a/packages/auth/src/session.ts b/packages/auth/src/session.ts index d31b4a476..3d4bcebc2 100644 --- a/packages/auth/src/session.ts +++ b/packages/auth/src/session.ts @@ -59,8 +59,10 @@ export async function createDemoSession( }; } -export const decodeSessionToken = (token: string): string => { - return encodeHexLowerCase(sha256(new TextEncoder().encode(token))); +export const decodeSessionToken = (token: string): string | null => { + return token + ? encodeHexLowerCase(sha256(new TextEncoder().encode(token))) + : null; }; export async function validateSessionToken( @@ -74,6 +76,9 @@ export async function validateSessionToken( return EMPTY_SESSION; } const sessionId = decodeSessionToken(token); + if (!sessionId) { + return EMPTY_SESSION; + } const result = await db.$primary().session.findUnique({ where: { id: sessionId, diff --git a/packages/db/src/session-consistency.ts b/packages/db/src/session-consistency.ts index 5a3c4467c..b833746a3 100644 --- a/packages/db/src/session-consistency.ts +++ b/packages/db/src/session-consistency.ts @@ -4,6 +4,10 @@ import { Prisma, type PrismaClient } from './generated/prisma/client'; import { logger } from './logger'; import { getAlsSessionId } from './session-context'; +type BarePrismaClient = { + $queryRaw: (query: TemplateStringsArray, ...args: unknown[]) => Promise; +}; + // WAL LSN tracking for read-after-write consistency const LSN_CACHE_PREFIX = 'db:wal_lsn:'; const LSN_CACHE_TTL = 5; @@ -39,7 +43,7 @@ const isReadOperation = (operation: string) => READ_OPERATIONS.includes(operation as Operation); async function getCurrentWalLsn( - prismaClient: PrismaClient, + prismaClient: BarePrismaClient, ): Promise { try { const result = await prismaClient.$queryRaw<[{ lsn: string }]>` @@ -91,10 +95,11 @@ async function sleep(ms: number): Promise { } async function waitForReplicaCatchup( - prismaClient: PrismaClient, + prismaClient: BarePrismaClient, sessionId: string, ): Promise { const expectedLsn = await getCachedWalLsn(sessionId); + if (!expectedLsn) { return true; } @@ -154,63 +159,66 @@ async function waitForReplicaCatchup( * */ export function sessionConsistency() { - return Prisma.defineExtension({ - name: 'session-consistency', - query: { - $allOperations: async ({ - operation, - model, - args, - query, - // This is a hack to force reads to primary when replica hasn't caught up. - // The readReplicas extension routes queries to primary when in a transaction, - // so we set __internalParams.transaction = true to achieve this. - // @ts-expect-error - __internalParams is not in the types - __internalParams, - }) => { - const sessionId = getAlsSessionId(); - - // For write operations with session: cache WAL LSN after write - if (isWriteOperation(operation)) { - logger.info('Prisma operation', { - operation, - args, - model, - }); - - const result = await query(args); - - if (sessionId) { - // Get current WAL LSN and cache it for this session - // @ts-expect-error - 'this' refers to the Prisma client - const lsn = await getCurrentWalLsn(this); - if (lsn) { - await cacheWalLsnForSession(sessionId, lsn); - logger.debug('Cached WAL LSN after write', { - sessionId, - lsn, - operation, - model, - }); + return Prisma.defineExtension((client) => + client.$extends({ + name: 'session-consistency', + query: { + $allOperations: async ({ + operation, + model, + args, + query, + // This is a hack to force reads to primary when replica hasn't caught up. + // The readReplicas extension routes queries to primary when in a transaction, + // so we set __internalParams.transaction = true to achieve this. + // @ts-expect-error - __internalParams is not in the types + __internalParams, + }) => { + const sessionId = getAlsSessionId(); + + // For write operations with session: cache WAL LSN after write + if (isWriteOperation(operation)) { + logger.info('Prisma operation', { + operation, + args, + model, + }); + + const result = await query(args); + + if (sessionId) { + // Get current WAL LSN and cache it for this session + const lsn = await getCurrentWalLsn(client); + if (lsn) { + await cacheWalLsnForSession(sessionId, lsn); + logger.debug('Cached WAL LSN after write', { + sessionId, + lsn, + operation, + model, + }); + } } - } - return result; - } + return result; + } - // For read operations with session: try replica first, fallback to primary - if (isReadOperation(operation) && sessionId) { - // @ts-expect-error - 'this' refers to the Prisma client - const replicaCaughtUp = await waitForReplicaCatchup(this, sessionId); + // For read operations with session: try replica first, fallback to primary + if (isReadOperation(operation) && sessionId) { + const replicaCaughtUp = await waitForReplicaCatchup( + client, + sessionId, + ); - if (!replicaCaughtUp) { - // This will force readReplicas extension to use primary - __internalParams.transaction = true; + if (!replicaCaughtUp) { + // This will force readReplicas extension to use primary + __internalParams.transaction = true; + } } - } - return query(args); + return query(args); + }, }, - }, - }); + }), + ); } diff --git a/packages/trpc/src/routers/auth.ts b/packages/trpc/src/routers/auth.ts index be264d0d3..ea4d7f8d7 100644 --- a/packages/trpc/src/routers/auth.ts +++ b/packages/trpc/src/routers/auth.ts @@ -75,6 +75,7 @@ export const authRouter = createTRPCRouter({ deleteSessionTokenCookie(ctx.setCookie); if (ctx.session?.session?.id) { await invalidateSession(ctx.session.session.id); + await deleteCache(`validateSession:${ctx.session.session.id}`); } }), signInOAuth: publicProcedure From cdca52156ae243f7b3d85598da5cef4cccdbed0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 31 Oct 2025 08:01:18 +0100 Subject: [PATCH 3/4] fix: clear cache on invite --- packages/db/index.ts | 1 + packages/db/src/services/access.service.ts | 96 +++++++++++++++++ .../db/src/services/organization.service.ts | 9 +- packages/trpc/src/access.ts | 101 +----------------- 4 files changed, 110 insertions(+), 97 deletions(-) create mode 100644 packages/db/src/services/access.service.ts diff --git a/packages/db/index.ts b/packages/db/index.ts index 522f12998..b0aef4e3d 100644 --- a/packages/db/index.ts +++ b/packages/db/index.ts @@ -19,6 +19,7 @@ export * from './src/services/reference.service'; export * from './src/services/id.service'; export * from './src/services/retention.service'; export * from './src/services/notification.service'; +export * from './src/services/access.service'; export * from './src/buffers'; export * from './src/types'; export * from './src/clickhouse/query-builder'; diff --git a/packages/db/src/services/access.service.ts b/packages/db/src/services/access.service.ts new file mode 100644 index 000000000..0b66ae64d --- /dev/null +++ b/packages/db/src/services/access.service.ts @@ -0,0 +1,96 @@ +import { cacheable } from '@openpanel/redis'; +import { db } from '../prisma-client'; +import { getProjectById } from './project.service'; + +export const getProjectAccess = cacheable( + 'getProjectAccess', + async ({ + userId, + projectId, + }: { + userId: string; + projectId: string; + }) => { + try { + // Check if user has access to the project + const project = await getProjectById(projectId); + if (!project?.organizationId) { + return false; + } + + const [projectAccess, member] = await Promise.all([ + db.$primary().projectAccess.findMany({ + where: { + userId, + organizationId: project.organizationId, + }, + }), + db.$primary().member.findFirst({ + where: { + organizationId: project.organizationId, + userId, + }, + }), + ]); + + if (projectAccess.length === 0 && member) { + return true; + } + + return projectAccess.find((item) => item.projectId === projectId); + } catch (err) { + return false; + } + }, + 60 * 5, +); + +export const getOrganizationAccess = cacheable( + 'getOrganizationAccess', + async ({ + userId, + organizationId, + }: { + userId: string; + organizationId: string; + }) => { + return db.$primary().member.findFirst({ + where: { + userId, + organizationId, + }, + }); + }, + 60 * 5, +); + +export async function getClientAccess({ + userId, + clientId, +}: { + userId: string; + clientId: string; +}) { + const client = await db.client.findFirst({ + where: { + id: clientId, + }, + }); + + if (!client) { + return false; + } + + if (client.projectId) { + return getProjectAccess({ userId, projectId: client.projectId }); + } + + if (client.organizationId) { + return getOrganizationAccess({ + userId, + organizationId: client.organizationId, + }); + } + + return false; +} diff --git a/packages/db/src/services/organization.service.ts b/packages/db/src/services/organization.service.ts index 5a3732eef..e96a9003c 100644 --- a/packages/db/src/services/organization.service.ts +++ b/packages/db/src/services/organization.service.ts @@ -5,7 +5,8 @@ import { chQuery, formatClickhouseDate } from '../clickhouse/client'; import type { Invite, Prisma, ProjectAccess, User } from '../prisma-client'; import { db } from '../prisma-client'; import { createSqlBuilder } from '../sql-builder'; -import type { IServiceProject } from './project.service'; +import { getOrganizationAccess, getProjectAccess } from './access.service'; +import { type IServiceProject, getProjectById } from './project.service'; export type IServiceOrganization = Awaited< ReturnType >; @@ -168,8 +169,14 @@ export async function connectUserToOrganization({ }, }); + await getOrganizationAccess.clear({ + userId: user.id, + organizationId: invite.organizationId, + }); + if (invite.projectAccess.length > 0) { for (const projectId of invite.projectAccess) { + await getProjectAccess.clear({ userId: user.id, projectId }); await db.projectAccess.create({ data: { projectId, diff --git a/packages/trpc/src/access.ts b/packages/trpc/src/access.ts index 64d9249ed..29e24fd08 100644 --- a/packages/trpc/src/access.ts +++ b/packages/trpc/src/access.ts @@ -1,96 +1,5 @@ -import { db, getProjectById } from '@openpanel/db'; -import { cacheable } from '@openpanel/redis'; - -export const getProjectAccess = cacheable( - 'getProjectAccess', - async ({ - userId, - projectId, - }: { - userId: string; - projectId: string; - }) => { - try { - // Check if user has access to the project - const project = await getProjectById(projectId); - if (!project?.organizationId) { - return false; - } - - const [projectAccess, member] = await Promise.all([ - db.$primary().projectAccess.findMany({ - where: { - userId, - organizationId: project.organizationId, - }, - }), - db.$primary().member.findFirst({ - where: { - organizationId: project.organizationId, - userId, - }, - }), - ]); - - if (projectAccess.length === 0 && member) { - return true; - } - - return projectAccess.find((item) => item.projectId === projectId); - } catch (err) { - return false; - } - }, - 60 * 5, -); - -export const getOrganizationAccess = cacheable( - 'getOrganizationAccess', - async ({ - userId, - organizationId, - }: { - userId: string; - organizationId: string; - }) => { - return db.$primary().member.findFirst({ - where: { - userId, - organizationId, - }, - }); - }, - 60 * 5, -); - -export const getClientAccessCached = cacheable(getClientAccess, 60 * 5); -export async function getClientAccess({ - userId, - clientId, -}: { - userId: string; - clientId: string; -}) { - const client = await db.client.findFirst({ - where: { - id: clientId, - }, - }); - - if (!client) { - return false; - } - - if (client.projectId) { - return getProjectAccess({ userId, projectId: client.projectId }); - } - - if (client.organizationId) { - return getOrganizationAccess({ - userId, - organizationId: client.organizationId, - }); - } - - return false; -} +export { + getOrganizationAccess, + getProjectAccess, + getClientAccess, +} from '@openpanel/db'; From 286ba59c9e58644160bfb31624ba840d13325c02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl-Gerhard=20Lindesva=CC=88rd?= Date: Fri, 31 Oct 2025 09:39:23 +0100 Subject: [PATCH 4/4] fix: use primary after a read --- packages/db/src/session-consistency.ts | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/packages/db/src/session-consistency.ts b/packages/db/src/session-consistency.ts index b833746a3..68781c9d9 100644 --- a/packages/db/src/session-consistency.ts +++ b/packages/db/src/session-consistency.ts @@ -94,6 +94,8 @@ async function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } +// Method not used for now, +// Need a way to check LSN on the actual replica that will be used for the read. async function waitForReplicaCatchup( prismaClient: BarePrismaClient, sessionId: string, @@ -203,17 +205,16 @@ export function sessionConsistency() { return result; } - // For read operations with session: try replica first, fallback to primary - if (isReadOperation(operation) && sessionId) { - const replicaCaughtUp = await waitForReplicaCatchup( - client, - sessionId, - ); - - if (!replicaCaughtUp) { - // This will force readReplicas extension to use primary - __internalParams.transaction = true; - } + // For now, we just force the read to the primary without checking the replica + // Since the check probably goes to the primary anyways it will always be true, + // Not sure how to check LSN on the actual replica that will be used for the read. + if ( + isReadOperation(operation) && + sessionId && + (await getCachedWalLsn(sessionId)) + ) { + // This will force readReplicas extension to use primary + __internalParams.transaction = true; } return query(args);