Skip to content

Commit 2454536

Browse files
committed
fix(paykit): harden listen webhook flow
1 parent 7befdf1 commit 2454536

7 files changed

Lines changed: 139 additions & 43 deletions

File tree

apps/wh/drizzle.config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import path from "node:path";
44
import { defineConfig } from "drizzle-kit";
55

66
const d1StateDir = path.resolve(process.cwd(), ".wrangler/state/v3/d1/miniflare-D1DatabaseObject");
7+
const isGenerateCommand = process.argv.includes("generate");
78

89
function resolveLocalSqliteFile(): string {
910
const files = fs
@@ -26,6 +27,8 @@ export default defineConfig({
2627
out: "./migrations",
2728
schema: "./src/db/schema.ts",
2829
dbCredentials: {
29-
url: resolveLocalSqliteFile(),
30+
get url() {
31+
return isGenerateCommand ? ":memory:" : resolveLocalSqliteFile();
32+
},
3033
},
3134
});

apps/wh/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"type": "module",
66
"scripts": {
77
"dev": "wrangler dev --local",
8-
"deploy": "if [ -z \"$PAYKIT_WEBHOOK_API_BASE_URL\" ]; then printf '%s\n' 'Missing PAYKIT_WEBHOOK_API_BASE_URL'; exit 1; fi; wrangler deploy --var PAYKIT_WEBHOOK_API_BASE_URL:$PAYKIT_WEBHOOK_API_BASE_URL",
8+
"deploy": "if [ -z \"$PAYKIT_WEBHOOK_API_BASE_URL\" ]; then printf '%s\n' 'Missing PAYKIT_WEBHOOK_API_BASE_URL'; exit 1; fi; wrangler deploy --var \"PAYKIT_WEBHOOK_API_BASE_URL:$PAYKIT_WEBHOOK_API_BASE_URL\"",
99
"db:migrate:local": "wrangler d1 migrations apply paykit-wh --local",
1010
"db:migrate:remote": "wrangler d1 migrations apply paykit-wh --remote",
1111
"db:studio": "drizzle-kit studio --config drizzle.config.ts",

apps/wh/src/index.ts

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,24 @@ async function getOwnedTunnel(params: {
9898
return rows[0] ?? null;
9999
}
100100

101+
function readNumberParam(value: string | undefined, fallback: number): number {
102+
if (value === undefined) {
103+
return fallback;
104+
}
105+
106+
const parsed = Number(value);
107+
return Number.isNaN(parsed) ? fallback : parsed;
108+
}
109+
110+
function readOptionalNumberParam(value: string | undefined): number | undefined {
111+
if (value === undefined) {
112+
return undefined;
113+
}
114+
115+
const parsed = Number(value);
116+
return Number.isNaN(parsed) ? undefined : parsed;
117+
}
118+
101119
function buildPullableDeliveryWhere(params: {
102120
includeFailedBefore?: number;
103121
retryWindowMs: number;
@@ -142,7 +160,9 @@ async function pruneDeliveries(params: {
142160
const maxDeliveries = getNumericVar(params.env.MAX_DELIVERIES_PER_TUNNEL, 5000);
143161
const cutoff = now() - retentionDays * 24 * 60 * 60 * 1000;
144162

145-
await params.db.delete(delivery).where(lt(delivery.receivedAt, cutoff));
163+
await params.db
164+
.delete(delivery)
165+
.where(and(eq(delivery.tunnelId, params.tunnelId), lt(delivery.receivedAt, cutoff)));
146166

147167
const rows = await params.db
148168
.select({ count: count() })
@@ -181,9 +201,11 @@ app.post("/api/tunnels/ensure", async (c) => {
181201
return c.text("providerId, providerAccountId, and environment are required", 400);
182202
}
183203

184-
const retryWindowMs = Math.max(0, Number(body.retryWindowMs ?? 0));
204+
const retryWindowMs = Math.max(0, readNumberParam(String(body.retryWindowMs ?? "0"), 0));
185205
const includeFailedBefore =
186-
typeof body.includeFailedBefore === "number" ? body.includeFailedBefore : undefined;
206+
typeof body.includeFailedBefore === "number" && !Number.isNaN(body.includeFailedBefore)
207+
? body.includeFailedBefore
208+
: undefined;
187209

188210
const createIfMissing = body.createIfMissing !== false;
189211
const existing = await db
@@ -265,9 +287,12 @@ app.get("/api/tunnels/:tunnelId/welcome", async (c) => {
265287
return c.text("Tunnel not found", 404);
266288
}
267289

268-
const retryWindowMs = Math.max(0, Number(c.req.query("retryWindowMs") ?? 0));
269-
const includeFailedBeforeRaw = c.req.query("includeFailedBefore");
270-
const includeFailedBefore = includeFailedBeforeRaw ? Number(includeFailedBeforeRaw) : undefined;
290+
if (current.status === "disabled") {
291+
return c.text("Tunnel disabled", 410);
292+
}
293+
294+
const retryWindowMs = Math.max(0, readNumberParam(c.req.query("retryWindowMs"), 0));
295+
const includeFailedBefore = readOptionalNumberParam(c.req.query("includeFailedBefore"));
271296

272297
return c.json({
273298
pendingCount: await getPullableCount(db, {
@@ -292,6 +317,10 @@ app.post("/api/tunnels/:tunnelId/provider-webhook", async (c) => {
292317
return c.text("Tunnel not found", 404);
293318
}
294319

320+
if (current.status === "disabled") {
321+
return c.text("Tunnel disabled", 410);
322+
}
323+
295324
const body = (await c.req.json()) as { providerWebhookEndpointId?: string };
296325
if (!body.providerWebhookEndpointId) {
297326
return c.text("providerWebhookEndpointId is required", 400);
@@ -318,11 +347,14 @@ app.get("/api/tunnels/:tunnelId/pull", async (c) => {
318347
return c.text("Tunnel not found", 404);
319348
}
320349

321-
const limit = clamp(Number(c.req.query("limit") ?? 30), 1, 100);
322-
const offset = clamp(Number(c.req.query("offset") ?? 0), 0, 10_000);
323-
const retryWindowMs = Math.max(0, Number(c.req.query("retryWindowMs") ?? 0));
324-
const includeFailedBeforeRaw = c.req.query("includeFailedBefore");
325-
const includeFailedBefore = includeFailedBeforeRaw ? Number(includeFailedBeforeRaw) : undefined;
350+
if (current.status === "disabled") {
351+
return c.text("Tunnel disabled", 410);
352+
}
353+
354+
const limit = clamp(readNumberParam(c.req.query("limit"), 30), 1, 100);
355+
const offset = clamp(readNumberParam(c.req.query("offset"), 0), 0, 10_000);
356+
const retryWindowMs = Math.max(0, readNumberParam(c.req.query("retryWindowMs"), 0));
357+
const includeFailedBefore = readOptionalNumberParam(c.req.query("includeFailedBefore"));
326358
const deliveries = await db
327359
.select()
328360
.from(delivery)
@@ -371,6 +403,10 @@ app.get("/api/deliveries/:deliveryId", async (c) => {
371403
return c.text("Delivery not found", 404);
372404
}
373405

406+
if (currentTunnel.status === "disabled") {
407+
return c.text("Tunnel disabled", 410);
408+
}
409+
374410
return c.json({
375411
body: currentDelivery.body,
376412
deliveredAt: currentDelivery.deliveredAt,
@@ -405,6 +441,10 @@ app.post("/api/deliveries/:deliveryId/ack", async (c) => {
405441
return c.text("Delivery not found", 404);
406442
}
407443

444+
if (currentTunnel.status === "disabled") {
445+
return c.text("Tunnel disabled", 410);
446+
}
447+
408448
await db
409449
.update(delivery)
410450
.set({ deliveredAt: now(), error: null, failedAt: null })
@@ -436,6 +476,10 @@ app.post("/api/deliveries/:deliveryId/fail", async (c) => {
436476
return c.text("Delivery not found", 404);
437477
}
438478

479+
if (currentTunnel.status === "disabled") {
480+
return c.text("Tunnel disabled", 410);
481+
}
482+
439483
const body = (await c.req.json()) as { error?: string };
440484
await db
441485
.update(delivery)

packages/paykit/src/cli/commands/listen.ts

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import { capture } from "../utils/telemetry";
1212
const DEFAULT_CLOUD_BASE_URL = "https://wh.paykit.sh";
1313
const DEFAULT_URL = "http://localhost:3000";
1414
const DEFAULT_BATCH_SIZE = 30;
15+
const DEFAULT_ERROR_BACKOFF_MS = 2_000;
16+
const MAX_ERROR_BACKOFF_MS = 15_000;
1517
const DEFAULT_POLL_INTERVAL_MS = 2_000;
1618
const DEFAULT_RETRY_WINDOW = "5m";
1719
const REPLAY_HEADER = "x-paykit-cloud-replay";
@@ -447,6 +449,10 @@ async function processPendingDeliveries(params: {
447449
return { hadDeliveries: true, processedCount: deliveries.length };
448450
}
449451

452+
function getNextErrorBackoff(currentMs: number): number {
453+
return currentMs === 0 ? DEFAULT_ERROR_BACKOFF_MS : Math.min(currentMs * 2, MAX_ERROR_BACKOFF_MS);
454+
}
455+
450456
async function loadRelayRuntimeContext(params: {
451457
configPath?: string;
452458
cwd: string;
@@ -519,31 +525,41 @@ async function listenAction(options: {
519525
}
520526

521527
let mode: "live" | "replay" = "replay";
528+
let errorBackoffMs = 0;
522529

523530
for (;;) {
524-
const deliveries = await pullDeliveries({
525-
deviceToken,
526-
includeFailedBefore: mode === "replay" ? relayStartedAt : undefined,
527-
limit: DEFAULT_BATCH_SIZE,
528-
retryWindowMs: mode === "replay" ? retryWindowMs : 0,
529-
tunnelId: tunnel.tunnelId,
530-
});
531+
try {
532+
const deliveries = await pullDeliveries({
533+
deviceToken,
534+
includeFailedBefore: mode === "replay" ? relayStartedAt : undefined,
535+
limit: DEFAULT_BATCH_SIZE,
536+
retryWindowMs: mode === "replay" ? retryWindowMs : 0,
537+
tunnelId: tunnel.tunnelId,
538+
});
531539

532-
const result = await processPendingDeliveries({
533-
devLogger,
534-
deliveries,
535-
deviceToken,
536-
localWebhookUrl,
537-
mode,
538-
});
540+
const result = await processPendingDeliveries({
541+
devLogger,
542+
deliveries,
543+
deviceToken,
544+
localWebhookUrl,
545+
mode,
546+
});
539547

540-
if (!result.hadDeliveries && mode === "replay") {
541-
devLogger.info("replay complete, listening for new webhooks");
542-
mode = "live";
543-
continue;
544-
}
548+
errorBackoffMs = 0;
549+
550+
if (!result.hadDeliveries && mode === "replay") {
551+
devLogger.info("replay complete, listening for new webhooks");
552+
mode = "live";
553+
continue;
554+
}
545555

546-
await sleep(result.processedCount > 0 ? 250 : DEFAULT_POLL_INTERVAL_MS);
556+
await sleep(result.processedCount > 0 ? 250 : DEFAULT_POLL_INTERVAL_MS);
557+
} catch (error) {
558+
const message = error instanceof Error ? error.message : String(error);
559+
devLogger.warn(`Listen loop failed: ${message}`);
560+
errorBackoffMs = getNextErrorBackoff(errorBackoffMs);
561+
await sleep(errorBackoffMs);
562+
}
547563
}
548564
}
549565

@@ -573,9 +589,8 @@ async function enableAction(options: { config?: string; cwd: string; url: string
573589
devLogger.update("Ensuring webhook endpoint");
574590
const { webhookSecret } = await syncProviderWebhook({ deviceToken, provider, tunnel });
575591

576-
const localWebhookUrl = buildLocalWebhookUrl(localOrigin, config.options.basePath ?? "/paykit");
592+
buildLocalWebhookUrl(localOrigin, config.options.basePath ?? "/paykit");
577593
devLogger.stop();
578-
void localWebhookUrl;
579594
printEnableSummary(devLogger, {
580595
account,
581596
webhookSecret,

packages/paykit/src/cli/utils/device-token.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,22 @@ export function getDeviceConfigPath(): string {
1919

2020
export function getOrCreateDeviceToken(): string {
2121
if (fs.existsSync(DEVICE_CONFIG_PATH)) {
22-
const parsed = JSON.parse(fs.readFileSync(DEVICE_CONFIG_PATH, "utf8")) as Partial<DeviceConfig>;
22+
let parsed: Partial<DeviceConfig> = {};
23+
try {
24+
parsed = JSON.parse(fs.readFileSync(DEVICE_CONFIG_PATH, "utf8")) as Partial<DeviceConfig>;
25+
} catch {
26+
console.warn(`Invalid device token config at ${DEVICE_CONFIG_PATH}, creating a fresh token.`);
27+
}
28+
2329
if (typeof parsed.deviceToken === "string" && parsed.deviceToken.length > 0) {
2430
return parsed.deviceToken;
2531
}
2632
}
2733

2834
const deviceToken = generateDeviceToken();
2935
fs.mkdirSync(path.dirname(DEVICE_CONFIG_PATH), { recursive: true });
30-
fs.writeFileSync(DEVICE_CONFIG_PATH, JSON.stringify({ deviceToken }, null, 2) + "\n");
36+
fs.writeFileSync(DEVICE_CONFIG_PATH, JSON.stringify({ deviceToken }, null, 2) + "\n", {
37+
mode: 0o600,
38+
});
3139
return deviceToken;
3240
}

packages/paykit/src/webhook/webhook.api.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@ function headersToRecord(headers: Headers): Record<string, string> {
1010
}
1111

1212
function shouldAllowStaleSignatures(headers: Headers): boolean {
13-
return process.env.NODE_ENV !== "production" && headers.get("x-paykit-cloud-replay") === "1";
13+
if (headers.get("x-paykit-cloud-replay") !== "1") {
14+
return false;
15+
}
16+
17+
return (
18+
process.env.PAYKIT_ALLOW_STALE_SIGNATURES === "1" ||
19+
process.env.NODE_ENV === "development" ||
20+
process.env.NODE_ENV === "test"
21+
);
1422
}
1523

1624
/** Applies an incoming provider webhook payload. */

packages/stripe/src/stripe-provider.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,22 @@ function getStripeEnvironment(secretKey: string): string {
189189
return secretKey.startsWith("sk_test_") || secretKey.startsWith("rk_test_") ? "test" : "live";
190190
}
191191

192+
function getStripeDisplayName(account: StripeSdk.Account): string {
193+
return account.settings?.dashboard?.display_name || account.business_profile?.name || account.id;
194+
}
195+
196+
function isStripeResourceMissingError(error: unknown): boolean {
197+
if (!(error instanceof StripeSdk.errors.StripeError)) {
198+
return false;
199+
}
200+
201+
return (
202+
error.type === "StripeInvalidRequestError" &&
203+
error.code === "resource_missing" &&
204+
error.statusCode === 404
205+
);
206+
}
207+
192208
async function retrieveExpandedSubscription(
193209
client: StripeSdk,
194210
providerSubscriptionId: string,
@@ -958,8 +974,7 @@ export function createStripeProvider(client: StripeSdk, options: StripeOptions):
958974

959975
async getTunnelAccount() {
960976
const account = await client.accounts.retrieve();
961-
const displayName =
962-
account.settings?.dashboard?.display_name || account.business_profile?.name || account.id;
977+
const displayName = getStripeDisplayName(account);
963978
return {
964979
displayName,
965980
environment: getStripeEnvironment(options.secretKey),
@@ -980,7 +995,11 @@ export function createStripeProvider(client: StripeSdk, options: StripeOptions):
980995
endpointId: endpoint.id,
981996
webhookSecret: options.webhookSecret,
982997
};
983-
} catch {
998+
} catch (error) {
999+
if (!isStripeResourceMissingError(error)) {
1000+
throw error;
1001+
}
1002+
9841003
// Fall through to create a fresh endpoint when the stored one no longer exists.
9851004
}
9861005
}
@@ -1013,8 +1032,7 @@ export function createStripeProvider(client: StripeSdk, options: StripeOptions):
10131032
const mode = getStripeEnvironment(options.secretKey) === "test" ? "test mode" : "live mode";
10141033
try {
10151034
const account = await client.accounts.retrieve();
1016-
const displayName =
1017-
account.settings?.dashboard?.display_name || account.business_profile?.name || account.id;
1035+
const displayName = getStripeDisplayName(account);
10181036

10191037
let webhookEndpoints: Array<{ url: string; status: string }> = [];
10201038
try {

0 commit comments

Comments
 (0)