-
Notifications
You must be signed in to change notification settings - Fork 17
feat(tfe-job-agent): webhook-based TFC job agent #847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a97feba
535ca61
945c6a8
c690f25
75fa9b9
80ee5fe
484e8c7
b6fdc9b
6f3c3ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| import { describe, expect, it } from "vitest"; | ||
| import { JobStatus } from "@ctrlplane/validators/jobs"; | ||
|
|
||
| import { mapTriggerToStatus } from "../run_notification.js"; | ||
|
|
||
| describe("mapTriggerToStatus", () => { | ||
| it.each([ | ||
| ["run:created", JobStatus.Pending], | ||
| ["run:planning", JobStatus.InProgress], | ||
| ["run:needs_attention", JobStatus.ActionRequired], | ||
| ["run:applying", JobStatus.InProgress], | ||
| ["run:completed", JobStatus.Successful], | ||
| ["run:errored", JobStatus.Failure], | ||
| ])("maps trigger %s to %s", (trigger, expected) => { | ||
| expect(mapTriggerToStatus(trigger)).toBe(expected); | ||
| }); | ||
|
|
||
| it("returns null for unknown triggers", () => { | ||
| expect(mapTriggerToStatus("run:unknown")).toBeNull(); | ||
| expect(mapTriggerToStatus("")).toBeNull(); | ||
| expect(mapTriggerToStatus("something:else")).toBeNull(); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| import crypto from "node:crypto"; | ||
| import type { Request, Response } from "express"; | ||
| import { describe, expect, it, vi, beforeEach } from "vitest"; | ||
|
|
||
| // Mock config before importing the module under test | ||
| vi.mock("@/config.js", () => ({ | ||
| env: { TFE_WEBHOOK_SECRET: "test-secret-123" }, | ||
| })); | ||
|
|
||
| vi.mock("@ctrlplane/logger", () => ({ | ||
| logger: { error: vi.fn(), info: vi.fn(), warn: vi.fn() }, | ||
| })); | ||
|
|
||
| const mockHandleRunNotification = vi.fn().mockResolvedValue(undefined); | ||
| vi.mock("../run_notification.js", () => ({ | ||
| handleRunNotification: (...args: unknown[]) => | ||
| mockHandleRunNotification(...args), | ||
| })); | ||
|
|
||
| import { createTfeRouter } from "../index.js"; | ||
|
|
||
| function signPayload(body: object, secret: string): string { | ||
| const json = JSON.stringify(body); | ||
| return crypto.createHmac("sha512", secret).update(json).digest("hex"); | ||
| } | ||
|
|
||
| function makeMockRes() { | ||
| const res = { statusCode: 200, _json: null as unknown }; | ||
| return Object.assign(res, { | ||
| status: (code: number) => { | ||
| res.statusCode = code; | ||
| return res; | ||
| }, | ||
| json: (data: unknown) => { | ||
| res._json = data; | ||
| return res; | ||
| }, | ||
| }) as typeof res & Response; | ||
| } | ||
|
|
||
| function getWebhookHandler() { | ||
| const router = createTfeRouter(); | ||
| // eslint-disable-next-line @typescript-eslint/no-unsafe-call | ||
| const layer = (router as any).stack.find( | ||
| (l: any) => l.route?.path === "/webhook" && l.route?.methods?.post, | ||
| ); | ||
| if (!layer) throw new Error("POST /webhook route not found on router"); | ||
| // eslint-disable-next-line @typescript-eslint/no-unsafe-call | ||
| const handlers = layer.route.stack.filter( | ||
| (s: any) => s.method === "post", | ||
| ); | ||
| return handlers[handlers.length - 1].handle as ( | ||
| req: Request, | ||
| res: Response, | ||
| ) => Promise<void>; | ||
| } | ||
|
|
||
| describe("TFE webhook router", () => { | ||
| let handler: (req: Request, res: Response) => Promise<void>; | ||
|
|
||
| beforeEach(() => { | ||
| handler = getWebhookHandler(); | ||
| vi.clearAllMocks(); | ||
| }); | ||
|
|
||
| const payload = { | ||
| payload_version: 1, | ||
| notification_configuration_id: "nc-test", | ||
| run_url: "https://app.terraform.io/runs/run-abc", | ||
| run_id: "run-abc", | ||
| run_message: "test", | ||
| run_created_at: "2024-01-01T00:00:00Z", | ||
| run_created_by: "user", | ||
| workspace_id: "ws-test", | ||
| workspace_name: "test-ws", | ||
| organization_name: "org", | ||
| notifications: [ | ||
| { | ||
| message: "Applied", | ||
| trigger: "run:completed", | ||
| run_status: "applied", | ||
| run_updated_at: "2024-01-01T00:01:00Z", | ||
| run_updated_by: "user", | ||
| }, | ||
| ], | ||
| }; | ||
|
|
||
| it("returns 200 and calls handler with valid signature", async () => { | ||
| const signature = signPayload(payload, "test-secret-123"); | ||
| const req = { | ||
| headers: { "x-tfe-notification-signature": signature }, | ||
| body: payload, | ||
| } as unknown as Request; | ||
| const res = makeMockRes(); | ||
|
|
||
| await handler(req, res); | ||
|
|
||
| expect(res.statusCode).toBe(200); | ||
| expect((res as any)._json).toEqual({ message: "OK" }); | ||
| expect(mockHandleRunNotification).toHaveBeenCalledOnce(); | ||
| expect(mockHandleRunNotification).toHaveBeenCalledWith(payload); | ||
| }); | ||
|
|
||
| it("returns 401 with missing signature header", async () => { | ||
| const req = { | ||
| headers: {}, | ||
| body: payload, | ||
| } as unknown as Request; | ||
| const res = makeMockRes(); | ||
|
|
||
| await handler(req, res); | ||
|
|
||
| expect(res.statusCode).toBe(401); | ||
| expect((res as any)._json).toEqual({ message: "Unauthorized" }); | ||
| expect(mockHandleRunNotification).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns 401 with wrong signature", async () => { | ||
| const req = { | ||
| headers: { | ||
| "x-tfe-notification-signature": "deadbeef".repeat(16), | ||
| }, | ||
| body: payload, | ||
| } as unknown as Request; | ||
| const res = makeMockRes(); | ||
|
|
||
| await handler(req, res); | ||
|
|
||
| expect(res.statusCode).toBe(401); | ||
| expect(mockHandleRunNotification).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns 200 without calling handler when notifications is empty", async () => { | ||
| const emptyPayload = { ...payload, notifications: [] }; | ||
| const signature = signPayload(emptyPayload, "test-secret-123"); | ||
| const req = { | ||
| headers: { "x-tfe-notification-signature": signature }, | ||
| body: emptyPayload, | ||
| } as unknown as Request; | ||
| const res = makeMockRes(); | ||
|
|
||
| await handler(req, res); | ||
|
|
||
| expect(res.statusCode).toBe(200); | ||
| expect(mockHandleRunNotification).not.toHaveBeenCalled(); | ||
| }); | ||
|
|
||
| it("returns 500 when handler throws", async () => { | ||
| mockHandleRunNotification.mockRejectedValueOnce( | ||
| new Error("db connection lost"), | ||
| ); | ||
| const signature = signPayload(payload, "test-secret-123"); | ||
| const req = { | ||
| headers: { "x-tfe-notification-signature": signature }, | ||
| body: payload, | ||
| } as unknown as Request; | ||
| const res = makeMockRes(); | ||
|
|
||
| await handler(req, res); | ||
|
|
||
| expect(res.statusCode).toBe(500); | ||
| expect((res as any)._json).toEqual({ message: "db connection lost" }); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| import type { Request, Response } from "express"; | ||
| import crypto from "node:crypto"; | ||
| import { env } from "@/config.js"; | ||
| import { Router } from "express"; | ||
|
|
||
| import { logger } from "@ctrlplane/logger"; | ||
|
|
||
| import { handleRunNotification } from "./run_notification.js"; | ||
|
|
||
| export const createTfeRouter = (): Router => | ||
| Router().post("/webhook", handleWebhookRequest); | ||
|
|
||
| const verifySignature = (req: Request): boolean => { | ||
| const secret = env.TFE_WEBHOOK_SECRET; | ||
| if (secret == null) return false; | ||
|
|
||
| const signature = req.headers["x-tfe-notification-signature"]?.toString(); | ||
| if (signature == null) return false; | ||
|
|
||
| const body = JSON.stringify(req.body); | ||
| const expected = crypto | ||
| .createHmac("sha512", secret) | ||
| .update(body) | ||
| .digest("hex"); | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| const sigBuf = Buffer.from(signature, "hex"); | ||
| const expBuf = Buffer.from(expected, "hex"); | ||
| if (sigBuf.length !== expBuf.length) return false; | ||
| return crypto.timingSafeEqual(sigBuf, expBuf); | ||
| }; | ||
|
|
||
| const handleWebhookRequest = async (req: Request, res: Response) => { | ||
| try { | ||
| if (!verifySignature(req)) { | ||
| res.status(401).json({ message: "Unauthorized" }); | ||
| return; | ||
| } | ||
|
|
||
| const payload = req.body; | ||
| if (payload.notifications != null && payload.notifications.length > 0) | ||
| await handleRunNotification(payload); | ||
|
|
||
| res.status(200).json({ message: "OK" }); | ||
| } catch (error: unknown) { | ||
| const message = error instanceof Error ? error.message : String(error); | ||
| logger.error(message); | ||
| res.status(500).json({ message }); | ||
| } | ||
| }; | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,117 @@ | ||||||||||
| import { eq, sql, takeFirstOrNull } from "@ctrlplane/db"; | ||||||||||
| import { db } from "@ctrlplane/db/client"; | ||||||||||
| import { enqueueAllReleaseTargetsDesiredVersion } from "@ctrlplane/db/reconcilers"; | ||||||||||
| import * as schema from "@ctrlplane/db/schema"; | ||||||||||
| import { logger } from "@ctrlplane/logger"; | ||||||||||
| import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; | ||||||||||
| import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs"; | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * TFC notification trigger → ctrlplane job status. | ||||||||||
| * https://developer.hashicorp.com/terraform/cloud-docs/workspaces/settings/notifications#notification-triggers | ||||||||||
| */ | ||||||||||
| const triggerStatusMap: Record<string, JobStatus> = { | ||||||||||
| "run:created": JobStatus.Pending, | ||||||||||
| "run:planning": JobStatus.InProgress, | ||||||||||
| "run:needs_attention": JobStatus.ActionRequired, | ||||||||||
| "run:applying": JobStatus.InProgress, | ||||||||||
| "run:completed": JobStatus.Successful, | ||||||||||
| "run:errored": JobStatus.Failure, | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| export const mapTriggerToStatus = (trigger: string): JobStatus | null => | ||||||||||
| triggerStatusMap[trigger] ?? null; | ||||||||||
|
|
||||||||||
| const uuidRegex = | ||||||||||
| /\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b/; | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Extract the ctrlplane job ID from the TFC run message. | ||||||||||
| * The dispatcher sets: "Triggered by ctrlplane job <uuid>" | ||||||||||
| */ | ||||||||||
| const extractJobId = (runMessage: string): string | null => { | ||||||||||
| const match = uuidRegex.exec(runMessage); | ||||||||||
| return match ? match[0] : null; | ||||||||||
| }; | ||||||||||
|
|
||||||||||
| export const handleRunNotification = async (payload: { | ||||||||||
| run_url: string; | ||||||||||
| run_id: string; | ||||||||||
| run_message: string; | ||||||||||
| workspace_name: string; | ||||||||||
| organization_name: string; | ||||||||||
| notifications: Array<{ message: string; trigger: string }>; | ||||||||||
| }) => { | ||||||||||
| if (payload.notifications.length === 0) return; | ||||||||||
|
|
||||||||||
| const notification = payload.notifications[0]!; | ||||||||||
| const status = mapTriggerToStatus(notification.trigger); | ||||||||||
| if (status == null) { | ||||||||||
| logger.warn("Unknown TFC notification trigger, ignoring", { | ||||||||||
| trigger: notification.trigger, | ||||||||||
| }); | ||||||||||
| return; | ||||||||||
| } | ||||||||||
|
|
||||||||||
| const jobId = extractJobId(payload.run_message); | ||||||||||
| if (jobId == null) return; | ||||||||||
|
|
||||||||||
| const now = new Date(); | ||||||||||
| const isCompleted = exitedStatus.includes(status); | ||||||||||
| const isInProgress = status === JobStatus.InProgress; | ||||||||||
|
|
||||||||||
| const [updated] = await db | ||||||||||
| .update(schema.job) | ||||||||||
| .set({ | ||||||||||
| externalId: payload.run_id, | ||||||||||
| status, | ||||||||||
| updatedAt: now, | ||||||||||
| message: notification.message, | ||||||||||
| ...(isInProgress | ||||||||||
| ? { startedAt: sql`COALESCE(${schema.job.startedAt}, ${now})` } | ||||||||||
| : {}), | ||||||||||
| ...(isCompleted ? { completedAt: now } : {}), | ||||||||||
| }) | ||||||||||
| .where(eq(schema.job.id, jobId)) | ||||||||||
| .returning(); | ||||||||||
|
|
||||||||||
| if (updated == null) return; | ||||||||||
|
|
||||||||||
| // Derive workspace URL from run_url (works for both TFC and TFE) | ||||||||||
| const runUrlParts = payload.run_url.split("/runs/"); | ||||||||||
| const workspaceUrl = runUrlParts[0] ?? payload.run_url; | ||||||||||
| const links = JSON.stringify({ | ||||||||||
| Run: payload.run_url, | ||||||||||
| Workspace: workspaceUrl, | ||||||||||
| }); | ||||||||||
| const metadataEntries = [ | ||||||||||
| { jobId, key: String(ReservedMetadataKey.Links), value: links }, | ||||||||||
| { jobId, key: "run_url", value: payload.run_url }, | ||||||||||
| ]; | ||||||||||
|
|
||||||||||
| for (const entry of metadataEntries) | ||||||||||
| await db | ||||||||||
| .insert(schema.jobMetadata) | ||||||||||
| .values(entry) | ||||||||||
| .onConflictDoUpdate({ | ||||||||||
| target: [schema.jobMetadata.key, schema.jobMetadata.jobId], | ||||||||||
| set: { value: entry.value }, | ||||||||||
| }); | ||||||||||
|
|
||||||||||
| const result = await db | ||||||||||
| .select({ workspaceId: schema.deployment.workspaceId }) | ||||||||||
| .from(schema.releaseJob) | ||||||||||
| .innerJoin( | ||||||||||
| schema.release, | ||||||||||
| eq(schema.releaseJob.releaseId, schema.release.id), | ||||||||||
| ) | ||||||||||
| .innerJoin( | ||||||||||
| schema.deployment, | ||||||||||
| eq(schema.release.deploymentId, schema.deployment.id), | ||||||||||
| ) | ||||||||||
| .where(eq(schema.releaseJob.jobId, jobId)) | ||||||||||
| .then(takeFirstOrNull); | ||||||||||
|
|
||||||||||
| if (result?.workspaceId != null) | ||||||||||
| enqueueAllReleaseTargetsDesiredVersion(db, result.workspaceId); | ||||||||||
|
Comment on lines
+115
to
+116
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: cat -n apps/api/src/routes/tfe/run_notification.ts | head -150Repository: ctrlplanedev/ctrlplane Length of output: 4556 🏁 Script executed: rg "enqueueAllReleaseTargetsDesiredVersion" --type ts -B 5 -A 10Repository: ctrlplanedev/ctrlplane Length of output: 19957 🏁 Script executed: fd "version-manager" -e ts -o | head -5Repository: ctrlplanedev/ctrlplane Length of output: 182 🏁 Script executed: sed -n '37,115p' apps/api/src/routes/tfe/run_notification.tsRepository: ctrlplanedev/ctrlplane Length of output: 2392 Await the desired-version enqueue. This promise is currently fire-and-forget, so a queue write failure will result in an unhandled rejection. Since the handler is async, Suggested fix if (result?.workspaceId != null)
- enqueueAllReleaseTargetsDesiredVersion(db, result.workspaceId);
+ await enqueueAllReleaseTargetsDesiredVersion(db, result.workspaceId);📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is intentional — it's an enqueue operation (write to a queue table) that's designed to be best-effort. If it fails, the reconciler will pick it up on the next cycle anyway. Adding await here would make the webhook response wait on the enqueue write, and if it throws, the webhook would return 500 to TFC causing unnecessary retries for a non-critical side effect.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That said, there's still a risk of an unhandled promise rejection if the enqueue fails, which can crash or warn the Node.js process even though the behavior is intentional. The idiomatic fix for intentional fire-and-forget is to explicitly mark it with if (result?.workspaceId != null)
void enqueueAllReleaseTargetsDesiredVersion(db, result.workspaceId).catch(
(err) => logger.warn("Failed to enqueue desired version (best-effort)", { err }),
);This way:
✏️ Learnings added
🧠 Learnings used |
||||||||||
| }; | ||||||||||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: ctrlplanedev/ctrlplane
Length of output: 88
🏁 Script executed:
Repository: ctrlplanedev/ctrlplane
Length of output: 815
🏁 Script executed:
Repository: ctrlplanedev/ctrlplane
Length of output: 98
🏁 Script executed:
Repository: ctrlplanedev/ctrlplane
Length of output: 80
🏁 Script executed:
# Read the TFE route file to see how the secret is validated cat -n apps/api/src/routes/tfe/index.tsRepository: ctrlplanedev/ctrlplane
Length of output: 1919
🏁 Script executed:
# Read .env.compose to see how TFE_WEBHOOK_SECRET is configured cat -n ./.env.composeRepository: ctrlplanedev/ctrlplane
Length of output: 1322
Normalize blank webhook secrets to
undefined.The schema accepts
TFE_WEBHOOK_SECRET=as a valid empty string, and the router only guards againstnull/undefined(line 15 of routes/tfe/index.ts). A blank env entry becomes a valid HMAC key instead of disabling the feature, allowing anyone to compute a valid signature using an empty secret.Suggested fix
📝 Committable suggestion
🤖 Prompt for AI Agents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's also the exact same pattern as GITHUB_WEBHOOK_SECRET on line 34. Fixing one without the other would be inconsistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zacharyblasczyk, that's a fair point — the same empty-string concern applies toGITHUB_WEBHOOK_SECRETas well. For true consistency, the fix should be applied to both:That said, if the project's convention is to leave webhook secrets as plain
z.string().optional()and rely on runtime guards in the route handlers instead, I understand treating both consistently with the existing pattern. The risk is low as long as developers know not to set these variables to empty strings in production.( ͡° ͜ʖ ͡°) 🐇