diff --git a/packages/components/nodes/agentflow/Start/Start.ts b/packages/components/nodes/agentflow/Start/Start.ts index 3a33e430789..a384c01b329 100644 --- a/packages/components/nodes/agentflow/Start/Start.ts +++ b/packages/components/nodes/agentflow/Start/Start.ts @@ -213,6 +213,29 @@ class Start_Agentflow implements INode { startInputType: 'webhookTrigger' } }, + { + label: 'Callback URL', + name: 'callbackUrl', + type: 'string', + description: + 'If set, Flowise returns 202 immediately and POSTs the result to this URL when the flow finishes. Useful for platforms with strict HTTP timeout windows (GitHub, Slack, Zapier).', + placeholder: 'https://example.com/flowise-callback', + optional: true, + show: { + startInputType: 'webhookTrigger' + } + }, + { + label: 'Callback Secret', + name: 'callbackSecret', + type: 'string', + description: + 'If set, outgoing callback POSTs are signed with HMAC-SHA256. The signature is sent as X-Flowise-Signature: sha256= so your callback endpoint can verify the request came from Flowise.', + optional: true, + show: { + startInputType: 'webhookTrigger' + } + }, { label: 'Expected Query Parameters', name: 'webhookQueryParams', diff --git a/packages/server/src/controllers/webhook/index.test.ts b/packages/server/src/controllers/webhook/index.test.ts index f08235c1f1d..017a689379d 100644 --- a/packages/server/src/controllers/webhook/index.test.ts +++ b/packages/server/src/controllers/webhook/index.test.ts @@ -3,6 +3,7 @@ import { Request, Response, NextFunction } from 'express' const mockValidateWebhookChatflow = jest.fn() const mockBuildChatflow = jest.fn() +const mockDispatchCallback = jest.fn() jest.mock('../../services/webhook', () => ({ __esModule: true, @@ -19,6 +20,10 @@ jest.mock('../../utils/rateLimit', () => ({ }) } })) +jest.mock('../../utils/callbackDispatcher', () => ({ + dispatchCallback: mockDispatchCallback +})) +jest.mock('uuid', () => ({ v4: () => 'generated-uuid' })) import webhookController from './index' @@ -36,6 +41,7 @@ const mockReq = (overrides: Partial = {}): Request => const mockRes = (): Response => { const res = {} as Response res.json = jest.fn().mockReturnValue(res) + res.status = jest.fn().mockReturnValue(res) return res } @@ -44,6 +50,8 @@ const mockNext = (): NextFunction => jest.fn() describe('createWebhook', () => { beforeEach(() => { jest.clearAllMocks() + // Default: no callback config on Start node + mockValidateWebhookChatflow.mockResolvedValue({}) }) it('calls next with PRECONDITION_FAILED when id is missing', async () => { @@ -70,7 +78,6 @@ describe('createWebhook', () => { }) it('wraps req.body under webhook key before calling buildChatflow', async () => { - mockValidateWebhookChatflow.mockResolvedValue(undefined) mockBuildChatflow.mockResolvedValue({}) const originalBody = { foo: 'bar' } @@ -94,7 +101,6 @@ describe('createWebhook', () => { }) it('builds namespaced webhook payload with body, headers, and query', async () => { - mockValidateWebhookChatflow.mockResolvedValue(undefined) mockBuildChatflow.mockResolvedValue({}) const req = mockReq({ @@ -121,7 +127,6 @@ describe('createWebhook', () => { }) it('returns buildChatflow result as JSON response', async () => { - mockValidateWebhookChatflow.mockResolvedValue(undefined) const apiResult = { output: 'ok' } mockBuildChatflow.mockResolvedValue(apiResult) @@ -136,7 +141,6 @@ describe('createWebhook', () => { }) it('calls next with error when buildChatflow rejects', async () => { - mockValidateWebhookChatflow.mockResolvedValue(undefined) const error = new Error('execution failed') mockBuildChatflow.mockRejectedValue(error) @@ -150,7 +154,6 @@ describe('createWebhook', () => { }) it('passes the original body to validateWebhookChatflow before mutation', async () => { - mockValidateWebhookChatflow.mockResolvedValue(undefined) mockBuildChatflow.mockResolvedValue({}) const req = mockReq({ body: { foo: 'bar' } }) @@ -166,7 +169,226 @@ describe('createWebhook', () => { 'POST', expect.any(Object), expect.any(Object), - undefined // rawBody — not set on mock request + undefined, // rawBody — not set on mock request + undefined // options — not a resume call + ) + }) + + it('passes skipFieldValidation option when body contains humanInput (resume call)', async () => { + mockBuildChatflow.mockResolvedValue({}) + + const req = mockReq({ body: { chatId: 'abc', humanInput: { type: 'proceed', startNodeId: 'humanInputAgentflow_0' } } }) + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(mockValidateWebhookChatflow).toHaveBeenCalledWith( + 'chatflow-123', + undefined, + expect.objectContaining({ humanInput: expect.any(Object) }), + 'POST', + expect.any(Object), + expect.any(Object), + undefined, + { skipFieldValidation: true } + ) + }) + + it('includes humanInput and chatId at top level of req.body on resume', async () => { + mockBuildChatflow.mockResolvedValue({}) + + const humanInput = { type: 'proceed', startNodeId: 'humanInputAgentflow_0' } + const req = mockReq({ body: { chatId: 'abc123', humanInput } }) + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(mockBuildChatflow).toHaveBeenCalledWith( + expect.objectContaining({ + body: expect.objectContaining({ + humanInput, + chatId: 'abc123', + webhook: expect.any(Object) + }) + }) + ) + }) + + // --- Async callback (FLOWISE-367) --- + + it('returns 202 immediately when X-Callback-Url header is present', async () => { + mockBuildChatflow.mockResolvedValue({ text: 'done' }) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any }) + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(res.status).toHaveBeenCalledWith(202) + expect(res.json).toHaveBeenCalledWith({ chatId: expect.any(String), status: 'PROCESSING' }) + expect(mockBuildChatflow).toHaveBeenCalled() + }) + + it('returns 202 with chatId from body when already provided', async () => { + mockBuildChatflow.mockResolvedValue({ text: 'done' }) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ + body: { chatId: 'existing-id', foo: 'bar' }, + headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any + }) + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(res.json).toHaveBeenCalledWith({ chatId: 'existing-id', status: 'PROCESSING' }) + }) + + it('generates a chatId when not in body and callback URL is present', async () => { + mockBuildChatflow.mockResolvedValue({ text: 'done' }) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any }) + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(res.json).toHaveBeenCalledWith({ chatId: 'generated-uuid', status: 'PROCESSING' }) + }) + + it('dispatches SUCCESS callback when flow completes without action', async () => { + const apiResponse = { text: 'hello', executionId: 'exec-1' } + mockBuildChatflow.mockResolvedValue(apiResponse) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any }) + const res = mockRes() + + await webhookController.createWebhook(req, res, mockNext()) + + expect(mockDispatchCallback).toHaveBeenCalledWith( + 'https://cb.example.com', + { status: 'SUCCESS', chatId: expect.any(String), data: apiResponse }, + undefined ) }) + + it('dispatches STOPPED callback when flow has action (HITL pause)', async () => { + const action = { id: 'act-1', mapping: { approve: 'Proceed', reject: 'Reject' }, elements: [] } + const apiResponse = { text: 'waiting', executionId: 'exec-2', action } + mockBuildChatflow.mockResolvedValue(apiResponse) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any }) + const res = mockRes() + + await webhookController.createWebhook(req, res, mockNext()) + + expect(mockDispatchCallback).toHaveBeenCalledWith( + 'https://cb.example.com', + { + status: 'STOPPED', + chatId: expect.any(String), + data: { text: 'waiting', executionId: 'exec-2', action } + }, + undefined + ) + }) + + it('dispatches ERROR callback when flow throws', async () => { + mockBuildChatflow.mockRejectedValue(new Error('flow exploded')) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any }) + const res = mockRes() + + await webhookController.createWebhook(req, res, mockNext()) + + expect(mockDispatchCallback).toHaveBeenCalledWith( + 'https://cb.example.com', + { status: 'ERROR', chatId: expect.any(String), error: 'flow exploded' }, + undefined + ) + }) + + it('uses callbackSecret from Start node config when signing', async () => { + mockValidateWebhookChatflow.mockResolvedValue({ callbackSecret: 'node-secret' }) + mockBuildChatflow.mockResolvedValue({ text: 'done' }) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://cb.example.com' } as any }) + const res = mockRes() + + await webhookController.createWebhook(req, res, mockNext()) + + expect(mockDispatchCallback).toHaveBeenCalledWith(expect.any(String), expect.any(Object), 'node-secret') + }) + + it('uses callbackUrl from Start node config when no header is present', async () => { + mockValidateWebhookChatflow.mockResolvedValue({ callbackUrl: 'https://node-configured.example.com/cb' }) + mockBuildChatflow.mockResolvedValue({ text: 'done' }) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq() + const res = mockRes() + + await webhookController.createWebhook(req, res, mockNext()) + + expect(res.status).toHaveBeenCalledWith(202) + expect(mockDispatchCallback).toHaveBeenCalledWith('https://node-configured.example.com/cb', expect.any(Object), undefined) + }) + + it('header callbackUrl takes priority over Start node config', async () => { + mockValidateWebhookChatflow.mockResolvedValue({ callbackUrl: 'https://node.example.com/cb' }) + mockBuildChatflow.mockResolvedValue({ text: 'done' }) + mockDispatchCallback.mockResolvedValue(undefined) + jest.spyOn(global, 'setImmediate').mockImplementation((fn: any) => fn()) + + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'https://header.example.com/cb' } as any }) + const res = mockRes() + + await webhookController.createWebhook(req, res, mockNext()) + + expect(mockDispatchCallback).toHaveBeenCalledWith('https://header.example.com/cb', expect.any(Object), undefined) + }) + + it('calls next with BAD_REQUEST when callbackUrl is not a valid http/https URL', async () => { + const req = mockReq({ headers: { 'content-type': 'application/json', 'x-callback-url': 'ftp://bad.example.com' } as any }) + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(next).toHaveBeenCalledWith(expect.objectContaining({ statusCode: StatusCodes.BAD_REQUEST })) + expect(mockBuildChatflow).not.toHaveBeenCalled() + }) + + it('falls back to synchronous response when no callbackUrl is configured', async () => { + const apiResult = { text: 'sync result' } + mockBuildChatflow.mockResolvedValue(apiResult) + + const req = mockReq() + const res = mockRes() + const next = mockNext() + + await webhookController.createWebhook(req, res, next) + + expect(res.status).not.toHaveBeenCalledWith(202) + expect(res.json).toHaveBeenCalledWith(apiResult) + expect(mockDispatchCallback).not.toHaveBeenCalled() + }) }) diff --git a/packages/server/src/controllers/webhook/index.ts b/packages/server/src/controllers/webhook/index.ts index caf56d74a5f..168ae699c70 100644 --- a/packages/server/src/controllers/webhook/index.ts +++ b/packages/server/src/controllers/webhook/index.ts @@ -1,9 +1,12 @@ import { Request, Response, NextFunction } from 'express' import { StatusCodes } from 'http-status-codes' +import { v4 as uuidv4 } from 'uuid' import { RateLimiterManager } from '../../utils/rateLimit' import predictionsServices from '../../services/predictions' import webhookService from '../../services/webhook' import { InternalFlowiseError } from '../../errors/internalFlowiseError' +import { dispatchCallback } from '../../utils/callbackDispatcher' +import { getErrorMessage } from '../../errors/utils' const createWebhook = async (req: Request, res: Response, next: NextFunction) => { try { @@ -25,16 +28,22 @@ const createWebhook = async (req: Request, res: Response, next: NextFunction) => } } - await webhookService.validateWebhookChatflow( + const isResume = body?.humanInput != null + + const { callbackUrl: nodeCallbackUrl, callbackSecret } = await webhookService.validateWebhookChatflow( req.params.id, workspaceId, body, req.method, req.headers, req.query, - (req as any).rawBody + (req as any).rawBody, + isResume ? { skipFieldValidation: true } : undefined ) + // Header takes priority over Start node config + const callbackUrl: string | undefined = (req.headers['x-callback-url'] as string | undefined) || nodeCallbackUrl + // Namespace the webhook payload so $webhook.body.*, $webhook.headers.*, $webhook.query.* can coexist req.body = { webhook: { @@ -44,6 +53,50 @@ const createWebhook = async (req: Request, res: Response, next: NextFunction) => } } + const { humanInput, chatId: bodyChatId, sessionId } = body ?? {} + if (humanInput != null) req.body.humanInput = humanInput + if (bodyChatId != null) req.body.chatId = bodyChatId + if (sessionId != null) req.body.sessionId = sessionId + + if (callbackUrl) { + try { + const parsed = new URL(callbackUrl) + if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') throw new Error() + } catch { + throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, `Invalid callbackUrl: must be a valid http or https URL`) + } + + // Pre-assign chatId so the 202 response and the background execution share the same ID + const chatId: string = (bodyChatId as string | undefined) ?? uuidv4() + req.body.chatId = chatId + + res.status(202).json({ chatId, status: 'PROCESSING' }) + + setImmediate(async () => { + try { + const apiResponse = await predictionsServices.buildChatflow(req) + + // apiResponse.action is the parsed humanInputAction — only present when flow is STOPPED (FLOWISE-387) + if (apiResponse.action) { + await dispatchCallback( + callbackUrl, + { + status: 'STOPPED', + chatId, + data: { text: apiResponse.text, executionId: apiResponse.executionId, action: apiResponse.action } + }, + callbackSecret + ) + } else { + await dispatchCallback(callbackUrl, { status: 'SUCCESS', chatId, data: apiResponse }, callbackSecret) + } + } catch (err: any) { + await dispatchCallback(callbackUrl, { status: 'ERROR', chatId, error: getErrorMessage(err) }, callbackSecret) + } + }) + return + } + const apiResponse = await predictionsServices.buildChatflow(req) return res.json(apiResponse) } catch (error) { diff --git a/packages/server/src/services/webhook/index.test.ts b/packages/server/src/services/webhook/index.test.ts index 198c26d7048..df941f83f0e 100644 --- a/packages/server/src/services/webhook/index.test.ts +++ b/packages/server/src/services/webhook/index.test.ts @@ -65,7 +65,7 @@ describe('validateWebhookChatflow', () => { it('resolves without error for a valid webhook chatflow', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger')) - await expect(webhookService.validateWebhookChatflow('some-id')).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id')).resolves.toMatchObject({}) }) it('throws 500 for unexpected errors from getChatflowById', async () => { @@ -96,7 +96,7 @@ describe('validateWebhookChatflow', () => { it('resolves for any method when webhookMethod is not configured', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger')) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, {}, 'DELETE')).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, {}, 'DELETE')).resolves.toMatchObject({}) }) // --- Content-Type validation --- @@ -114,7 +114,7 @@ describe('validateWebhookChatflow', () => { await expect( webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', { 'content-type': 'application/json; charset=utf-8' }) - ).resolves.toBeUndefined() + ).resolves.toMatchObject({}) }) it('resolves when webhookContentType is not configured', async () => { @@ -122,7 +122,7 @@ describe('validateWebhookChatflow', () => { await expect( webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', { 'content-type': 'text/plain' }) - ).resolves.toBeUndefined() + ).resolves.toMatchObject({}) }) // --- Header validation --- @@ -145,7 +145,7 @@ describe('validateWebhookChatflow', () => { await expect( webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', { 'x-api-key': 'secret' }) - ).resolves.toBeUndefined() + ).resolves.toMatchObject({}) }) // --- Body param validation --- @@ -170,19 +170,19 @@ describe('validateWebhookChatflow', () => { it('resolves when all required params are present in body', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'action', required: true }] })) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { action: 'push' })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { action: 'push' })).resolves.toMatchObject({}) }) it('resolves when webhookBodyParams is empty string (DB default)', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger', { webhookBodyParams: '' })) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, {})).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, {})).resolves.toMatchObject({}) }) it('resolves when no params declared but body has arbitrary fields', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger')) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { anything: 'goes' })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { anything: 'goes' })).resolves.toMatchObject({}) }) // --- Body type validation --- @@ -203,7 +203,7 @@ describe('validateWebhookChatflow', () => { makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'count', type: 'number', required: false }] }) ) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { count: 42 })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { count: 42 })).resolves.toMatchObject({}) }) it('resolves when number param is sent as a numeric string (form-encoded)', async () => { @@ -211,7 +211,7 @@ describe('validateWebhookChatflow', () => { makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'count', type: 'number', required: false }] }) ) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { count: '42' })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { count: '42' })).resolves.toMatchObject({}) }) it('throws 400 when number param is an empty string (form-encoded)', async () => { @@ -230,7 +230,7 @@ describe('validateWebhookChatflow', () => { makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'active', type: 'boolean', required: false }] }) ) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { active: true })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { active: true })).resolves.toMatchObject({}) }) it('resolves when boolean param is the string "true" (form-encoded)', async () => { @@ -238,7 +238,7 @@ describe('validateWebhookChatflow', () => { makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'active', type: 'boolean', required: false }] }) ) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { active: 'true' })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { active: 'true' })).resolves.toMatchObject({}) }) it('resolves when boolean param is the string "false" (form-encoded)', async () => { @@ -246,7 +246,7 @@ describe('validateWebhookChatflow', () => { makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'active', type: 'boolean', required: false }] }) ) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, { active: 'false' })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, { active: 'false' })).resolves.toMatchObject({}) }) it('throws 400 when boolean param is an invalid string like "yes" (form-encoded)', async () => { @@ -274,7 +274,7 @@ describe('validateWebhookChatflow', () => { it('resolves when all required query params are present', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger', { webhookQueryParams: [{ name: 'page', required: true }] })) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', {}, { page: '2' })).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', {}, { page: '2' })).resolves.toMatchObject({}) }) // --- HMAC signature verification --- @@ -290,7 +290,7 @@ describe('validateWebhookChatflow', () => { it('resolves without signature check when no webhookSecret is configured', async () => { mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger')) - await expect(webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', {}, {}, RAW_BODY)).resolves.toBeUndefined() + await expect(webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', {}, {}, RAW_BODY)).resolves.toMatchObject({}) }) it('resolves when secret is set and signature is valid', async () => { @@ -300,7 +300,7 @@ describe('validateWebhookChatflow', () => { await expect( webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', headers, {}, RAW_BODY) - ).resolves.toBeUndefined() + ).resolves.toMatchObject({}) }) it('throws 401 when secret is set but X-Webhook-Signature header is missing', async () => { @@ -344,4 +344,25 @@ describe('validateWebhookChatflow', () => { webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', headers, {}, undefined) ).rejects.toMatchObject({ statusCode: 401 }) }) + + // --- skipFieldValidation option (resume calls) --- + + it('skips field validation when skipFieldValidation is true', async () => { + mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger', { webhookBodyParams: [{ name: 'action', required: true }] })) + + // Missing required body param 'action' — would normally throw 400, but not on resume + await expect( + webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', {}, {}, undefined, { skipFieldValidation: true }) + ).resolves.toMatchObject({}) + }) + + it('still runs signature check when skipFieldValidation is true', async () => { + mockGetChatflowById.mockResolvedValue(makeChatflow('webhookTrigger', {}, { webhookSecretConfigured: true })) + mockGetWebhookSecret.mockResolvedValue(SECRET) + + // No signature header — should still 401 even with skipFieldValidation + await expect( + webhookService.validateWebhookChatflow('some-id', undefined, {}, 'POST', {}, {}, RAW_BODY, { skipFieldValidation: true }) + ).rejects.toMatchObject({ statusCode: 401 }) + }) }) diff --git a/packages/server/src/services/webhook/index.ts b/packages/server/src/services/webhook/index.ts index e5c6917e3e6..6cd593ba59f 100644 --- a/packages/server/src/services/webhook/index.ts +++ b/packages/server/src/services/webhook/index.ts @@ -12,8 +12,9 @@ const validateWebhookChatflow = async ( method?: string, headers?: Record, query?: Record, - rawBody?: Buffer -): Promise => { + rawBody?: Buffer, + options?: { skipFieldValidation?: boolean } +): Promise<{ callbackUrl?: string; callbackSecret?: string }> => { try { const chatflow = await chatflowsService.getChatflowById(chatflowId, workspaceId) if (!chatflow) { @@ -28,6 +29,9 @@ const validateWebhookChatflow = async ( throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Chatflow ${chatflowId} is not configured as a webhook trigger`) } + const callbackUrl = (startNode?.data?.inputs?.callbackUrl as string | undefined) || undefined + const callbackSecret = (startNode?.data?.inputs?.callbackSecret as string | undefined) || undefined + // Signature verification (runs before any other validation to fail-fast on bad auth) if (chatflow.webhookSecretConfigured) { const sigHeader = ((startNode?.data?.inputs?.webhookSignatureHeader as string) || 'x-webhook-signature').toLowerCase() @@ -49,6 +53,8 @@ const validateWebhookChatflow = async ( } } + if (options?.skipFieldValidation) return { callbackUrl, callbackSecret } + // Method validation const webhookMethod = startNode?.data?.inputs?.webhookMethod if (webhookMethod && method?.toUpperCase() !== webhookMethod.toUpperCase()) { @@ -105,6 +111,8 @@ const validateWebhookChatflow = async ( if (missingQueryParams.length > 0) { throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, `Missing required query parameters: ${missingQueryParams.join(', ')}`) } + + return { callbackUrl, callbackSecret } } catch (error) { if (error instanceof InternalFlowiseError) throw error throw new InternalFlowiseError( diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index ba959c17a06..5954250b618 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -1701,6 +1701,23 @@ export const executeAgentFlow = async ({ } } + // On webhook humanInput resume, restore the original trigger's webhook data so $webhook.body.*, + // $webhook.headers.*, $webhook.query.* and $flow.input resolve to the original trigger values. + // incomingInput.webhook is always present on webhook calls so we overwrite it directly rather + // than relying on the agentflowRuntime.webhook fallback (unlike the formInput pattern). + if (startInputType === 'webhookTrigger' && humanInput && previousExecution) { + const previousExecutionData = (JSON.parse(previousExecution.executionData) as IAgentflowExecutedData[]) ?? [] + + const previousStartAgent = previousExecutionData.find((execData) => execData.data.name === 'startAgentflow') + + if (previousStartAgent) { + const previousStartAgentOutput = previousStartAgent.data.output + if (previousStartAgentOutput && typeof previousStartAgentOutput === 'object' && 'webhook' in previousStartAgentOutput) { + incomingInput.webhook = previousStartAgentOutput.webhook as Record + } + } + } + // If it is human input, find the last checkpoint and resume // Skip human input resumption for recursive iteration calls - they should start fresh if (humanInput && !(isRecursive && iterationContext)) { diff --git a/packages/server/src/utils/callbackDispatcher.test.ts b/packages/server/src/utils/callbackDispatcher.test.ts new file mode 100644 index 00000000000..bff70ab46fd --- /dev/null +++ b/packages/server/src/utils/callbackDispatcher.test.ts @@ -0,0 +1,89 @@ +import { createHmac } from 'crypto' + +const mockAxiosPost = jest.fn() +const mockLoggerError = jest.fn() + +jest.mock('axios', () => ({ post: mockAxiosPost })) +jest.mock('./logger', () => ({ error: mockLoggerError })) + +import { dispatchCallback } from './callbackDispatcher' + +const URL = 'https://example.com/callback' +const PAYLOAD = { status: 'SUCCESS', chatId: 'abc-123', data: { text: 'hello' } } + +function expectedSignature(body: string, secret: string): string { + return 'sha256=' + createHmac('sha256', secret).update(body).digest('hex') +} + +describe('dispatchCallback', () => { + beforeEach(() => { + jest.clearAllMocks() + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + it('POSTs JSON payload to the callback URL', async () => { + mockAxiosPost.mockResolvedValue({ status: 200 }) + + await dispatchCallback(URL, PAYLOAD) + + expect(mockAxiosPost).toHaveBeenCalledTimes(1) + expect(mockAxiosPost).toHaveBeenCalledWith(URL, JSON.stringify(PAYLOAD), { + headers: { 'Content-Type': 'application/json' }, + timeout: 10000 + }) + }) + + it('includes X-Flowise-Signature header when secret is provided', async () => { + mockAxiosPost.mockResolvedValue({ status: 200 }) + const secret = 'my-secret' + const body = JSON.stringify(PAYLOAD) + + await dispatchCallback(URL, PAYLOAD, secret) + + expect(mockAxiosPost).toHaveBeenCalledWith( + URL, + body, + expect.objectContaining({ + headers: expect.objectContaining({ + 'X-Flowise-Signature': expectedSignature(body, secret) + }) + }) + ) + }) + + it('does not include X-Flowise-Signature when no secret is provided', async () => { + mockAxiosPost.mockResolvedValue({ status: 200 }) + + await dispatchCallback(URL, PAYLOAD) + + const call = mockAxiosPost.mock.calls[0] + expect(call[2].headers).not.toHaveProperty('X-Flowise-Signature') + }) + + it('retries on failure and succeeds on second attempt', async () => { + mockAxiosPost.mockRejectedValueOnce(new Error('timeout')).mockResolvedValue({ status: 200 }) + + const promise = dispatchCallback(URL, PAYLOAD) + await jest.advanceTimersByTimeAsync(3000) + await promise + + expect(mockAxiosPost).toHaveBeenCalledTimes(2) + expect(mockLoggerError).not.toHaveBeenCalled() + }) + + it('logs an error after all 3 attempts fail and does not throw', async () => { + mockAxiosPost.mockRejectedValue(new Error('unreachable')) + + const promise = dispatchCallback(URL, PAYLOAD) + await jest.advanceTimersByTimeAsync(3000) + await jest.advanceTimersByTimeAsync(6000) + await promise + + expect(mockAxiosPost).toHaveBeenCalledTimes(3) + expect(mockLoggerError).toHaveBeenCalledWith(expect.stringContaining('Failed to deliver callback')) + }) +}) diff --git a/packages/server/src/utils/callbackDispatcher.ts b/packages/server/src/utils/callbackDispatcher.ts new file mode 100644 index 00000000000..d32c560cc8d --- /dev/null +++ b/packages/server/src/utils/callbackDispatcher.ts @@ -0,0 +1,32 @@ +import axios from 'axios' +import { createHmac } from 'crypto' +import logger from './logger' + +// Delays in ms before each attempt: attempt 1 is immediate, attempt 2 waits 3s, attempt 3 waits 6s +const RETRY_DELAYS = [0, 3000, 6000] + +function sign(body: string, secret: string): string { + return 'sha256=' + createHmac('sha256', secret).update(body).digest('hex') +} + +export async function dispatchCallback(url: string, payload: Record, secret?: string): Promise { + const body = JSON.stringify(payload) + const headers: Record = { 'Content-Type': 'application/json' } + if (secret) headers['X-Flowise-Signature'] = sign(body, secret) + + for (let attempt = 0; attempt < RETRY_DELAYS.length; attempt++) { + if (RETRY_DELAYS[attempt] > 0) { + await new Promise((r) => setTimeout(r, RETRY_DELAYS[attempt])) + } + try { + await axios.post(url, body, { headers, timeout: 10000 }) + return + } catch (err: any) { + if (attempt === RETRY_DELAYS.length - 1) { + logger.error( + `[callbackDispatcher] Failed to deliver callback to ${url} after ${RETRY_DELAYS.length} attempts: ${err.message}` + ) + } + } + } +}