From 741115c490cf7cd4a395b7df1753ddffc9f96cf2 Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Wed, 8 May 2024 18:23:32 +0000 Subject: [PATCH 1/7] fix: continue flow execution in UI when resuming --- .../app/flows/flow-run/flow-run-service.ts | 21 ++++++++++++++++++- .../server/api/src/app/flows/flow.module.ts | 15 ++++++++++--- .../flow-worker/engine-response-watcher.ts | 3 +++ .../lib/flow-run/execution/flow-execution.ts | 1 + 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts index 0282e52894..2c796f3249 100644 --- a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts +++ b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts @@ -78,6 +78,20 @@ async function updateFlowRunToLatestFlowVersionId( }) } +function returnHandlerId(pauseMetadata: PauseMetadata | undefined, requestId: string | undefined): string { + const handlerId = engineResponseWatcher.getHandlerId() + if (isNil(pauseMetadata)) { + return handlerId + } + + if (pauseMetadata.type === PauseType.WEBHOOK && requestId === pauseMetadata.requestId && pauseMetadata.handlerId) { + return pauseMetadata.handlerId + } + else { + return handlerId + } +} + export const flowRunService = { async list({ projectId, @@ -173,6 +187,8 @@ export const flowRunService = { flowRunId: flowRunToResume.id, projectId: flowRunToResume.projectId, flowVersionId: flowRunToResume.flowVersionId, + synchronousHandlerId: returnHandlerId(pauseMetadata, requestId), + hookType: HookType.BEFORE_LOG, executionType, environment: RunEnvironment.PRODUCTION, }) @@ -288,11 +304,14 @@ export const flowRunService = { const { flowRunId, logFileId, pauseMetadata } = params + const updatedPauseMetadata = params.pauseMetadata.type === PauseType.WEBHOOK ? { ...pauseMetadata, + id: engineResponseWatcher.getHandlerId() } : pauseMetadata + await flowRunRepo.update(flowRunId, { status: FlowRunStatus.PAUSED, logsFileId: logFileId, // eslint-disable-next-line @typescript-eslint/no-explicit-any - pauseMetadata: pauseMetadata as any, + pauseMetadata: updatedPauseMetadata as any, }) const flowRun = await flowRunRepo.findOneByOrFail({ id: flowRunId }) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index d913879d9b..e35b0b503a 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -9,7 +9,7 @@ import { folderController } from './folder/folder.controller' import { stepRunService } from './step-run/step-run-service' import { testTriggerController } from './test-trigger/test-trigger-controller' import { logger } from '@activepieces/server-shared' -import { CreateStepRunRequestBody, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' +import { CreateStepRunRequestBody, FlowRunStatus, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(flowVersionController, { prefix: '/v1/flows' }) @@ -24,8 +24,17 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { flowVersionId: data.flowVersionId, }) socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) - await engineResponseWatcher.listen(flowRun.id, false) - socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, flowRun) + + let finalFlowRun = flowRun + while (finalFlowRun.status === FlowRunStatus.PAUSED) { + await engineResponseWatcher.listen(finalFlowRun.id, false) + finalFlowRun = await flowRunService.getOneOrThrow({ id: finalFlowRun.id, projectId: principal.projectId }) + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, finalFlowRun) + } + + if (finalFlowRun.status === FlowRunStatus.SUCCEEDED) { + engineResponseWatcher.removeListener(flowRun.id) + } } }) websocketService.addListener(WebsocketServerEvent.TEST_STEP_RUN, (socket) => { diff --git a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts index aae19a1b6e..90b7f7ba71 100644 --- a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts +++ b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts @@ -24,6 +24,9 @@ export const engineResponseWatcher = { getHandlerId(): string { return HANDLER_ID }, + removeListener(requestId: string): void { + listeners.delete(requestId) + }, async init(): Promise { logger.info('[engineWatcher#init] Initializing engine run watcher') diff --git a/packages/shared/src/lib/flow-run/execution/flow-execution.ts b/packages/shared/src/lib/flow-run/execution/flow-execution.ts index a1adc6e3a5..cbca52894e 100644 --- a/packages/shared/src/lib/flow-run/execution/flow-execution.ts +++ b/packages/shared/src/lib/flow-run/execution/flow-execution.ts @@ -29,6 +29,7 @@ export const WebhookPauseMetadata = Type.Object({ type: Type.Literal(PauseType.WEBHOOK), requestId: Type.String(), response: Type.Unknown(), + handlerId: Type.Optional(Type.String({})), }) export type WebhookPauseMetadata = Static From c2af8168222bba904318230979288fcaef183f6b Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Thu, 9 May 2024 00:08:18 +0000 Subject: [PATCH 2/7] feat: emit events with driver function --- .../server/api/src/app/flows/flow.module.ts | 25 ++++++++++--------- .../flow-worker/engine-response-watcher.ts | 16 +++++++++++- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index e35b0b503a..ecd100c854 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -1,3 +1,4 @@ +import EventEmitter from 'events' import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox' import { accessTokenManager } from '../authentication/lib/access-token-manager' import { websocketService } from '../websockets/websockets.service' @@ -9,7 +10,7 @@ import { folderController } from './folder/folder.controller' import { stepRunService } from './step-run/step-run-service' import { testTriggerController } from './test-trigger/test-trigger-controller' import { logger } from '@activepieces/server-shared' -import { CreateStepRunRequestBody, FlowRunStatus, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' +import { CreateStepRunRequestBody, FlowRun, FlowRunStatus, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(flowVersionController, { prefix: '/v1/flows' }) @@ -18,23 +19,23 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(testTriggerController, { prefix: '/v1/test-trigger' }) websocketService.addListener(WebsocketServerEvent.TEST_FLOW_RUN, (socket) => { return async (data: TestFlowRunRequestBody) => { + const eventEmitter = new EventEmitter() const principal = await accessTokenManager.extractPrincipal(socket.handshake.auth.token) const flowRun = await flowRunService.test({ projectId: principal.projectId, flowVersionId: data.flowVersionId, }) - socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) - let finalFlowRun = flowRun - while (finalFlowRun.status === FlowRunStatus.PAUSED) { - await engineResponseWatcher.listen(finalFlowRun.id, false) - finalFlowRun = await flowRunService.getOneOrThrow({ id: finalFlowRun.id, projectId: principal.projectId }) - socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, finalFlowRun) - } - - if (finalFlowRun.status === FlowRunStatus.SUCCEEDED) { - engineResponseWatcher.removeListener(flowRun.id) - } + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) + + eventEmitter.on('FlowStatus', (flowRunResponse: FlowRun) => { + if (flowRunResponse.status === FlowRunStatus.SUCCEEDED) { + eventEmitter.removeAllListeners() + } + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, flowRunResponse) + }) + + await engineResponseWatcher.listenAndEmit(flowRun.id, eventEmitter, flowRunService.getOneOrThrow({ id: flowRun.id, projectId: principal.projectId })) } }) websocketService.addListener(WebsocketServerEvent.TEST_STEP_RUN, (socket) => { diff --git a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts index 90b7f7ba71..285e489a0f 100644 --- a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts +++ b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts @@ -1,8 +1,9 @@ +import { EventEmitter } from 'events' import { logger } from '@sentry/utils' import { StatusCodes } from 'http-status-codes' import { pubSub } from '../../helper/pubsub' import { system, SystemProp } from '@activepieces/server-shared' -import { apId } from '@activepieces/shared' +import { apId, FlowRunStatus } from '@activepieces/shared' const listeners = new Map void>() @@ -45,6 +46,19 @@ export const engineResponseWatcher = { }, ) }, + async listenAndEmit(requestId: string, event: EventEmitter, driver: Promise): Promise { + logger.info(`[engineWatcher#listenAndEmit] requestId=${requestId}`) + + const listenStatus = async () => { + const finalFlowRun = await driver + event.emit('FlowStatus', finalFlowRun) + if (finalFlowRun.status !== FlowRunStatus.SUCCEEDED) { + await listenStatus() + } + } + + await listenStatus() + }, async listen(requestId: string, timeoutRequest: boolean): Promise { logger.info(`[engineWatcher#listen] requestId=${requestId}`) return new Promise((resolve) => { From e92f370589b4c39327252016d33d476d90abc923 Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Thu, 9 May 2024 00:11:59 +0000 Subject: [PATCH 3/7] fix: remove all listeners when failed --- packages/server/api/src/app/flows/flow.module.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index ecd100c854..48d5ae8cb3 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -29,8 +29,9 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) eventEmitter.on('FlowStatus', (flowRunResponse: FlowRun) => { - if (flowRunResponse.status === FlowRunStatus.SUCCEEDED) { + if (flowRunResponse.status === FlowRunStatus.SUCCEEDED || flowRunResponse.status === FlowRunStatus.FAILED) { eventEmitter.removeAllListeners() + engineResponseWatcher.removeListener(flowRun.id) } socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, flowRunResponse) }) From 8e7bff8b3e4bf0f07d70dc3a32ebb862eae9c690 Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Thu, 9 May 2024 12:01:08 +0000 Subject: [PATCH 4/7] fix: make pauseMetadata handler as a separate function --- .../src/app/flows/flow-run/flow-run-service.ts | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts index 2c796f3249..392204a838 100644 --- a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts +++ b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts @@ -92,6 +92,17 @@ function returnHandlerId(pauseMetadata: PauseMetadata | undefined, requestId: st } } +function modifyPauseMetadata(pauseMetadata: PauseMetadata): PauseMetadata { + if (pauseMetadata.type === PauseType.WEBHOOK) { + return { + ...pauseMetadata, + handlerId: engineResponseWatcher.getHandlerId(), + } + } + + return pauseMetadata +} + export const flowRunService = { async list({ projectId, @@ -304,14 +315,11 @@ export const flowRunService = { const { flowRunId, logFileId, pauseMetadata } = params - const updatedPauseMetadata = params.pauseMetadata.type === PauseType.WEBHOOK ? { ...pauseMetadata, - id: engineResponseWatcher.getHandlerId() } : pauseMetadata - await flowRunRepo.update(flowRunId, { status: FlowRunStatus.PAUSED, logsFileId: logFileId, // eslint-disable-next-line @typescript-eslint/no-explicit-any - pauseMetadata: updatedPauseMetadata as any, + pauseMetadata: modifyPauseMetadata(pauseMetadata) as any, }) const flowRun = await flowRunRepo.findOneByOrFail({ id: flowRunId }) From 9c75b68b033184b2d1c99a84fe85c104fd18772f Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Thu, 9 May 2024 12:01:49 +0000 Subject: [PATCH 5/7] feat: add getFlowState to check state for running flow --- packages/shared/src/index.ts | 1 + packages/shared/src/lib/flow-run/flow-status.ts | 5 +++++ 2 files changed, 6 insertions(+) create mode 100644 packages/shared/src/lib/flow-run/flow-status.ts diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 93a9663be2..879918c2ba 100755 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -60,6 +60,7 @@ export { DelayPauseMetadata, PauseMetadata, WebhookPauseMetadata } from './lib/f export * from './lib/federated-authn' export { STORE_KEY_MAX_LENGTH } from './lib/store-entry/store-entry' export { RetryFlowRequestBody } from './lib/flow-run/test-flow-run-request' +export * from './lib/flow-run/flow-status' export * from './lib/flows/dto/flow-template-request' // Look at https://github.com/sinclairzx81/typebox/issues/350 TypeSystem.ExactOptionalPropertyTypes = false diff --git a/packages/shared/src/lib/flow-run/flow-status.ts b/packages/shared/src/lib/flow-run/flow-status.ts new file mode 100644 index 0000000000..b9dafd3fc8 --- /dev/null +++ b/packages/shared/src/lib/flow-run/flow-status.ts @@ -0,0 +1,5 @@ +import { FlowRunStatus } from './execution/flow-execution' + +export const getFlowState = (status: FlowRunStatus): boolean => { + return status === FlowRunStatus.SUCCEEDED || status === FlowRunStatus.FAILED || status === FlowRunStatus.INTERNAL_ERROR || status === FlowRunStatus.QUOTA_EXCEEDED +} \ No newline at end of file From 087091e08773d3616ad394fd1bd175735c8adb9b Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Thu, 9 May 2024 12:02:42 +0000 Subject: [PATCH 6/7] fix: rename websocket event --- packages/server/api/src/app/flows/flow.module.ts | 8 ++++---- .../app/workers/flow-worker/engine-response-watcher.ts | 4 ++-- packages/shared/src/lib/websocket/index.ts | 2 +- .../test-flow-widget/test-flow-widget.component.ts | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index 48d5ae8cb3..4dc87bbdc4 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -10,7 +10,7 @@ import { folderController } from './folder/folder.controller' import { stepRunService } from './step-run/step-run-service' import { testTriggerController } from './test-trigger/test-trigger-controller' import { logger } from '@activepieces/server-shared' -import { CreateStepRunRequestBody, FlowRun, FlowRunStatus, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' +import { CreateStepRunRequestBody, FlowRun, getFlowState, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(flowVersionController, { prefix: '/v1/flows' }) @@ -28,12 +28,12 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) - eventEmitter.on('FlowStatus', (flowRunResponse: FlowRun) => { - if (flowRunResponse.status === FlowRunStatus.SUCCEEDED || flowRunResponse.status === FlowRunStatus.FAILED) { + eventEmitter.on(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, (flowRunResponse: FlowRun) => { + if (getFlowState(flowRunResponse.status)) { eventEmitter.removeAllListeners() engineResponseWatcher.removeListener(flowRun.id) } - socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED, flowRunResponse) + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRunResponse) }) await engineResponseWatcher.listenAndEmit(flowRun.id, eventEmitter, flowRunService.getOneOrThrow({ id: flowRun.id, projectId: principal.projectId })) diff --git a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts index 285e489a0f..60386f0153 100644 --- a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts +++ b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts @@ -3,7 +3,7 @@ import { logger } from '@sentry/utils' import { StatusCodes } from 'http-status-codes' import { pubSub } from '../../helper/pubsub' import { system, SystemProp } from '@activepieces/server-shared' -import { apId, FlowRunStatus } from '@activepieces/shared' +import { apId, FlowRunStatus, WebsocketClientEvent } from '@activepieces/shared' const listeners = new Map void>() @@ -51,7 +51,7 @@ export const engineResponseWatcher = { const listenStatus = async () => { const finalFlowRun = await driver - event.emit('FlowStatus', finalFlowRun) + event.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, finalFlowRun) if (finalFlowRun.status !== FlowRunStatus.SUCCEEDED) { await listenStatus() } diff --git a/packages/shared/src/lib/websocket/index.ts b/packages/shared/src/lib/websocket/index.ts index cc85d14a77..34156832c2 100644 --- a/packages/shared/src/lib/websocket/index.ts +++ b/packages/shared/src/lib/websocket/index.ts @@ -2,7 +2,7 @@ export enum WebsocketClientEvent { TEST_FLOW_RUN_STARTED = 'TEST_FLOW_RUN_STARTED', - TEST_FLOW_RUN_FINISHED = 'TEST_FLOW_RUN_FINISHED', + TEST_FLOW_RUN_PROGRESS = 'TEST_FLOW_RUN_PROGRESS', GENERATE_CODE_FINISHED = 'GENERATE_CODE_FINIISHED', TEST_STEP_FINISHED = 'TEST_STEP_FINISHED', } diff --git a/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts b/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts index 8d161b2119..56d34955ba 100644 --- a/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts +++ b/packages/ui/feature-builder-canvas/src/lib/components/widgets/test-flow-widget/test-flow-widget.component.ts @@ -112,7 +112,7 @@ export class TestFlowWidgetComponent implements OnInit { ); this.testResult$ = this.websockService.socket - .fromEvent(WebsocketClientEvent.TEST_FLOW_RUN_FINISHED) + .fromEvent(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS) .pipe( switchMap((flowRun) => { return this.instanceRunService.get(flowRun.id); From b16503a9ee58d3bf2ff1c6b3a6a9a7fa2ea9c467 Mon Sep 17 00:00:00 2001 From: Islam Abdelfattah Date: Thu, 9 May 2024 12:29:42 +0000 Subject: [PATCH 7/7] fix: flow state function naming --- packages/server/api/src/app/flows/flow.module.ts | 4 ++-- packages/shared/src/lib/flow-run/flow-status.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index 4dc87bbdc4..d4ed014300 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -10,7 +10,7 @@ import { folderController } from './folder/folder.controller' import { stepRunService } from './step-run/step-run-service' import { testTriggerController } from './test-trigger/test-trigger-controller' import { logger } from '@activepieces/server-shared' -import { CreateStepRunRequestBody, FlowRun, getFlowState, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' +import { CreateStepRunRequestBody, FlowRun, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(flowVersionController, { prefix: '/v1/flows' }) @@ -29,7 +29,7 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) eventEmitter.on(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, (flowRunResponse: FlowRun) => { - if (getFlowState(flowRunResponse.status)) { + if (isFlowStateTerminal(flowRunResponse.status)) { eventEmitter.removeAllListeners() engineResponseWatcher.removeListener(flowRun.id) } diff --git a/packages/shared/src/lib/flow-run/flow-status.ts b/packages/shared/src/lib/flow-run/flow-status.ts index b9dafd3fc8..24411d3909 100644 --- a/packages/shared/src/lib/flow-run/flow-status.ts +++ b/packages/shared/src/lib/flow-run/flow-status.ts @@ -1,5 +1,5 @@ import { FlowRunStatus } from './execution/flow-execution' -export const getFlowState = (status: FlowRunStatus): boolean => { +export const isFlowStateTerminal = (status: FlowRunStatus): boolean => { return status === FlowRunStatus.SUCCEEDED || status === FlowRunStatus.FAILED || status === FlowRunStatus.INTERNAL_ERROR || status === FlowRunStatus.QUOTA_EXCEEDED } \ No newline at end of file