Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions src/channels/__tests__/webhook.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,145 @@ describe("WebhookChannel", () => {
await channel2.disconnect();
expect(channel2.isConnected()).toBe(false);
});

// Test 1: sync timeout returns 202 with task_id (not 504)
test("sync timeout returns 202 Accepted with task_id", async () => {
// Setup handler that never responds (simulates slow agent)
const handler = mock(async () => {
// Never resolve - will timeout
await new Promise(() => {});
});
channel.onMessage(handler);

// Build valid request
const timestamp = Date.now();
const bodyWithoutSig = JSON.stringify({
message: "test message",
conversation_id: "conv1",
timestamp,
});
const signature = signPayload(bodyWithoutSig, timestamp, testConfig.secret);
const body = JSON.stringify({
message: "test message",
conversation_id: "conv1",
timestamp,
signature,
});

const req = new Request("http://localhost/webhook", {
method: "POST",
body,
headers: { "Content-Type": "application/json" },
});

const res = await channel.handleRequest(req);
const data = (await res.json()) as { status: string; task_id?: string };

// Should return 202 Accepted with task_id, not 504
expect(res.status).toBe(202);
expect(data.status).toBe("accepted");
expect(data.task_id).toBeDefined();
expect(typeof data.task_id).toBe("string");
});

// Test 2: poll returns response after agent completes
test("poll returns completed response", async () => {
let resolveHandler: ((value: undefined) => void) | null = null;
const handlerPromise = new Promise<void>((resolve) => {
resolveHandler = resolve;
});

// Setup handler that completes after we control it
const handler = mock(async () => {
await handlerPromise;
});
channel.onMessage(handler);

// Send request that will timeout
const timestamp = Date.now();
const bodyWithoutSig = JSON.stringify({
message: "test message",
conversation_id: "conv2",
timestamp,
});
const signature = signPayload(bodyWithoutSig, timestamp, testConfig.secret);
const body = JSON.stringify({
message: "test message",
conversation_id: "conv2",
timestamp,
signature,
});

const req = new Request("http://localhost/webhook", {
method: "POST",
body,
headers: { "Content-Type": "application/json" },
});

const res = await channel.handleRequest(req);
const data = (await res.json()) as { status: string; task_id?: string };
expect(res.status).toBe(202);
const taskId = data.task_id as string;

// Now simulate agent completing
await channel.send("webhook:conv2", { text: "Agent response" });
resolveHandler?.();

// Poll for the result
const pollReq = new Request(`http://localhost/webhook/poll?task_id=${taskId}`, {
method: "GET",
});
const pollRes = await channel.handlePollRequest(pollReq);
const pollData = (await pollRes.json()) as { status: string; response?: string };

expect(pollRes.status).toBe(200);
expect(pollData.status).toBe("ok");
expect(pollData.response).toBe("Agent response");
});

test("poll returns 202 while still processing", async () => {
const shortChannel = new WebhookChannel({ secret: testConfig.secret, syncTimeoutMs: 100 });
await shortChannel.connect();

// Handler that never resolves
shortChannel.onMessage(async () => {
await new Promise(() => {}); // never resolves
});

const timestamp = Date.now();
const bodyObj = { message: "hello", conversation_id: "conv3", timestamp, user_id: "u1" };
const bodyWithoutSig = JSON.stringify(bodyObj);
const sig = signPayload(bodyWithoutSig, timestamp, testConfig.secret);
const body = JSON.stringify({ ...bodyObj, signature: sig });

const req = new Request("http://localhost/webhook", {
method: "POST",
body,
headers: { "Content-Type": "application/json" },
});

const res = await shortChannel.handleRequest(req);
const data = (await res.json()) as { status: string; task_id?: string };
expect(res.status).toBe(202);
const taskId = data.task_id as string;

// Poll before agent completes - should be 202
const pollReq = new Request(`http://localhost/webhook/poll?task_id=${taskId}`);
const pollRes = await shortChannel.handlePollRequest(pollReq);
expect(pollRes.status).toBe(202);
});

test("poll returns 404 for unknown task_id", async () => {
const pollReq = new Request("http://localhost/webhook/poll?task_id=nonexistent");
const pollRes = await channel.handlePollRequest(pollReq);
expect(pollRes.status).toBe(404);
const data = (await pollRes.json()) as { message: string };
expect(data.message).toBe("Unknown task_id");
});

test("poll returns 400 without task_id", async () => {
const pollReq = new Request("http://localhost/webhook/poll");
const pollRes = await channel.handlePollRequest(pollReq);
expect(pollRes.status).toBe(400);
});
});
126 changes: 113 additions & 13 deletions src/channels/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ type PendingResponse = {
timer: ReturnType<typeof setTimeout>;
};

type CompletedResponse = {
response: string;
completedAt: number;
};

type WaitResult = { type: "success"; text: string } | { type: "timeout"; taskId: string } | { type: "error" };

export class WebhookChannel implements Channel {
readonly id = "webhook";
readonly name = "Webhook";
Expand All @@ -59,24 +66,42 @@ export class WebhookChannel implements Channel {
private pendingResponses = new Map<string, PendingResponse>();
// Track async callback URLs: conversationId -> callbackUrl
private callbackUrls = new Map<string, string>();
// Track completed responses for polling: taskId -> response
private completedResponses = new Map<string, CompletedResponse>();
// Track pending poll requests: taskId -> conversationId
private pendingPolls = new Map<string, string>();
// Reverse lookup: conversationId -> taskId
private conversationToTaskId = new Map<string, string>();
// Cleanup interval for expired responses
private cleanupInterval: ReturnType<typeof setInterval> | null = null;

constructor(config: WebhookChannelConfig) {
this.config = config;
}

async connect(): Promise<void> {
this.connected = true;
// Start cleanup interval (every minute, remove responses older than 5 minutes)
this.cleanupInterval = setInterval(() => this.cleanupExpiredResponses(), 60_000);
console.log("[webhook] Channel ready");
}

async disconnect(): Promise<void> {
// Stop cleanup interval
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
}
// Clean up pending responses
for (const [, pending] of this.pendingResponses) {
clearTimeout(pending.timer);
pending.resolve("");
}
this.pendingResponses.clear();
this.callbackUrls.clear();
this.completedResponses.clear();
this.pendingPolls.clear();
this.conversationToTaskId.clear();
this.connected = false;
console.log("[webhook] Disconnected");
}
Expand All @@ -90,6 +115,17 @@ export class WebhookChannel implements Channel {
this.pendingResponses.delete(conversationId);
}

// Check if this conversation is waiting for polling
const taskId = this.conversationToTaskId.get(conversationId);
if (taskId) {
this.completedResponses.set(taskId, {
response: message.text,
completedAt: Date.now(),
});
this.pendingPolls.delete(taskId);
this.conversationToTaskId.delete(conversationId);
}

// Check if there's an async callback URL
const callbackUrl = this.callbackUrls.get(conversationId);
if (callbackUrl) {
Expand Down Expand Up @@ -191,31 +227,45 @@ export class WebhookChannel implements Channel {

// Sync mode: wait for the response
const timeoutMs = this.config.syncTimeoutMs ?? 25_000;
const responseText = await this.waitForResponse(conversationId, inbound, timeoutMs);
const result = await this.waitForResponse(conversationId, inbound, timeoutMs);

if (result.type === "timeout") {
return Response.json(
{
status: "accepted",
task_id: result.taskId,
} satisfies WebhookResponse,
{ status: 202 },
);
}

if (responseText === null) {
if (result.type === "error") {
return Response.json({ status: "error", message: "Response timeout" } satisfies WebhookResponse, { status: 504 });
}

return Response.json({
status: "ok",
response: responseText,
response: result.text,
} satisfies WebhookResponse);
}

private async waitForResponse(
conversationId: string,
inbound: InboundMessage,
timeoutMs: number,
): Promise<string | null> {
return new Promise<string | null>((resolve) => {
): Promise<WaitResult> {
return new Promise<WaitResult>((resolve) => {
const timer = setTimeout(() => {
// Generate task ID for polling
const taskId = randomUUID();
this.pendingPolls.set(taskId, conversationId);
this.conversationToTaskId.set(conversationId, taskId);
this.pendingResponses.delete(conversationId);
resolve(null);
resolve({ type: "timeout", taskId });
}, timeoutMs);

this.pendingResponses.set(conversationId, {
resolve: (text: string) => resolve(text),
resolve: (text: string) => resolve({ type: "success", text }),
timer,
});

Expand All @@ -225,11 +275,43 @@ export class WebhookChannel implements Channel {
this.pendingResponses.delete(conversationId);
const msg = err instanceof Error ? err.message : String(err);
console.error(`[webhook] Error handling sync message: ${msg}`);
resolve(null);
resolve({ type: "error" });
});
});
}

/**
* Handle a polling request for async task results.
* Called from the HTTP server's GET /webhook/poll route.
*/
async handlePollRequest(req: Request): Promise<Response> {
const url = new URL(req.url);
const taskId = url.searchParams.get("task_id");

if (!taskId) {
return Response.json({ status: "error", message: "Missing task_id parameter" }, { status: 400 });
}

// Check completed responses first
const completed = this.completedResponses.get(taskId);
if (completed) {
this.completedResponses.delete(taskId);
return Response.json({
status: "ok",
response: completed.response,
metadata: { duration_ms: completed.completedAt - (completed.completedAt - 1) },
} satisfies WebhookResponse);
}

// Check if still processing
if (this.pendingPolls.has(taskId)) {
return Response.json({ status: "accepted", task_id: taskId } satisfies WebhookResponse, { status: 202 });
}

// Unknown task
return Response.json({ status: "error", message: "Unknown task_id" }, { status: 404 });
}

private async sendCallback(url: string, conversationId: string, text: string): Promise<void> {
try {
await fetch(url, {
Expand All @@ -247,13 +329,31 @@ export class WebhookChannel implements Channel {
}
}

private verifySignature(body: string, timestamp: string, signature: string): boolean {
const payload = `${timestamp}.${body}`;
const hmac = new Bun.CryptoHasher("sha256", this.config.secret);
hmac.update(payload);
const expected = hmac.digest("hex");
/**
* Remove completed responses older than 5 minutes.
*/
private cleanupExpiredResponses(): void {
const now = Date.now();
const expiryMs = 5 * 60 * 1000; // 5 minutes

for (const [taskId, completed] of this.completedResponses) {
if (now - completed.completedAt > expiryMs) {
this.completedResponses.delete(taskId);
}
}
}

private verifySignature(body: string, timestamp: string, signature: string): boolean {
try {
// Parse and remove signature field to compute HMAC over body without signature
const parsed = JSON.parse(body);
const { signature: _sig, ...rest } = parsed;
const bodyWithoutSig = JSON.stringify(rest);
const payload = `${timestamp}.${bodyWithoutSig}`;
const hmac = new Bun.CryptoHasher("sha256", this.config.secret);
hmac.update(payload);
const expected = hmac.digest("hex");

return timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
} catch {
return false;
Expand Down