From 6aaa7ebe84d734bc3b57386a9aa840812d7cb931 Mon Sep 17 00:00:00 2001 From: Dan Lynch Date: Thu, 21 May 2026 02:26:18 +0000 Subject: [PATCH] feat(worker): add X-Entity-Id header to job dispatch pipeline - Add entity_id to JobRow interface (worker reads from app_jobs.jobs) - Add entityId to RequestOptions and emit X-Entity-Id header on dispatch - Add entityId to FunctionContext.job (fn-types) - Read X-Entity-Id header in fn-runtime context and server - Add entityId to JobContext in fn-app, echo on responses, forward in callbacks This completes the entity_id promotion from DB column to HTTP header, making entity context available to cloud functions for billing/metering. --- job/worker/src/index.ts | 2 ++ job/worker/src/req.ts | 4 +++- packages/fn-app/src/index.ts | 10 +++++++++- packages/fn-runtime/src/context.ts | 5 +++-- packages/fn-runtime/src/server.ts | 1 + packages/fn-types/src/runtime.ts | 1 + 6 files changed, 19 insertions(+), 4 deletions(-) diff --git a/job/worker/src/index.ts b/job/worker/src/index.ts index 7d6438b..f9c076c 100644 --- a/job/worker/src/index.ts +++ b/job/worker/src/index.ts @@ -12,6 +12,7 @@ export interface JobRow { payload?: unknown; database_id?: string; actor_id?: string; + entity_id?: string; } const log = new Logger('jobs:worker'); @@ -119,6 +120,7 @@ export default class Worker { body: payload, databaseId: job.database_id, actorId: job.actor_id, + entityId: job.entity_id, workerId: this.workerId, jobId: job.id }); diff --git a/job/worker/src/req.ts b/job/worker/src/req.ts index 27b748a..c56be79 100644 --- a/job/worker/src/req.ts +++ b/job/worker/src/req.ts @@ -33,13 +33,14 @@ interface RequestOptions { body: unknown; databaseId?: string; actorId?: string; + entityId?: string; workerId: string; jobId: string | number; } const request = ( fn: string, - { body, databaseId, actorId, workerId, jobId }: RequestOptions + { body, databaseId, actorId, entityId, workerId, jobId }: RequestOptions ) => { const url = getFunctionUrl(fn); log.info(`dispatching job`, { @@ -77,6 +78,7 @@ const request = ( 'X-Job-Id': String(jobId), ...(databaseId ? { 'X-Database-Id': databaseId } : {}), ...(actorId ? { 'X-Actor-Id': actorId } : {}), + ...(entityId ? { 'X-Entity-Id': entityId } : {}), // async HTTP completion callback 'X-Callback-Url': completeUrl diff --git a/packages/fn-app/src/index.ts b/packages/fn-app/src/index.ts index afaae5e..6cf14d3 100644 --- a/packages/fn-app/src/index.ts +++ b/packages/fn-app/src/index.ts @@ -14,6 +14,7 @@ type JobContext = { jobId: string | undefined; databaseId: string | undefined; actorId: string | undefined; + entityId: string | undefined; }; function getHeaders(req: any) { @@ -22,6 +23,7 @@ function getHeaders(req: any) { 'x-job-id': req.get('X-Job-Id'), 'x-database-id': req.get('X-Database-Id'), 'x-actor-id': req.get('X-Actor-Id'), + 'x-entity-id': req.get('X-Entity-Id'), 'x-callback-url': req.get('X-Callback-Url') }; } @@ -105,6 +107,10 @@ const sendJobCallback = async ( headers['X-Actor-Id'] = ctx.actorId; } + if (ctx.entityId) { + headers['X-Entity-Id'] = ctx.entityId; + } + const body: Record = { status }; @@ -175,6 +181,7 @@ const createJobApp = () => { 'X-Worker-Id': req.get('X-Worker-Id'), 'X-Database-Id': req.get('X-Database-Id'), 'X-Actor-Id': req.get('X-Actor-Id'), + 'X-Entity-Id': req.get('X-Entity-Id'), 'X-Job-Id': req.get('X-Job-Id') }); next(); @@ -187,7 +194,8 @@ const createJobApp = () => { workerId: req.get('X-Worker-Id'), jobId: req.get('X-Job-Id'), databaseId: req.get('X-Database-Id'), - actorId: req.get('X-Actor-Id') + actorId: req.get('X-Actor-Id'), + entityId: req.get('X-Entity-Id') }; // Store on res.locals so the error middleware can also mark callbacks as sent. diff --git a/packages/fn-runtime/src/context.ts b/packages/fn-runtime/src/context.ts index cbf5b97..a1c66ac 100644 --- a/packages/fn-runtime/src/context.ts +++ b/packages/fn-runtime/src/context.ts @@ -5,6 +5,7 @@ import { createClients } from './graphql'; type RequestHeaders = { databaseId?: string; actorId?: string; + entityId?: string; workerId?: string; jobId?: string; }; @@ -16,7 +17,7 @@ export const buildContext = ( const env = process.env as Record; const log = createLogger(options.name || 'fn-runtime'); - const { databaseId, actorId, workerId, jobId } = headers; + const { databaseId, actorId, entityId, workerId, jobId } = headers; // Create GraphQL clients if databaseId is available and GRAPHQL_URL is set let client: FunctionContext['client']; @@ -46,7 +47,7 @@ export const buildContext = ( } return { - job: { jobId, workerId, databaseId, actorId }, + job: { jobId, workerId, databaseId, actorId, entityId }, client, meta, log, diff --git a/packages/fn-runtime/src/server.ts b/packages/fn-runtime/src/server.ts index dd14ba3..3e4a436 100644 --- a/packages/fn-runtime/src/server.ts +++ b/packages/fn-runtime/src/server.ts @@ -14,6 +14,7 @@ export const createFunctionServer = ( { databaseId: req.get('X-Database-Id') || req.get('x-database-id') || process.env.DEFAULT_DATABASE_ID, actorId: req.get('X-Actor-Id') || req.get('x-actor-id'), + entityId: req.get('X-Entity-Id') || req.get('x-entity-id'), workerId: req.get('X-Worker-Id') || req.get('x-worker-id'), jobId: req.get('X-Job-Id') || req.get('x-job-id') }, diff --git a/packages/fn-types/src/runtime.ts b/packages/fn-types/src/runtime.ts index db0c346..1cb2a02 100644 --- a/packages/fn-types/src/runtime.ts +++ b/packages/fn-types/src/runtime.ts @@ -17,6 +17,7 @@ export type FunctionContext = { workerId?: string; databaseId?: string; actorId?: string; + entityId?: string; }; client: GraphQLClient; meta: GraphQLClient;