diff --git a/apps/backend/.env.development b/apps/backend/.env.development index 9a6cbf4746..574ae52fd4 100644 --- a/apps/backend/.env.development +++ b/apps/backend/.env.development @@ -27,7 +27,8 @@ STACK_SPOTIFY_CLIENT_SECRET=MOCK STACK_ALLOW_SHARED_OAUTH_ACCESS_TOKENS=true STACK_DATABASE_CONNECTION_STRING=postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28/stackframe -STACK_DATABASE_REPLICA_CONNECTION_STRING=postgres://readonly:PASSWORD-PLACEHOLDER--readonlyuqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28/stackframe +STACK_DATABASE_REPLICA_CONNECTION_STRING=postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34/stackframe +STACK_DATABASE_REPLICATION_WAIT_STRATEGY=pg-stat-replication STACK_EMAIL_HOST=127.0.0.1 STACK_EMAIL_PORT=${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}29 diff --git a/apps/backend/src/app/page.tsx b/apps/backend/src/app/page.tsx index ec24a3b745..3c029a59ec 100644 --- a/apps/backend/src/app/page.tsx +++ b/apps/backend/src/app/page.tsx @@ -1,3 +1,4 @@ +import { getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env"; import Link from "next/link"; export default function Home() { @@ -10,6 +11,12 @@ export default function Home() { You can also return to https://stack-auth.com.

API v1
+ {getNodeEnvironment() === "development" && ( + <> +
+ Dev Stats
+ + )} ); } diff --git a/apps/backend/src/prisma-client.tsx b/apps/backend/src/prisma-client.tsx index 267c09c6ab..13e4c7c282 100644 --- a/apps/backend/src/prisma-client.tsx +++ b/apps/backend/src/prisma-client.tsx @@ -4,11 +4,12 @@ import { PrismaNeon } from "@prisma/adapter-neon"; import { PrismaPg } from '@prisma/adapter-pg'; import { readReplicas } from '@prisma/extension-read-replicas'; import { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema"; +import { yupObject, yupValidate } from "@stackframe/stack-shared/dist/schema-fields"; import { getEnvVariable, getNodeEnvironment } from '@stackframe/stack-shared/dist/utils/env'; -import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors"; +import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors"; import { globalVar } from "@stackframe/stack-shared/dist/utils/globals"; import { deepPlainEquals, filterUndefined, typedFromEntries, typedKeys } from "@stackframe/stack-shared/dist/utils/objects"; -import { concatStacktracesIfRejected, ignoreUnhandledRejection } from "@stackframe/stack-shared/dist/utils/promises"; +import { concatStacktracesIfRejected, ignoreUnhandledRejection, wait } from "@stackframe/stack-shared/dist/utils/promises"; import { throwingProxy } from "@stackframe/stack-shared/dist/utils/proxies"; import { Result } from "@stackframe/stack-shared/dist/utils/results"; import { traceSpan } from "@stackframe/stack-shared/dist/utils/telemetry"; @@ -144,10 +145,130 @@ export type PrismaClientWithReplica = Omi $replica: () => Omit, }; + +/** + * Waits until ALL replicas have caught up to the specified WAL LSN. + * This ensures read-after-write consistency when using read replicas. 
+ *
+ * Strategy types (STACK_DATABASE_REPLICATION_WAIT_STRATEGY):
+ * - "none": Don't wait for replication (default)
+ * - "pg-stat-replication": Use pg_stat_replication (for local dev with streaming replication)
+ * - "aurora": Use aurora_replica_status() (for AWS Aurora)
+ */
+async function waitForReplication(primary: PrismaClient, lsn: string): Promise<void> {
+  const strategy = getEnvVariable("STACK_DATABASE_REPLICATION_WAIT_STRATEGY", "none");
+  return await traceSpan({
+    description: 'waiting for replication',
+    attributes: {
+      'stack.db-replication.strategy': strategy,
+      'stack.db-replication.lsn': lsn,
+    },
+  }, async () => {
+    if (strategy === "none") {
+      return;
+    }
+
+    const minLsnSubquery = {
+      "pg-stat-replication": `(SELECT MIN(replay_lsn) FROM pg_stat_replication)`,
+      "aurora": `(SELECT MIN(current_read_lsn::pg_lsn) FROM aurora_replica_status())`,
+    }[strategy] ?? throwErr(`Unknown replication wait strategy: ${strategy}`);
+
+    // Validate LSN format (format: hex/hex, e.g., "0/1234ABC"). We do this just to be extra safe as we're using $queryRawUnsafe later
+    if (!/^[0-9A-Fa-f]+\/[0-9A-Fa-f]+$/.test(lsn)) {
+      throw new StackAssertionError(`Invalid LSN format: ${lsn}`);
+    }
+
+    // Poll until all replicas have caught up to the target LSN
+    // Using $queryRawUnsafe because DO blocks don't support bind parameters
+    await (primary as any).$queryRawUnsafe(`
+      DO $$
+      DECLARE
+        min_replica_lsn pg_lsn;
+      BEGIN
+        LOOP
+          SELECT ${minLsnSubquery} INTO min_replica_lsn;
+
+          -- Exit if no replicas connected or all replicas have caught up
+          IF min_replica_lsn IS NULL OR min_replica_lsn >= '${lsn}'::pg_lsn THEN
+            EXIT;
+          END IF;
+
+          -- Wait 10ms and check again
+          PERFORM pg_sleep(0.01);
+        END LOOP;
+      END $$;
+    `);
+  });
+}
+
+/**
+ * Extends a Prisma client to wait for replication after all operations.
+ * This ensures read-after-write consistency when using a read replica.
+ */
+function extendWithReplicationWait<T extends PrismaClient>(client: T): T {
+  const strategy = getEnvVariable("STACK_DATABASE_REPLICATION_WAIT_STRATEGY", "none");
+  if (strategy === "none") {
+    return client;
+  }
+
+  const readLsnAndWaitForReplication = async (client: PrismaClient) => {
+    await traceSpan({
+      description: 'getting current LSN and waiting for replication',
+      attributes: {
+        'stack.db-replication.strategy': strategy,
+      },
+    }, async (span) => {
+      try {
+        const [{ lsn }] = await (client as any).$queryRaw<[{ lsn: string }]>`SELECT pg_current_wal_lsn()::text AS lsn`;
+        await waitForReplication(client, lsn);
+      } catch (e) {
+        span.setAttribute('stack.db-replication.error', `${e}`);
+        captureError("prisma-client-replication-error", new StackAssertionError("Error getting current LSN and waiting for replication. We'll just wait 50ms instead, but please fix this as the replication may not be working.", { cause: e }));
+        await wait(50);
+      }
+    });
+  };
+
+  return client.$extends({
+    client: {
+      async $transaction(...args: Parameters<PrismaClient['$transaction']>) {
+        // eslint-disable-next-line no-restricted-syntax
+        const result = await client.$transaction(...args);
+        await readLsnAndWaitForReplication(client);
+        return result;
+      },
+    },
+    query: {
+      async $allOperations(params: { args: any, query: (args: any) => Promise<any>, operation: string, model?: string, __internalParams?: unknown }) {
+        const { args, query, operation, model } = params;
+
+        // __internalParams is an undocumented property, so let's validate that it fits our schema with yup first
+        const internalParamsSchema = yupObject({
+          transaction: yupObject().nullable(),
+        }).defined();
+        const internalParams = await yupValidate(internalParamsSchema, params.__internalParams);
+
+        if (internalParams.transaction) {
+          // we're inside a transaction, so we don't need to wait for replication
+          return await query(args);
+        }
+
+        const result = await query(args);
+        await readLsnAndWaitForReplication(client);
+        return result;
+      },
+    },
+  }) as T;
+}
+
 function
extendWithReadReplicas(client: T, replicaConnectionString: string): PrismaClientWithReplica { // Create a separate PrismaClient for the read replica const replicaClient = getPostgresPrismaClient(replicaConnectionString).client; - return client.$extends(readReplicas({ + + // First extend with replication wait, then with read replicas + const clientWithReplicationWait = extendWithReplicationWait(client); + + return clientWithReplicationWait.$extends(readReplicas({ replicas: [replicaClient], })) as PrismaClientWithReplica; } @@ -441,7 +562,6 @@ async function rawQueryArray[]>(tx: PrismaClientTransact const queryClient = allReadOnly && '$replica' in tx ? (tx as any).$replica() : tx; - // eslint-disable-next-line no-restricted-syntax -- $queryRaw is allowed here const rawResult = await queryClient.$queryRaw(sqlQuery); const postProcessed = combinedQuery.postProcess(rawResult as any); diff --git a/apps/dev-launchpad/public/index.html b/apps/dev-launchpad/public/index.html index f6b50db54c..38f9f0785f 100644 --- a/apps/dev-launchpad/public/index.html +++ b/apps/dev-launchpad/public/index.html @@ -122,6 +122,7 @@

Background services

const backgroundServices = [ { suffix: "28", label: "PostgreSQL" }, + { suffix: "34", label: "PostgreSQL Replica (15ms lag)" }, { suffix: "29", label: "Inbucket SMTP" }, { suffix: "30", label: "Inbucket POP3" }, { suffix: "31", label: "OTel collector" }, @@ -267,6 +268,15 @@

Background services

importance: 1, img: "https://pghero.dokkuapp.com/assets/pghero-88a0d052.png", }, + { + name: "PgHero (Replica)", + portSuffix: "35", + description: [ + "For replica database performance analysis", + ], + importance: 1, + img: "https://pghero.dokkuapp.com/assets/pghero-88a0d052.png", + }, { name: "PgAdmin", portSuffix: "17", diff --git a/apps/e2e/tests/backend/endpoints/api/v1/emails/email-queue.test.ts b/apps/e2e/tests/backend/endpoints/api/v1/emails/email-queue.test.ts index 47cce6cd18..58ec85ce8c 100644 --- a/apps/e2e/tests/backend/endpoints/api/v1/emails/email-queue.test.ts +++ b/apps/e2e/tests/backend/endpoints/api/v1/emails/email-queue.test.ts @@ -1,5 +1,5 @@ import { wait } from "@stackframe/stack-shared/dist/utils/promises"; -import { deindent } from "@stackframe/stack-shared/dist/utils/strings"; +import { deindent, nicify } from "@stackframe/stack-shared/dist/utils/strings"; import beautify from "js-beautify"; import { describe } from "vitest"; import { it, logIfTestFails } from "../../../../../helpers"; @@ -1689,9 +1689,10 @@ describe("email outbox pagination", () => { const draftId = createDraftResponse.body.id; // Create 5 users + const mailboxes = await Promise.all(Array.from({ length: 5 }, async () => await bumpEmailAddress())); const userIds: string[] = []; for (let i = 0; i < 5; i++) { - const email = `pagination-test-${i}@example.com`; + const email = mailboxes[i].emailAddress; const createUserResponse = await niceBackendFetch("/api/v1/users", { method: "POST", accessType: "server", @@ -1715,12 +1716,18 @@ describe("email outbox pagination", () => { }); expect(sendResponse.status).toBe(200); + // Wait until all emails are sent + for (const mailbox of mailboxes) { + await mailbox.waitForMessagesWithSubject("Pagination Test Email"); + } + + // Ensure there are 5 emails in the outbox const allResponse = await niceBackendFetch("/api/v1/emails/outbox", { method: "GET", accessType: "server", }); - logIfTestFails({ allResponse }); + 
logIfTestFails("allResponse", nicify(allResponse)); expect(allResponse.status).toBe(200); expect(allResponse.body.items.length).toBe(5); @@ -1729,7 +1736,7 @@ describe("email outbox pagination", () => { method: "GET", accessType: "server", }); - logIfTestFails({ page1Response }); + logIfTestFails("page1Response", nicify(page1Response)); expect(page1Response.status).toBe(200); expect(page1Response.body.items.length).toBe(2); expect(page1Response.body.is_paginated).toBe(true); @@ -1741,7 +1748,7 @@ describe("email outbox pagination", () => { method: "GET", accessType: "server", }); - logIfTestFails({ page2Response }); + logIfTestFails("page2Response", nicify(page2Response)); expect(page2Response.status).toBe(200); expect(page2Response.body.items.length).toBe(2); @@ -1758,7 +1765,7 @@ describe("email outbox pagination", () => { method: "GET", accessType: "server", }); - logIfTestFails({ page3Response }); + logIfTestFails("page3Response", nicify(page3Response)); expect(page3Response.status).toBe(200); expect(page3Response.body.items.length).toBe(1); // Only 1 remaining expect(page3Response.body.pagination.next_cursor).toBeNull(); // No more pages diff --git a/docker/dependencies/docker.compose.yaml b/docker/dependencies/docker.compose.yaml index 3764c78b9b..c409521336 100644 --- a/docker/dependencies/docker.compose.yaml +++ b/docker/dependencies/docker.compose.yaml @@ -19,6 +19,24 @@ services: cap_add: - NET_ADMIN # required for the fake latency during dev + # ================= PostgreSQL Replica (with replication lag) ================= + + db-replica: + build: ../dev-postgres-replica + environment: + PGDATA: /var/lib/postgresql/data + PRIMARY_HOST: db + PRIMARY_PORT: 5432 + REPLICATOR_USER: replicator + REPLICATOR_PASSWORD: PASSWORD-PLACEHOLDER--replicatorpass + RECOVERY_MIN_APPLY_DELAY: ${STACK_DATABASE_REPLICA_LAG_MS:-15}ms + ports: + - "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34:5432" + volumes: + - postgres-replica-data:/var/lib/postgresql/data + depends_on: + - db + 
# ================= PgHero ================= pghero: @@ -28,6 +46,17 @@ services: ports: - "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}16:8080" + # ================= PgHero (Replica) ================= + + pghero-replica: + image: ankane/pghero:latest + environment: + DATABASE_URL: postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@db-replica:5432/stackframe + ports: + - "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}35:8080" + depends_on: + - db-replica + # ================= PgAdmin ================= pgadmin: @@ -224,6 +253,7 @@ services: volumes: postgres-data: + postgres-replica-data: inbucket-data: svix-redis-data: svix-postgres-data: diff --git a/docker/dev-postgres-replica/Dockerfile b/docker/dev-postgres-replica/Dockerfile new file mode 100644 index 0000000000..4ed8808686 --- /dev/null +++ b/docker/dev-postgres-replica/Dockerfile @@ -0,0 +1,7 @@ +FROM postgres:15 + +# Copy the entrypoint script +COPY entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/docker/dev-postgres-replica/entrypoint.sh b/docker/dev-postgres-replica/entrypoint.sh new file mode 100644 index 0000000000..748c2ccd0c --- /dev/null +++ b/docker/dev-postgres-replica/entrypoint.sh @@ -0,0 +1,60 @@ +#!/bin/bash +set -e + +# Configuration from environment variables +PRIMARY_HOST="${PRIMARY_HOST:-db}" +PRIMARY_PORT="${PRIMARY_PORT:-5432}" +REPLICATOR_USER="${REPLICATOR_USER:-replicator}" +REPLICATOR_PASSWORD="${REPLICATOR_PASSWORD:-PASSWORD-PLACEHOLDER--replicatorpass}" +RECOVERY_MIN_APPLY_DELAY="${RECOVERY_MIN_APPLY_DELAY:-100ms}" + +echo "Starting PostgreSQL replica with ${RECOVERY_MIN_APPLY_DELAY} apply delay..." + +# Wait for primary to be ready +echo "Waiting for primary at ${PRIMARY_HOST}:${PRIMARY_PORT}..." +until PGPASSWORD="${REPLICATOR_PASSWORD}" pg_isready -h "${PRIMARY_HOST}" -p "${PRIMARY_PORT}" -U "${REPLICATOR_USER}" 2>/dev/null; do + echo "Primary not ready yet, waiting..." + sleep 2 +done +echo "Primary is ready!" 
+
+# If PGDATA is empty, do a base backup from primary
+if [ -z "$(ls -A ${PGDATA} 2>/dev/null)" ]; then
+  echo "PGDATA is empty, performing base backup from primary..."
+
+  # Perform base backup
+  PGPASSWORD="${REPLICATOR_PASSWORD}" pg_basebackup \
+    -h "${PRIMARY_HOST}" \
+    -p "${PRIMARY_PORT}" \
+    -U "${REPLICATOR_USER}" \
+    -D "${PGDATA}" \
+    -Fp \
+    -Xs \
+    -P \
+    -R
+
+  echo "Base backup completed!"
+
+  # Configure recovery settings with apply delay
+  # NOTE(review): the heredoc below was garbled in extraction — reconstructed; confirm against the original patch
+  cat >> "${PGDATA}/postgresql.auto.conf" <<EOF
+recovery_min_apply_delay = '${RECOVERY_MIN_APPLY_DELAY}'
+EOF
+fi
+
+# Hand off to the stock postgres entrypoint (runs as a hot standby; standby.signal was created by pg_basebackup -R)
+exec docker-entrypoint.sh postgres
diff --git a/docker/dev-postgres/Dockerfile b/docker/dev-postgres/Dockerfile
--- a/docker/dev-postgres/Dockerfile
+++ b/docker/dev-postgres/Dockerfile
@@ @@
 RUN echo "GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly;" >> /docker-entrypoint-initdb.d/init.sql
 RUN echo "ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO readonly;" >> /docker-entrypoint-initdb.d/init.sql
+# Create a replication user for streaming replication to the replica
+RUN echo "CREATE USER replicator WITH REPLICATION PASSWORD 'PASSWORD-PLACEHOLDER--replicatorpass';" >> /docker-entrypoint-initdb.d/init.sql
+
+# Create a script to add replication permissions to pg_hba.conf after init
+# This script runs after the database is initialized but before it starts accepting connections
+RUN echo '#!/bin/bash' > /docker-entrypoint-initdb.d/00-setup-replication.sh && \
+    echo 'echo "host replication replicator all scram-sha-256" >> "$PGDATA/pg_hba.conf"' >> /docker-entrypoint-initdb.d/00-setup-replication.sh && \
+    chmod +x /docker-entrypoint-initdb.d/00-setup-replication.sh
+
 # Add args to Postgres entrypoint
 ENTRYPOINT ["sh", "-c", "\
 # Add delay if POSTGRES_DELAY_MS is set \
@@ -35,6 +44,12 @@
     apt-get update && apt-get install -y iproute2 && tc qdisc add dev eth0 root netem delay ${POSTGRES_DELAY_MS}ms; \
   fi; \
   \
-  # Start Postgres with extensions enabled \
-  exec docker-entrypoint.sh postgres -c shared_preload_libraries='pg_stat_statements' -c pg_stat_statements.track=all \
+  # Start Postgres with replication enabled and extensions \
+  exec docker-entrypoint.sh postgres \
+    -c
shared_preload_libraries='pg_stat_statements' \ + -c pg_stat_statements.track=all \ + -c wal_level=replica \ + -c max_wal_senders=3 \ + -c wal_keep_size=64MB \ + -c hot_standby=on \ "] diff --git a/package.json b/package.json index 474d91eceb..b74b914bd1 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,7 @@ "codegen:backend": "pnpm pre && turbo run codegen --filter=@stackframe/stack-backend...", "deps-compose": "docker compose -p stack-dependencies-${NEXT_PUBLIC_STACK_PORT_PREFIX:-81} -f docker/dependencies/docker.compose.yaml", "stop-deps": "POSTGRES_DELAY_MS=0 pnpm run deps-compose kill && POSTGRES_DELAY_MS=0 pnpm run deps-compose down -v", - "wait-until-postgres-is-ready:pg_isready": "until pg_isready -h localhost -p ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28; do sleep 1; done", + "wait-until-postgres-is-ready:pg_isready": "until pg_isready -h localhost -p ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28 && pg_isready -h localhost -p ${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34; do sleep 1; done", "wait-until-postgres-is-ready": "command -v pg_isready >/dev/null 2>&1 && pnpm run wait-until-postgres-is-ready:pg_isready || sleep 10 # not everyone has pg_isready installed, so we fallback to sleeping", "start-deps:no-delay": "pnpm pre && pnpm run deps-compose up --detach --build && pnpm run wait-until-postgres-is-ready && pnpm run db:init && echo \"\\nDependencies started in the background as Docker containers. 'pnpm run stop-deps' to stop them\"n", "start-deps": "POSTGRES_DELAY_MS=${POSTGRES_DELAY_MS:-0} pnpm run start-deps:no-delay",