Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/backend/.env.development
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ STACK_SPOTIFY_CLIENT_SECRET=MOCK
STACK_ALLOW_SHARED_OAUTH_ACCESS_TOKENS=true

STACK_DATABASE_CONNECTION_STRING=postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28/stackframe
STACK_DATABASE_REPLICA_CONNECTION_STRING=postgres://readonly:PASSWORD-PLACEHOLDER--readonlyuqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}28/stackframe
STACK_DATABASE_REPLICA_CONNECTION_STRING=postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@localhost:${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34/stackframe
STACK_DATABASE_REPLICATION_WAIT_STRATEGY=pg-stat-replication
Comment thread
N2D4 marked this conversation as resolved.

STACK_EMAIL_HOST=127.0.0.1
STACK_EMAIL_PORT=${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}29
Expand Down
7 changes: 7 additions & 0 deletions apps/backend/src/app/page.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { getNodeEnvironment } from "@stackframe/stack-shared/dist/utils/env";
import Link from "next/link";

export default function Home() {
Expand All @@ -10,6 +11,12 @@ export default function Home() {
You can also return to <Link href="https://stack-auth.com">https://stack-auth.com</Link>.<br />
<br />
<Link href="/api/v1">API v1</Link><br />
{getNodeEnvironment() === "development" && (
<>
<br />
<Link href="/dev-stats">Dev Stats</Link><br />
</>
)}
</div>
);
}
128 changes: 124 additions & 4 deletions apps/backend/src/prisma-client.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { PrismaNeon } from "@prisma/adapter-neon";
import { PrismaPg } from '@prisma/adapter-pg';
import { readReplicas } from '@prisma/extension-read-replicas';
import { CompleteConfig } from "@stackframe/stack-shared/dist/config/schema";
import { yupObject, yupValidate } from "@stackframe/stack-shared/dist/schema-fields";
import { getEnvVariable, getNodeEnvironment } from '@stackframe/stack-shared/dist/utils/env';
import { StackAssertionError } from "@stackframe/stack-shared/dist/utils/errors";
import { captureError, StackAssertionError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
import { globalVar } from "@stackframe/stack-shared/dist/utils/globals";
import { deepPlainEquals, filterUndefined, typedFromEntries, typedKeys } from "@stackframe/stack-shared/dist/utils/objects";
import { concatStacktracesIfRejected, ignoreUnhandledRejection } from "@stackframe/stack-shared/dist/utils/promises";
import { concatStacktracesIfRejected, ignoreUnhandledRejection, wait } from "@stackframe/stack-shared/dist/utils/promises";
import { throwingProxy } from "@stackframe/stack-shared/dist/utils/proxies";
import { Result } from "@stackframe/stack-shared/dist/utils/results";
import { traceSpan } from "@stackframe/stack-shared/dist/utils/telemetry";
Expand Down Expand Up @@ -144,10 +145,130 @@ export type PrismaClientWithReplica<T extends PrismaClient = PrismaClient> = Omi
$replica: () => Omit<T, "$on">,
};


/**
* Waits until ALL replicas have caught up to the specified WAL LSN.
* This ensures read-after-write consistency when using read replicas.
*
* Strategy types (STACK_DATABASE_REPLICATION_WAIT_STRATEGY):
* - "none": Don't wait for replication (default)
* - "pg-stat-replication": Use pg_stat_replication (for local dev with streaming replication)
Comment thread
N2D4 marked this conversation as resolved.
* - "aurora": Use aurora_replica_status() (for AWS Aurora)
*/
async function waitForReplication(primary: PrismaClient, lsn: string): Promise<void> {
const strategy = getEnvVariable("STACK_DATABASE_REPLICATION_WAIT_STRATEGY", "none");
return await traceSpan({
description: 'waiting for replication',
attributes: {
'stack.db-replication.strategy': strategy,
'stack.db-replication.lsn': lsn,
},
}, async () => {
if (strategy === "none") {
return;
}

const minLsnSubquery = {
"pg-stat-replication": `(SELECT MIN(replay_lsn) FROM pg_stat_replication)`,
"aurora": `(SELECT MIN(current_read_lsn::pg_lsn) FROM aurora_replica_status())`,
}[strategy] ?? throwErr(`Unknown replication wait strategy: ${strategy}`);

// Validate LSN format (format: hex/hex, e.g., "0/1234ABC"). We do this just to be extra safe as we're using $queryRawUnsafe later
if (!/^[0-9A-Fa-f]+\/[0-9A-Fa-f]+$/.test(lsn)) {
throw new StackAssertionError(`Invalid LSN format: ${lsn}`);
}

// Poll until all replicas have caught up to the target LSN
// Using $queryRawUnsafe because DO blocks don't support bind parameters
await (primary as any).$queryRawUnsafe(`
DO $$
DECLARE
min_replica_lsn pg_lsn;
BEGIN
LOOP
SELECT ${minLsnSubquery} INTO min_replica_lsn;
Comment thread
N2D4 marked this conversation as resolved.

-- Exit if no replicas connected or all replicas have caught up
IF min_replica_lsn IS NULL OR min_replica_lsn >= '${lsn}'::pg_lsn THEN
EXIT;
END IF;

-- Wait 10ms and check again
PERFORM pg_sleep(0.01);
END LOOP;
Comment thread
N2D4 marked this conversation as resolved.
END $$;
Comment thread
N2D4 marked this conversation as resolved.
`);
Comment thread
N2D4 marked this conversation as resolved.
Comment thread
N2D4 marked this conversation as resolved.
});
Comment thread
N2D4 marked this conversation as resolved.
}

/**
* Extends a Prisma client to wait for replication after all operations.
* This ensures read-after-write consistency when using a read replica.
*/
function extendWithReplicationWait<T extends PrismaClient>(client: T): T {
const strategy = getEnvVariable("STACK_DATABASE_REPLICATION_WAIT_STRATEGY", "none");
if (strategy === "none") {
return client;
}

const readLsnAndWaitForReplication = async (client: PrismaClient) => {
await traceSpan({
description: 'getting current LSN and waiting for replication',
attributes: {
'stack.db-replication.strategy': strategy,
},
}, async (span) => {
try {
const [{ lsn }] = await (client as any).$queryRaw<[{ lsn: string }]>`SELECT pg_current_wal_lsn()::text AS lsn`;
await waitForReplication(client, lsn);
} catch (e) {
span.setAttribute('stack.db-replication.error', `${e}`);
captureError("prisma-client-replication-error", new StackAssertionError("Error getting current LSN and waiting for replication. We'll just wait 50ms instead, but please fix this as the replication may not be working.", { cause: e }));
await wait(50);
}
});
};

return client.$extends({
client: {
async $transaction(...args: Parameters<PrismaClient['$transaction']>) {
// eslint-disable-next-line no-restricted-syntax
const result = await client.$transaction(...args);
await readLsnAndWaitForReplication(client);
return result;
},
Comment thread
cursor[bot] marked this conversation as resolved.
},
query: {
async $allOperations(params: { args: any, query: (args: any) => Promise<any>, operation: string, model?: string, __internalParams?: unknown }) {
const { args, query, operation, model } = params;
Comment thread
N2D4 marked this conversation as resolved.
Comment thread
N2D4 marked this conversation as resolved.

// __internalParams is an undocumented property, so let's validate that it fits our schema with yup first
const internalParamsSchema = yupObject({
transaction: yupObject().nullable(),
}).defined();
const internalParams = await yupValidate(internalParamsSchema, params.__internalParams);
Comment thread
N2D4 marked this conversation as resolved.
Comment thread
N2D4 marked this conversation as resolved.

if (internalParams.transaction) {
Comment thread
N2D4 marked this conversation as resolved.
Comment thread
N2D4 marked this conversation as resolved.
// we're inside a transaction, so we don't need to wait for replication
return await query(args);
}

const result = await query(args);
await readLsnAndWaitForReplication(client);
return result;
},
Comment thread
N2D4 marked this conversation as resolved.
},
}) as T;
Comment thread
N2D4 marked this conversation as resolved.
}

function extendWithReadReplicas<T extends PrismaClient>(client: T, replicaConnectionString: string): PrismaClientWithReplica<T> {
// Create a separate PrismaClient for the read replica
const replicaClient = getPostgresPrismaClient(replicaConnectionString).client;
return client.$extends(readReplicas({

// First extend with replication wait, then with read replicas
const clientWithReplicationWait = extendWithReplicationWait(client);

return clientWithReplicationWait.$extends(readReplicas({
replicas: [replicaClient],
})) as PrismaClientWithReplica<T>;
}
Expand Down Expand Up @@ -441,7 +562,6 @@ async function rawQueryArray<Q extends RawQuery<any>[]>(tx: PrismaClientTransact
const queryClient = allReadOnly && '$replica' in tx
? (tx as any).$replica()
: tx;
// eslint-disable-next-line no-restricted-syntax -- $queryRaw is allowed here
const rawResult = await queryClient.$queryRaw(sqlQuery);

const postProcessed = combinedQuery.postProcess(rawResult as any);
Expand Down
10 changes: 10 additions & 0 deletions apps/dev-launchpad/public/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ <h2 style="margin-top: 64px;">Background services</h2>

const backgroundServices = [
{ suffix: "28", label: "PostgreSQL" },
{ suffix: "34", label: "PostgreSQL Replica (15ms lag)" },
{ suffix: "29", label: "Inbucket SMTP" },
{ suffix: "30", label: "Inbucket POP3" },
{ suffix: "31", label: "OTel collector" },
Expand Down Expand Up @@ -267,6 +268,15 @@ <h2 style="margin-top: 64px;">Background services</h2>
importance: 1,
img: "https://pghero.dokkuapp.com/assets/pghero-88a0d052.png",
},
{
name: "PgHero (Replica)",
portSuffix: "35",
description: [
"For replica database performance analysis",
],
importance: 1,
img: "https://pghero.dokkuapp.com/assets/pghero-88a0d052.png",
},
{
name: "PgAdmin",
portSuffix: "17",
Expand Down
19 changes: 13 additions & 6 deletions apps/e2e/tests/backend/endpoints/api/v1/emails/email-queue.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { wait } from "@stackframe/stack-shared/dist/utils/promises";
import { deindent } from "@stackframe/stack-shared/dist/utils/strings";
import { deindent, nicify } from "@stackframe/stack-shared/dist/utils/strings";
import beautify from "js-beautify";
import { describe } from "vitest";
import { it, logIfTestFails } from "../../../../../helpers";
Expand Down Expand Up @@ -1689,9 +1689,10 @@ describe("email outbox pagination", () => {
const draftId = createDraftResponse.body.id;

// Create 5 users
const mailboxes = await Promise.all(Array.from({ length: 5 }, async () => await bumpEmailAddress()));
const userIds: string[] = [];
for (let i = 0; i < 5; i++) {
const email = `pagination-test-${i}@example.com`;
const email = mailboxes[i].emailAddress;
const createUserResponse = await niceBackendFetch("/api/v1/users", {
method: "POST",
accessType: "server",
Expand All @@ -1715,12 +1716,18 @@ describe("email outbox pagination", () => {
});
expect(sendResponse.status).toBe(200);

// Wait until all emails are sent
for (const mailbox of mailboxes) {
await mailbox.waitForMessagesWithSubject("Pagination Test Email");
}


// Ensure there are 5 emails in the outbox
const allResponse = await niceBackendFetch("/api/v1/emails/outbox", {
method: "GET",
accessType: "server",
});
logIfTestFails({ allResponse });
logIfTestFails("allResponse", nicify(allResponse));
expect(allResponse.status).toBe(200);
expect(allResponse.body.items.length).toBe(5);

Expand All @@ -1729,7 +1736,7 @@ describe("email outbox pagination", () => {
method: "GET",
accessType: "server",
});
logIfTestFails({ page1Response });
logIfTestFails("page1Response", nicify(page1Response));
expect(page1Response.status).toBe(200);
expect(page1Response.body.items.length).toBe(2);
expect(page1Response.body.is_paginated).toBe(true);
Expand All @@ -1741,7 +1748,7 @@ describe("email outbox pagination", () => {
method: "GET",
accessType: "server",
});
logIfTestFails({ page2Response });
logIfTestFails("page2Response", nicify(page2Response));
expect(page2Response.status).toBe(200);
expect(page2Response.body.items.length).toBe(2);

Expand All @@ -1758,7 +1765,7 @@ describe("email outbox pagination", () => {
method: "GET",
accessType: "server",
});
logIfTestFails({ page3Response });
logIfTestFails("page3Response", nicify(page3Response));
expect(page3Response.status).toBe(200);
expect(page3Response.body.items.length).toBe(1); // Only 1 remaining
expect(page3Response.body.pagination.next_cursor).toBeNull(); // No more pages
Expand Down
30 changes: 30 additions & 0 deletions docker/dependencies/docker.compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ services:
cap_add:
- NET_ADMIN # required for the fake latency during dev

# ================= PostgreSQL Replica (with replication lag) =================

db-replica:
build: ../dev-postgres-replica
environment:
PGDATA: /var/lib/postgresql/data
PRIMARY_HOST: db
PRIMARY_PORT: 5432
REPLICATOR_USER: replicator
REPLICATOR_PASSWORD: PASSWORD-PLACEHOLDER--replicatorpass
Comment thread
N2D4 marked this conversation as resolved.
RECOVERY_MIN_APPLY_DELAY: ${STACK_DATABASE_REPLICA_LAG_MS:-15}ms
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}34:5432"
volumes:
- postgres-replica-data:/var/lib/postgresql/data
depends_on:
- db
Comment thread
N2D4 marked this conversation as resolved.

# ================= PgHero =================

pghero:
Expand All @@ -28,6 +46,17 @@ services:
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}16:8080"

# ================= PgHero (Replica) =================

pghero-replica:
image: ankane/pghero:latest
environment:
DATABASE_URL: postgres://postgres:PASSWORD-PLACEHOLDER--uqfEC1hmmv@db-replica:5432/stackframe
ports:
- "${NEXT_PUBLIC_STACK_PORT_PREFIX:-81}35:8080"
depends_on:
- db-replica

# ================= PgAdmin =================

pgadmin:
Expand Down Expand Up @@ -224,6 +253,7 @@ services:

volumes:
postgres-data:
postgres-replica-data:
inbucket-data:
svix-redis-data:
svix-postgres-data:
Expand Down
7 changes: 7 additions & 0 deletions docker/dev-postgres-replica/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM postgres:15

# Copy the entrypoint script
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh

ENTRYPOINT ["/entrypoint.sh"]
60 changes: 60 additions & 0 deletions docker/dev-postgres-replica/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash
set -e

# Configuration from environment variables
PRIMARY_HOST="${PRIMARY_HOST:-db}"
PRIMARY_PORT="${PRIMARY_PORT:-5432}"
REPLICATOR_USER="${REPLICATOR_USER:-replicator}"
REPLICATOR_PASSWORD="${REPLICATOR_PASSWORD:-PASSWORD-PLACEHOLDER--replicatorpass}"
RECOVERY_MIN_APPLY_DELAY="${RECOVERY_MIN_APPLY_DELAY:-100ms}"

echo "Starting PostgreSQL replica with ${RECOVERY_MIN_APPLY_DELAY} apply delay..."

# Wait for primary to be ready
echo "Waiting for primary at ${PRIMARY_HOST}:${PRIMARY_PORT}..."
until PGPASSWORD="${REPLICATOR_PASSWORD}" pg_isready -h "${PRIMARY_HOST}" -p "${PRIMARY_PORT}" -U "${REPLICATOR_USER}" 2>/dev/null; do
echo "Primary not ready yet, waiting..."
sleep 2
done
echo "Primary is ready!"

# If PGDATA is empty, do a base backup from primary
if [ -z "$(ls -A ${PGDATA} 2>/dev/null)" ]; then
Comment thread
N2D4 marked this conversation as resolved.
echo "PGDATA is empty, performing base backup from primary..."

# Perform base backup
PGPASSWORD="${REPLICATOR_PASSWORD}" pg_basebackup \
-h "${PRIMARY_HOST}" \
-p "${PRIMARY_PORT}" \
-U "${REPLICATOR_USER}" \
-D "${PGDATA}" \
-Fp \
-Xs \
-P \
-R

echo "Base backup completed!"

# Configure recovery settings with apply delay
cat >> "${PGDATA}/postgresql.auto.conf" <<EOF

# Replica configuration
primary_conninfo = 'host=${PRIMARY_HOST} port=${PRIMARY_PORT} user=${REPLICATOR_USER} password=${REPLICATOR_PASSWORD}'
Comment thread
N2D4 marked this conversation as resolved.
Comment thread
N2D4 marked this conversation as resolved.
recovery_min_apply_delay = ${RECOVERY_MIN_APPLY_DELAY}
hot_standby = on
EOF

# Create standby.signal to indicate this is a standby
touch "${PGDATA}/standby.signal"

# Set proper permissions
chmod 700 "${PGDATA}"
chown -R postgres:postgres "${PGDATA}"

echo "Replica configured with ${RECOVERY_MIN_APPLY_DELAY} apply delay"
else
echo "PGDATA already initialized, starting replica..."
fi

# Start PostgreSQL
exec gosu postgres postgres
Loading
Loading