diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts index 0282e52894..392204a838 100644 --- a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts +++ b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts @@ -78,6 +78,31 @@ async function updateFlowRunToLatestFlowVersionId( }) } +function returnHandlerId(pauseMetadata: PauseMetadata | undefined, requestId: string | undefined): string { + const handlerId = engineResponseWatcher.getHandlerId() + if (isNil(pauseMetadata)) { + return handlerId + } + + if (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId && pauseMetadata.handlerId) { + return pauseMetadata.handlerId + } + else { + return handlerId + } +} + +function modifyPauseMetadata(pauseMetadata: PauseMetadata): PauseMetadata { + if (pauseMetadata.type === PauseType.WEBHOOK) { + return { + ...pauseMetadata, + handlerId: engineResponseWatcher.getHandlerId(), + } + } + + return pauseMetadata +} + export const flowRunService = { async list({ projectId, @@ -173,6 +198,8 @@ export const flowRunService = { flowRunId: flowRunToResume.id, projectId: flowRunToResume.projectId, flowVersionId: flowRunToResume.flowVersionId, + synchronousHandlerId: returnHandlerId(pauseMetadata, requestId), + hookType: HookType.BEFORE_LOG, executionType, environment: RunEnvironment.PRODUCTION, }) @@ -292,7 +319,7 @@ export const flowRunService = { status: FlowRunStatus.PAUSED, logsFileId: logFileId, // eslint-disable-next-line @typescript-eslint/no-explicit-any - pauseMetadata: pauseMetadata as any, + pauseMetadata: modifyPauseMetadata(pauseMetadata) as any, }) const flowRun = await flowRunRepo.findOneByOrFail({ id: flowRunId }) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index d913879d9b..d4ed014300 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -1,3 +1,4 @@ +import EventEmitter from 'events' import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox' import { accessTokenManager } from '../authentication/lib/access-token-manager' import { websocketService } from '../websockets/websockets.service' @@ -9,7 +10,7 @@ import { folderController } from './folder/folder.controller' import { stepRunService } from './step-run/step-run-service' import { testTriggerController } from './test-trigger/test-trigger-controller' import { logger } from '@activepieces/server-shared' -import { CreateStepRunRequestBody, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' +import { CreateStepRunRequestBody, FlowRun, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(flowVersionController, { prefix: '/v1/flows' }) @@ -18,14 +19,24 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(testTriggerController, { prefix: '/v1/test-trigger' }) websocketService.addListener(WebsocketServerEvent.TEST_FLOW_RUN, (socket) => { return async (data: TestFlowRunRequestBody) => { + const eventEmitter = new EventEmitter() const principal = await accessTokenManager.extractPrincipal(socket.handshake.auth.token) const flowRun = await flowRunService.test({ projectId: principal.projectId, flowVersionId: data.flowVersionId, }) + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) - await engineResponseWatcher.listen(flowRun.id, false) - socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, flowRun) + + eventEmitter.on(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, (flowRunResponse: FlowRun) => { + if (isFlowStateTerminal(flowRunResponse.status)) { + eventEmitter.removeAllListeners() + engineResponseWatcher.removeListener(flowRun.id) + } + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRunResponse) + }) + + await engineResponseWatcher.listenAndEmit(flowRun.id, eventEmitter, flowRunService.getOneOrThrow({ id: flowRun.id, projectId: principal.projectId })) } }) websocketService.addListener(WebsocketServerEvent.TEST_STEP_RUN, (socket) => { diff --git a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts index aae19a1b6e..60386f0153 100644 --- a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts +++ b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts @@ -1,8 +1,9 @@ +import { EventEmitter } from 'events' import { logger } from '@sentry/utils' import { StatusCodes } from 'http-status-codes' import { pubSub } from '../../helper/pubsub' import { system, SystemProp } from '@activepieces/server-shared' -import { apId } from '@activepieces/shared' +import { apId, FlowRunStatus, WebsocketClientEvent } from '@activepieces/shared' const listeners = new Map void>() @@ -24,6 +25,9 @@ export const engineResponseWatcher = { getHandlerId(): string { return HANDLER_ID }, + removeListener(requestId: string): void { + listeners.delete(requestId) + }, async init(): Promise { logger.info('[engineWatcher#init] Initializing engine run watcher') @@ -42,6 +46,19 @@ export const engineResponseWatcher = { }, ) }, + async listenAndEmit(requestId: string, event: EventEmitter, driver: Promise): Promise { + logger.info(`[engineWatcher#listenAndEmit] requestId=${requestId}`) + + const listenStatus = async () => { + const finalFlowRun = await driver + event.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, finalFlowRun) + if (finalFlowRun.status !== FlowRunStatus.SUCCEEDED) { + await listenStatus() + } + } + + await listenStatus() + }, async listen(requestId: string, timeoutRequest: boolean): Promise { logger.info(`[engineWatcher#listen] requestId=${requestId}`) return new Promise((resolve) => { diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 93a9663be2..879918c2ba 100755 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -60,6 +60,7 @@ export { DelayPauseMetadata, PauseMetadata, WebhookPauseMetadata } from './lib/f export * from './lib/federated-authn' export { STORE_KEY_MAX_LENGTH } from './lib/store-entry/store-entry' export { RetryFlowRequestBody } from './lib/flow-run/test-flow-run-request' +export * from './lib/flow-run/flow-status' export * from './lib/flows/dto/flow-template-request' // Look at https://github.com/sinclairzx81/typebox/issues/350 TypeSystem.ExactOptionalPropertyTypes = false diff --git a/packages/shared/src/lib/flow-run/execution/flow-execution.ts b/packages/shared/src/lib/flow-run/execution/flow-execution.ts index a1adc6e3a5..cbca52894e 100644 --- a/packages/shared/src/lib/flow-run/execution/flow-execution.ts +++ b/packages/shared/src/lib/flow-run/execution/flow-execution.ts @@ -29,6 +29,7 @@ export const WebhookPauseMetadata = Type.Object({ type: Type.Literal(PauseType.WEBHOOK), requestId: Type.String(), response: Type.Unknown(), + handlerId: Type.Optional(Type.String({})), }) export type WebhookPauseMetadata = Static diff --git a/packages/shared/src/lib/flow-run/flow-status.ts b/packages/shared/src/lib/flow-run/flow-status.ts new file mode 100644 index 0000000000..24411d3909 --- /dev/null +++ b/packages/shared/src/lib/flow-run/flow-status.ts @@ -0,0 +1,5 @@ +import { FlowRunStatus } from './execution/flow-execution' + +export const isFlowStateTerminal = (status: FlowRunStatus): boolean => { + return status === FlowRunStatus.SUCCEEDED || status === FlowRunStatus.FAILED || status === FlowRunStatus.INTERNAL_ERROR || status === FlowRunStatus.QUOTA_EXCEEDED +} \ No newline at end of file diff --git a/packages/shared/src/lib/websocket/index.ts b/packages/shared/src/lib/websocket/index.ts index cc85d14a77..34156832c2 100644 --- a/packages/shared/src/lib/websocket/index.ts +++ b/packages/shared/src/lib/websocket/index.ts @@ -2,7 +2,7 @@ export enum WebsocketClientEvent { TEST_FLOW_RUN_STARTED = 'TEST_FLOW_RUN_STARTED', - TEST_FLOW_RUN_FINISHED = 'TEST_FLOW_RUN_FINISHED', + TEST_FLOW_RUN_PROGRESS = 'TEST_FLOW_RUN_PROGRESS', GENERATE_CODE_FINISHED = 'GENERATE_CODE_FINIISHED', TEST_STEP_FINISHED = 'TEST_STEP_FINISHED', } diff --git a/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts b/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts index 8d161b2119..56d34955ba 100644 --- a/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts +++ b/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts @@ -112,7 +112,7 @@ export class TestFlowWidgetComponent implements OnInit { ); this.testResult$ = this.websockService.socket - .fromEvent(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED) + .fromEvent(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS) .pipe( switchMap((flowRun) => { return this.instanceRunService.get(flowRun.id);