diff --git a/package-lock.json b/package-lock.json index 1778664370..f352ea06cd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -147,7 +147,7 @@ "openai": "4.17.5", "papaparse": "5.4.1", "pdf-parse": "1.1.1", - "pdf-text-reader": "5.0.0", + "pdf-text-reader": "4.1.0", "pg": "8.11.3", "pickleparser": "0.1.0", "pino-loki": "2.1.3", @@ -11544,11 +11544,6 @@ "npm": ">= 8.6.0" } }, - "node_modules/@slack/web-api/node_modules/eventemitter3": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", - "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" - }, "node_modules/@slack/web-api/node_modules/is-stream": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.1.tgz", @@ -24227,9 +24222,9 @@ } }, "node_modules/eventemitter3": { - "version": "4.0.7", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" }, "node_modules/events": { "version": "3.3.0", @@ -26593,6 +26588,12 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/http-proxy/node_modules/eventemitter3": { + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==", + "dev": true + }, "node_modules/http-server": { "version": "14.1.1", "resolved": "https://registry.npmjs.org/http-server/-/http-server-14.1.1.tgz", @@ -30768,12 +30769,6 @@ "integrity": "sha512-L18DaJsXSUk2+42pv8mLs5jJT2hqFkFE4j21wOmgbUqsZ2hL72NsUU785g9RXgo3s0ZNgVl42TiHp3ZtOv/Vyg==", "dev": true }, - "node_modules/listr2/node_modules/eventemitter3": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", - "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==", - "dev": true - }, "node_modules/listr2/node_modules/string-width": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/string-width/-/string-width-5.1.2.tgz", @@ -34152,6 +34147,11 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue/node_modules/eventemitter3": { + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + }, "node_modules/p-retry": { "version": "4.6.2", "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-4.6.2.tgz", @@ -34473,13 +34473,13 @@ "node": ">=8" } }, - "node_modules/path2d": { - "version": "0.2.0", - "resolved": "https://registry.npmjs.org/path2d/-/path2d-0.2.0.tgz", - "integrity": "sha512-KdPAykQX6kmLSOO6Jpu2KNcCED7CKjmaBNGGNuctOsG0hgYO1OdYQaan6cYXJiG0WmXOwZZPILPBimu5QAIw3A==", + "node_modules/path2d-polyfill": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/path2d-polyfill/-/path2d-polyfill-2.0.1.tgz", + "integrity": "sha512-ad/3bsalbbWhmBo0D6FZ4RNMwsLsPpL6gnvhuSaU5Vm7b06Kr5ubSltQQ0T7YKsiJQO+g22zJ4dJKNTXIyOXtA==", "optional": true, "engines": { - "node": ">=6" + "node": ">=8" } }, "node_modules/pause-stream": { @@ -34512,23 +34512,23 @@ } }, "node_modules/pdf-text-reader": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/pdf-text-reader/-/pdf-text-reader-5.0.0.tgz", - "integrity": "sha512-OthaaaSutojBNll4LMiD4oMRP2RoRxA7FQamvBGr9LFoXa2R5E2rwaKXrABddy/O+I7nAR5qrGO/pOjZrMNvog==", + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/pdf-text-reader/-/pdf-text-reader-4.1.0.tgz", + "integrity": "sha512-De2EnUEI+w626PNHMY01Yqdafr6GG5Il8cIplgKsthsc4Uoailq6LNbBhAoFcvLCwzgS/HX/5jajXSL0A0tTcw==", "dependencies": { - "pdfjs-dist": "4.2.67" + "pdfjs-dist": "3.9.179" } }, "node_modules/pdfjs-dist": { - "version": "4.2.67", - "resolved": "https://registry.npmjs.org/pdfjs-dist/-/pdfjs-dist-4.2.67.tgz", - "integrity": "sha512-rJmuBDFpD7cqC8WIkQUEClyB4UAH05K4AsyewToMTp2gSy3Rrx8c1ydAVqlJlGv3yZSOrhEERQU/4ScQQFlLHA==", + "version": "3.9.179", + "resolved": "https://registry.npmjs.org/pdfjs-dist/-/pdfjs-dist-3.9.179.tgz", + "integrity": "sha512-AZBEIAORYDaOAlM0/A4Zg465+XF3ugYDdgrVmioVvNW5tH3xs3RpGFBYOG5PM9/vLM3M/wNncsMLTgyIKdqMKg==", "engines": { "node": ">=18" }, "optionalDependencies": { "canvas": "^2.11.2", - "path2d": "^0.2.0" + "path2d-polyfill": "^2.0.1" } }, "node_modules/peberminta": { diff --git a/package.json b/package.json index dda3b1bb62..ec2b5121d6 100644 --- a/package.json +++ b/package.json @@ -160,7 +160,7 @@ "openai": "4.17.5", "papaparse": "5.4.1", "pdf-parse": "1.1.1", - "pdf-text-reader": "5.0.0", + "pdf-text-reader": "4.1.0", "pg": "8.11.3", "pickleparser": "0.1.0", "pino-loki": "2.1.3", diff --git a/packages/pieces/community/pdf/package.json b/packages/pieces/community/pdf/package.json index 0a7f23c062..a6f0e7b593 100644 --- a/packages/pieces/community/pdf/package.json +++ b/packages/pieces/community/pdf/package.json @@ -3,7 +3,7 @@ "version": "0.0.1", "dependencies": { "@activepieces/pieces-framework": "*", - "pdf-text-reader": "5.0.0", + "pdf-text-reader": "4.1.0", "tslib": "2.6.2" }, "main": "./src/index.js", diff --git a/packages/server/api/src/app/app-event-routing/app-event-routing.module.ts b/packages/server/api/src/app/app-event-routing/app-event-routing.module.ts index fc968cd194..3b50004bac 100644 --- a/packages/server/api/src/app/app-event-routing/app-event-routing.module.ts +++ b/packages/server/api/src/app/app-event-routing/app-event-routing.module.ts @@ -9,7 +9,8 @@ import { slack } from '@activepieces/piece-slack' import { square } from '@activepieces/piece-square' import { Piece } from '@activepieces/pieces-framework' import { logger, rejectedPromiseHandler } from '@activepieces/server-shared' -import { ActivepiecesError, ALL_PRINCIPAL_TYPES, +import { + ActivepiecesError, ALL_PRINCIPAL_TYPES, ErrorCode, EventPayload, isNil, diff --git a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts index 392204a838..a60db820c0 100644 --- a/packages/server/api/src/app/flows/flow-run/flow-run-service.ts +++ b/packages/server/api/src/app/flows/flow-run/flow-run-service.ts @@ -199,7 +199,7 @@ export const flowRunService = { projectId: flowRunToResume.projectId, flowVersionId: flowRunToResume.flowVersionId, synchronousHandlerId: returnHandlerId(pauseMetadata, requestId), - hookType: HookType.BEFORE_LOG, + hookType: HookType.AFTER_LOG, executionType, environment: RunEnvironment.PRODUCTION, }) diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index d4ed014300..d7bda432ec 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -1,4 +1,3 @@ -import EventEmitter from 'events' import { FastifyPluginAsyncTypebox } from '@fastify/type-provider-typebox' import { accessTokenManager } from '../authentication/lib/access-token-manager' import { websocketService } from '../websockets/websockets.service' @@ -10,7 +9,7 @@ import { folderController } from './folder/folder.controller' import { stepRunService } from './step-run/step-run-service' import { testTriggerController } from './test-trigger/test-trigger-controller' import { logger } from '@activepieces/server-shared' -import { CreateStepRunRequestBody, FlowRun, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' +import { CreateStepRunRequestBody, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(flowVersionController, { prefix: '/v1/flows' }) @@ -19,7 +18,6 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { await app.register(testTriggerController, { prefix: '/v1/test-trigger' }) websocketService.addListener(WebsocketServerEvent.TEST_FLOW_RUN, (socket) => { return async (data: TestFlowRunRequestBody) => { - const eventEmitter = new EventEmitter() const principal = await accessTokenManager.extractPrincipal(socket.handshake.auth.token) const flowRun = await flowRunService.test({ projectId: principal.projectId, @@ -27,16 +25,19 @@ export const flowModule: FastifyPluginAsyncTypebox = async (app) => { }) socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_STARTED, flowRun) - - eventEmitter.on(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, (flowRunResponse: FlowRun) => { - if (isFlowStateTerminal(flowRunResponse.status)) { - eventEmitter.removeAllListeners() + const eventEmitter = engineResponseWatcher.listen(flowRun.id) + eventEmitter.on(async (data) => { + const flowRun = await flowRunService.getOneOrThrow({ + id: data.requestId, + projectId: principal.projectId, + }) + + if (isFlowStateTerminal(flowRun.status)) { engineResponseWatcher.removeListener(flowRun.id) } - socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRunResponse) + socket.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, flowRun) }) - - await engineResponseWatcher.listenAndEmit(flowRun.id, eventEmitter, flowRunService.getOneOrThrow({ id: flowRun.id, projectId: principal.projectId })) + } }) websocketService.addListener(WebsocketServerEvent.TEST_STEP_RUN, (socket) => { diff --git a/packages/server/api/src/app/webhooks/webhook-controller.ts b/packages/server/api/src/app/webhooks/webhook-controller.ts index c5f117765a..7abe7f1041 100644 --- a/packages/server/api/src/app/webhooks/webhook-controller.ts +++ b/packages/server/api/src/app/webhooks/webhook-controller.ts @@ -104,7 +104,7 @@ async function handleWebhook({ request, flowId, async, simulate }: { request: Fa headers: {}, } } - return engineResponseWatcher.listen(requestId, true) + return engineResponseWatcher.oneTimeListener(requestId, true) } async function convertRequest(request: FastifyRequest): Promise { diff --git a/packages/server/api/src/app/webhooks/webhook-service.ts b/packages/server/api/src/app/webhooks/webhook-service.ts index 0c4b89cf3d..e9e52342c5 100644 --- a/packages/server/api/src/app/webhooks/webhook-service.ts +++ b/packages/server/api/src/app/webhooks/webhook-service.ts @@ -69,7 +69,6 @@ export const webhookService = { logger.info( `[WebhookService#callback] flowInstance not found or not enabled ignoring the webhook, flowId=${flow.id}`, ) - throw new ActivepiecesError({ code: ErrorCode.FLOW_NOT_FOUND, params: { diff --git a/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts b/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts index 96eddd9757..7f22c87f4a 100644 --- a/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts +++ b/packages/server/api/src/app/workers/flow-worker/consumer/webook-consumer.ts @@ -3,7 +3,7 @@ import { flowService } from '../../../flows/flow/flow.service' import { webhookService } from '../../../webhooks/webhook-service' import { EngineHttpResponse, engineResponseWatcher } from '../engine-response-watcher' import { WebhookJobData } from '../job-data' -import { isNil } from '@activepieces/shared' +import { FlowStatus, isNil } from '@activepieces/shared' export const webhookConsumer = { async consumeWebhook(data: WebhookJobData): Promise { @@ -17,6 +17,15 @@ export const webhookConsumer = { }) return } + const isPublishedAndEnabled = flow.status !== FlowStatus.ENABLED || isNil(flow.publishedVersionId) + if (isPublishedAndEnabled) { + await stopAndReply(data, { + status: StatusCodes.NOT_FOUND, + body: {}, + headers: {}, + }) + return + } const handshakeResponse = await webhookService.handshake({ flow, payload, @@ -51,7 +60,7 @@ export const webhookConsumer = { return } const firstRun = runs[0] - const response = await engineResponseWatcher.listen(firstRun.id, true) + const response = await engineResponseWatcher.oneTimeListener(firstRun.id, true) await stopAndReply(data, response) }, diff --git a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts index 60386f0153..36df16e7b8 100644 --- a/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts +++ b/packages/server/api/src/app/workers/flow-worker/engine-response-watcher.ts @@ -1,11 +1,10 @@ -import { EventEmitter } from 'events' import { logger } from '@sentry/utils' import { StatusCodes } from 'http-status-codes' import { pubSub } from '../../helper/pubsub' -import { system, SystemProp } from '@activepieces/server-shared' -import { apId, FlowRunStatus, WebsocketClientEvent } from '@activepieces/shared' +import { system, SystemProp, TypedEventEmitter } from '@activepieces/server-shared' +import { apId } from '@activepieces/shared' -const listeners = new Map void>() +const listeners = new Map void>() export type EngineHttpResponse = { status: number @@ -13,7 +12,7 @@ export type EngineHttpResponse = { headers: Record } -type EngineResponseWithId = { +export type EngineResponseWithId = { requestId: string httpResponse: EngineHttpResponse } @@ -37,8 +36,7 @@ export const engineResponseWatcher = { const parsedMessasge: EngineResponseWithId = JSON.parse(message) const listener = listeners.get(parsedMessasge.requestId) if (listener) { - listener(parsedMessasge.httpResponse) - listeners.delete(parsedMessasge.requestId) + listener(parsedMessasge) } logger.info( `[engineWatcher#init] message=${parsedMessasge.requestId}`, @@ -46,41 +44,39 @@ export const engineResponseWatcher = { }, ) }, - async listenAndEmit(requestId: string, event: EventEmitter, driver: Promise): Promise { - logger.info(`[engineWatcher#listenAndEmit] requestId=${requestId}`) - - const listenStatus = async () => { - const finalFlowRun = await driver - event.emit(WebsocketClientEvent.TEST_FLOW_RUN_PROGRESS, finalFlowRun) - if (finalFlowRun.status !== FlowRunStatus.SUCCEEDED) { - await listenStatus() - } - } - - await listenStatus() + listen(requestId: string): TypedEventEmitter { + const eventEmitter = new TypedEventEmitter() + logger.error('FUCK') + listeners.set(requestId, (data) => { + logger.error('ASH ' + data.requestId) + eventEmitter.emit(data) + }) + return eventEmitter }, - async listen(requestId: string, timeoutRequest: boolean): Promise { + async oneTimeListener(requestId: string, timeoutRequest: boolean): Promise { logger.info(`[engineWatcher#listen] requestId=${requestId}`) return new Promise((resolve) => { - const defaultResponse: EngineHttpResponse = { - status: StatusCodes.NO_CONTENT, - body: {}, - headers: {}, - } - const responseHandler = (flowResponse: EngineHttpResponse) => { - clearTimeout(timeout) - resolve(flowResponse) - } let timeout: NodeJS.Timeout - if (!timeoutRequest) { - listeners.set(requestId, resolve) - } - else { + if (timeoutRequest) { + const defaultResponse: EngineHttpResponse = { + status: StatusCodes.NO_CONTENT, + body: {}, + headers: {}, + } timeout = setTimeout(() => { + this.removeListener(requestId) resolve(defaultResponse) }, WEBHOOK_TIMEOUT_MS) - listeners.set(requestId, responseHandler) + + } + const responseHandler = (flowResponse: EngineResponseWithId) => { + if (timeout) { + clearTimeout(timeout) + } + this.removeListener(requestId) + resolve(flowResponse.httpResponse) } + listeners.set(requestId, responseHandler) }) }, async publish( diff --git a/packages/server/shared/package.json b/packages/server/shared/package.json index ef59338eff..8dbc4602ec 100644 --- a/packages/server/shared/package.json +++ b/packages/server/shared/package.json @@ -6,7 +6,8 @@ "@sentry/node": "7.64.0", "pino": "8.18.0", "pino-loki": "2.1.3", - "@activepieces/shared": "*" + "@activepieces/shared": "*", + "events": "3.3.0" }, "type": "commonjs", "main": "./src/index.js", diff --git a/packages/server/shared/src/index.ts b/packages/server/shared/src/index.ts index 9b0e318118..2821662697 100644 --- a/packages/server/shared/src/index.ts +++ b/packages/server/shared/src/index.ts @@ -1,3 +1,5 @@ +export * from './lib/exception-handler' +export * from './lib/typed-event-emitter' export * from './lib/semaphore' export * from './lib/file-compressor' export * from './lib/file-system' @@ -5,5 +7,4 @@ export * from './lib/package-manager' export * from './lib/system/system' export * from './lib/system/system-prop' export * from './lib/promise-handler' -export * from './lib/logger' -export * from './lib/exception-handler' \ No newline at end of file +export * from './lib/logger' \ No newline at end of file diff --git a/packages/server/shared/src/lib/typed-event-emitter.ts b/packages/server/shared/src/lib/typed-event-emitter.ts new file mode 100644 index 0000000000..46b6c74a45 --- /dev/null +++ b/packages/server/shared/src/lib/typed-event-emitter.ts @@ -0,0 +1,17 @@ +import EventEmitter from 'events' + +export class TypedEventEmitter { + private emitter = new EventEmitter() + + emit(eventArg: TEventData) { + this.emitter.emit('event', eventArg) + } + + on(handler: (eventArg: TEventData) => void) { + this.emitter.on('event', handler) + } + + off(handler: (eventArg: TEventData) => void) { + this.emitter.off('event', handler) + } +}