diff --git a/packages/server/api/src/app/flows/flow.module.ts b/packages/server/api/src/app/flows/flow.module.ts index d7bda432ec..88ec6c480e 100644 --- a/packages/server/api/src/app/flows/flow.module.ts +++ b/packages/server/api/src/app/flows/flow.module.ts @@ -3,6 +3,7 @@ import { accessTokenManager } from '../authentication/lib/access-token-manager' import { websocketService } from '../websockets/websockets.service' import { engineResponseWatcher } from '../workers/flow-worker/engine-response-watcher' import { flowVersionController } from './flow/flow-version.controller' +import { flowWorkerController } from './flow/flow-worker.controller' import { flowController } from './flow/flow.controller' import { flowRunService } from './flow-run/flow-run-service' import { folderController } from './folder/folder.controller' @@ -12,6 +13,7 @@ import { logger } from '@activepieces/server-shared' import { CreateStepRunRequestBody, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared' export const flowModule: FastifyPluginAsyncTypebox = async (app) => { + await app.register(flowWorkerController, { prefix: '/v1/worker/flows' }) await app.register(flowVersionController, { prefix: '/v1/flows' }) await app.register(flowController, { prefix: '/v1/flows' }) await app.register(folderController, { prefix: '/v1/folders' }) diff --git a/packages/server/api/src/app/flows/flow/flow-worker.controller.ts b/packages/server/api/src/app/flows/flow/flow-worker.controller.ts new file mode 100644 index 0000000000..1dcc0850a9 --- /dev/null +++ b/packages/server/api/src/app/flows/flow/flow-worker.controller.ts @@ -0,0 +1,42 @@ +import { FastifyPluginAsyncTypebox, Type } from '@fastify/type-provider-typebox' +import { StatusCodes } from 'http-status-codes' +import { entitiesMustBeOwnedByCurrentProject } from '../../authentication/authorization' +import { flowVersionService } from '../flow-version/flow-version.service' +import { flowService } from './flow.service' +import { PopulatedFlow, PrincipalType } from '@activepieces/shared' + +export const flowWorkerController: FastifyPluginAsyncTypebox = async (fastify) => { + fastify.addHook('preSerialization', entitiesMustBeOwnedByCurrentProject) + + fastify.get('/', GetLockedVersionRequest, async (request) => { + const flowVersion = await flowVersionService.getOneOrThrow(request.query.versionId) + // Check if the flow version is owned by the current project + const flow = await flowService.getOneOrThrow({ + id: flowVersion.flowId, + projectId: request.principal.projectId, + }) + const lockedVersion = await flowVersionService.lockPieceVersions({ + flowVersion, + projectId: request.principal.projectId, + }) + return { + ...flow, + version: lockedVersion, + } + }, + ) +} + +const GetLockedVersionRequest = { + config: { + allowedPrincipals: [PrincipalType.WORKER], + }, + schema: { + querystring: Type.Object({ + versionId: Type.String(), + }), + response: { + [StatusCodes.OK]: PopulatedFlow, + }, + }, +} diff --git a/packages/server/api/src/app/helper/engine-helper.ts b/packages/server/api/src/app/helper/engine-helper.ts index b72c69cbb3..0308f3c2d5 100644 --- a/packages/server/api/src/app/helper/engine-helper.ts +++ b/packages/server/api/src/app/helper/engine-helper.ts @@ -85,7 +85,7 @@ export type EngineHelperResponse = { standardOutput: string } -const generateWorkerToken = ({ +export const generateWorkerToken = ({ projectId, }: GenerateWorkerTokenParams): Promise => { return accessTokenManager.generateToken({ diff --git a/packages/server/api/src/app/workers/flow-worker/flow-worker.ts b/packages/server/api/src/app/workers/flow-worker/flow-worker.ts index 429c4b0d65..e247441619 100644 --- a/packages/server/api/src/app/workers/flow-worker/flow-worker.ts +++ b/packages/server/api/src/app/workers/flow-worker/flow-worker.ts @@ -4,14 +4,14 @@ import { flowRunService, HookType, } from '../../flows/flow-run/flow-run-service' -import { flowVersionService } from '../../flows/flow-version/flow-version.service' -import { engineHelper } from '../../helper/engine-helper' +import { engineHelper, generateWorkerToken } from '../../helper/engine-helper' import { getPiecePackage } from '../../pieces/piece-metadata-service' import { EngineHttpResponse, engineResponseWatcher } from './engine-response-watcher' import { flowWorkerHooks } from './flow-worker-hooks' import { OneTimeJobData } from './job-data' import { exceptionHandler, logger } from '@activepieces/server-shared' -import { Action, ActionType, +import { + Action, ActionType, ActivepiecesError, assertNotNullOrUndefined, BeginExecuteFlowOperation, @@ -40,7 +40,7 @@ import { Action, ActionType, Trigger, TriggerType, } from '@activepieces/shared' -import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner } from 'server-worker' +import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker' type FinishExecutionParams = { flowRunId: FlowRunId @@ -183,36 +183,31 @@ async function executeFlow(jobData: OneTimeJobData): Promise { ) const startTime = Date.now() - - const flowVersionWithLockedPieces = await flowVersionService.getOne( - jobData.flowVersionId, - ) - - if (isNil(flowVersionWithLockedPieces)) { + const workerToken = await generateWorkerToken({ + projectId: jobData.projectId, + }) + const serverApi = serverApiService(workerToken) + const flow = await serverApi.getFlowWithExactPieces(jobData.flowVersionId) + if (isNil(flow)) { logger.info({ message: 'Flow version not found, skipping execution', flowVersionId: jobData.flowVersionId, }) return } - const flowVersion = await flowVersionService.lockPieceVersions({ - projectId: jobData.projectId, - flowVersion: flowVersionWithLockedPieces, - }) - await flowWorkerHooks .getHooks() .preExecute({ projectId: jobData.projectId, runId: jobData.runId }) try { const { input, logFileId } = await loadInputAndLogFileId({ - flowVersion, + flowVersion: flow.version, jobData, }) const sandbox = await getSandbox({ projectId: jobData.projectId, - flowVersion, + flowVersion: flow.version, runEnvironment: jobData.environment, }) diff --git a/packages/server/api/test/integration/ce/flows/flow-consume.test.ts b/packages/server/api/test/integration/ce/flows/flow-consume.test.ts new file mode 100644 index 0000000000..8b4d80a7dd --- /dev/null +++ b/packages/server/api/test/integration/ce/flows/flow-consume.test.ts @@ -0,0 +1,194 @@ +import { FastifyInstance } from 'fastify' +import { setupApp } from '../../../../src/app/app' +import { databaseConnection } from '../../../../src/app/database/database-connection' +import { flowWorker } from '../../../../src/app/workers/flow-worker/flow-worker' +import { + createMockFlow, + createMockFlowRun, + createMockFlowVersion, + createMockPlatform, + createMockProject, + createMockUser, +} from '../../../helpers/mocks' +import { fileCompressor } from '@activepieces/server-shared' +import { + ActionType, + ExecutionType, + FlowRunStatus, + FlowStatus, + FlowVersionState, + PackageType, + PieceType, + RunEnvironment, + TriggerType, +} from '@activepieces/shared' + +let app: FastifyInstance | null = null + +beforeAll(async () => { + await databaseConnection.initialize() + app = await setupApp() + await app.listen({ + host: '0.0.0.0', + port: 3000, + }) +}) + +afterAll(async () => { + await databaseConnection.destroy() + await app?.close() +}) + +describe('flow execution', () => { + it('should execute simple flow with code and data mapper', async () => { + const mockUser = createMockUser() + await databaseConnection.getRepository('user').save([mockUser]) + + const mockPlatform = createMockPlatform({ ownerId: mockUser.id }) + await databaseConnection.getRepository('platform').save([mockPlatform]) + + const mockProject = createMockProject({ ownerId: mockUser.id, platformId: mockPlatform.id }) + await databaseConnection.getRepository('project').save([mockProject]) + + const mockFlow = createMockFlow({ + projectId: mockProject.id, + status: FlowStatus.ENABLED, + }) + await databaseConnection.getRepository('flow').save([mockFlow]) + + const mockFlowVersion = createMockFlowVersion({ + flowId: mockFlow.id, + updatedBy: mockUser.id, + state: FlowVersionState.LOCKED, + trigger: { + type: TriggerType.PIECE, + settings: { + pieceName: '@activepieces/piece-schedule', + pieceVersion: '0.1.0', + input: { + run_on_weekends: false, + }, + triggerName: 'everyHourTrigger', + 'pieceType': PieceType.OFFICIAL, + 'packageType': PackageType.REGISTRY, + inputUiInfo: {}, + }, + valid: true, + name: 'webhook', + displayName: 'Webhook', + nextAction: { + name: 'echo_step', + displayName: 'Echo Step', + type: ActionType.CODE, + settings: { + inputUiInfo: {}, + input: { + key: '{{ 1 + 2 }}', + }, + sourceCode: { + packageJson: '{}', + code: ` + export const code = async (inputs) => { + return inputs; + }; + `, + }, + }, + nextAction: { + name: 'datamapper', + displayName: 'Datamapper', + type: ActionType.PIECE, + settings: { + inputUiInfo: {}, + pieceName: '@activepieces/piece-data-mapper', + pieceVersion: '0.3.0', + packageType: 'REGISTRY', + pieceType: 'OFFICIAL', + actionName: 'advanced_mapping', + input: { + mapping: { + key: '{{ 1 + 2 }}', + }, + }, + }, + valid: true, + }, + valid: true, + }, + }, + }) + await databaseConnection + .getRepository('flow_version') + .save([mockFlowVersion]) + + const mockFlowRun = createMockFlowRun({ + flowVersionId: mockFlowVersion.id, + projectId: mockProject.id, + flowId: mockFlow.id, + status: FlowRunStatus.RUNNING, + }) + await databaseConnection.getRepository('flow_run').save([mockFlowRun]) + + await flowWorker.executeFlow({ + flowVersionId: mockFlowVersion.id, + projectId: mockProject.id, + environment: RunEnvironment.PRODUCTION, + runId: mockFlowRun.id, + payload: {}, + executionType: ExecutionType.BEGIN, + }) + + const flowRun = await databaseConnection + .getRepository('flow_run') + .findOneByOrFail({ + id: mockFlowRun.id, + }) + expect(flowRun.status).toEqual(FlowRunStatus.SUCCEEDED) + + const file = await databaseConnection + .getRepository('file') + .findOneByOrFail({ + id: flowRun.logsFileId, + }) + const decompressedData = await fileCompressor.decompress({ + data: file.data, + compression: file.compression, + }) + expect( + JSON.parse(decompressedData.toString('utf-8')).executionState, + ).toEqual({ + steps: { + webhook: { + type: 'PIECE_TRIGGER', + status: 'SUCCEEDED', + input: {}, + output: {}, + }, + echo_step: { + type: 'CODE', + status: 'SUCCEEDED', + input: { + key: 3, + }, + output: { + key: 3, + }, + duration: expect.any(Number), + }, + datamapper: { + type: 'PIECE', + status: 'SUCCEEDED', + input: { + mapping: { + key: 3, + }, + }, + output: { + key: 3, + }, + duration: expect.any(Number), + }, + }, + }) + }, 60000) +}) diff --git a/packages/server/api/test/integration/ce/flows/flow-worker.test.ts b/packages/server/api/test/integration/ce/flows/flow-worker.test.ts index c2f9f2b125..9cfcadd8e2 100644 --- a/packages/server/api/test/integration/ce/flows/flow-worker.test.ts +++ b/packages/server/api/test/integration/ce/flows/flow-worker.test.ts @@ -1,26 +1,17 @@ import { FastifyInstance } from 'fastify' +import { StatusCodes } from 'http-status-codes' import { setupApp } from '../../../../src/app/app' import { databaseConnection } from '../../../../src/app/database/database-connection' -import { flowWorker } from '../../../../src/app/workers/flow-worker/flow-worker' +import { generateMockToken } from '../../../helpers/auth' import { createMockFlow, - createMockFlowRun, createMockFlowVersion, - createMockPlatform, createMockProject, - createMockUser, + mockBasicSetup, } from '../../../helpers/mocks' -import { fileCompressor } from '@activepieces/server-shared' import { - ActionType, - ExecutionType, - FlowRunStatus, - FlowStatus, - FlowVersionState, - PackageType, - PieceType, - RunEnvironment, - TriggerType, + apId, + PrincipalType, } from '@activepieces/shared' let app: FastifyInstance | null = null @@ -35,156 +26,44 @@ afterAll(async () => { await app?.close() }) -describe('flow execution', () => { - it('should execute simple flow with code and data mapper', async () => { - const mockUser = createMockUser() - await databaseConnection.getRepository('user').save([mockUser]) +describe('Flow API for Worker', () => { + describe('Get Flow from Worker', () => { + it('List other flow for another project', async () => { + // arrange + const { mockPlatform, mockOwner, mockProject } = await mockBasicSetup() - const mockPlatform = createMockPlatform({ ownerId: mockUser.id }) - await databaseConnection.getRepository('platform').save([mockPlatform]) - - const mockProject = createMockProject({ ownerId: mockUser.id, platformId: mockPlatform.id }) - await databaseConnection.getRepository('project').save([mockProject]) - - const mockFlow = createMockFlow({ - projectId: mockProject.id, - status: FlowStatus.ENABLED, - }) - await databaseConnection.getRepository('flow').save([mockFlow]) - - const mockFlowVersion = createMockFlowVersion({ - flowId: mockFlow.id, - updatedBy: mockUser.id, - state: FlowVersionState.LOCKED, - trigger: { - type: TriggerType.PIECE, - settings: { - pieceName: '@activepieces/piece-schedule', - pieceVersion: '0.1.0', - input: { - run_on_weekends: false, - }, - triggerName: 'everyHourTrigger', - 'pieceType': PieceType.OFFICIAL, - 'packageType': PackageType.REGISTRY, - inputUiInfo: {}, - }, - valid: true, - name: 'webhook', - displayName: 'Webhook', - nextAction: { - name: 'echo_step', - displayName: 'Echo Step', - type: ActionType.CODE, - settings: { - inputUiInfo: {}, - input: { - key: '{{ 1 + 2 }}', - }, - sourceCode: { - packageJson: '{}', - code: ` - export const code = async (inputs) => { - return inputs; - }; - `, - }, - }, - nextAction: { - name: 'datamapper', - displayName: 'Datamapper', - type: ActionType.PIECE, - settings: { - inputUiInfo: {}, - pieceName: '@activepieces/piece-data-mapper', - pieceVersion: '0.3.0', - packageType: 'REGISTRY', - pieceType: 'OFFICIAL', - actionName: 'advanced_mapping', - input: { - mapping: { - key: '{{ 1 + 2 }}', - }, - }, - }, - valid: true, - }, - valid: true, - }, - }, - }) - await databaseConnection - .getRepository('flow_version') - .save([mockFlowVersion]) + const mockProject2 = createMockProject({ + platformId: mockPlatform.id, + ownerId: mockOwner.id, + }) - const mockFlowRun = createMockFlowRun({ - flowVersionId: mockFlowVersion.id, - projectId: mockProject.id, - flowId: mockFlow.id, - status: FlowRunStatus.RUNNING, - }) - await databaseConnection.getRepository('flow_run').save([mockFlowRun]) + await databaseConnection.getRepository('project').save([mockProject2]) - await flowWorker.executeFlow({ - flowVersionId: mockFlowVersion.id, - projectId: mockProject.id, - environment: RunEnvironment.PRODUCTION, - runId: mockFlowRun.id, - payload: {}, - executionType: ExecutionType.BEGIN, - }) + const mockFlow = createMockFlow({ + projectId: mockProject.id, + }) + await databaseConnection.getRepository('flow').save([mockFlow]) - const flowRun = await databaseConnection - .getRepository('flow_run') - .findOneByOrFail({ - id: mockFlowRun.id, + const mockFlowVersion = createMockFlowVersion({ + flowId: mockFlow.id, }) - expect(flowRun.status).toEqual(FlowRunStatus.SUCCEEDED) + await databaseConnection.getRepository('flow_version').save([mockFlowVersion]) - const file = await databaseConnection - .getRepository('file') - .findOneByOrFail({ - id: flowRun.logsFileId, + const mockToken = await generateMockToken({ + id: apId(), + type: PrincipalType.WORKER, + projectId: mockProject2.id, }) - const decompressedData = await fileCompressor.decompress({ - data: file.data, - compression: file.compression, - }) - expect( - JSON.parse(decompressedData.toString('utf-8')).executionState, - ).toEqual({ - steps: { - webhook: { - type: 'PIECE_TRIGGER', - status: 'SUCCEEDED', - input: {}, - output: {}, - }, - echo_step: { - type: 'CODE', - status: 'SUCCEEDED', - input: { - key: 3, - }, - output: { - key: 3, - }, - duration: expect.any(Number), - }, - datamapper: { - type: 'PIECE', - status: 'SUCCEEDED', - input: { - mapping: { - key: 3, - }, - }, - output: { - key: 3, - }, - duration: expect.any(Number), + + const response = await app?.inject({ + method: 'GET', + url: `/v1/worker/flows/${mockFlowVersion.id}`, + headers: { + authorization: `Bearer ${mockToken}`, }, - }, + }) + expect(response?.statusCode).toBe(StatusCodes.NOT_FOUND) }) - }, 60000) + }) + }) diff --git a/packages/server/worker/package.json b/packages/server/worker/package.json index 80473ec77d..7161a222b3 100644 --- a/packages/server/worker/package.json +++ b/packages/server/worker/package.json @@ -8,7 +8,8 @@ "fs-extra": "11.2.0", "async-mutex": "0.4.0", "dayjs": "1.11.9", - "cron-parser": "4.9.0" + "cron-parser": "4.9.0", + "axios": "1.6.7" }, "type": "commonjs", "main": "./src/index.js", diff --git a/packages/server/worker/src/index.ts b/packages/server/worker/src/index.ts index 296bf4e2b5..eed95f838d 100644 --- a/packages/server/worker/src/index.ts +++ b/packages/server/worker/src/index.ts @@ -1,6 +1,7 @@ -export { ApMemoryQueue } from './lib/queue/ap-memory-queue' +export { serverApiService } from './lib/api/server-api.service' +export { ApMemoryQueue } from './lib/utils/ap-memory-queue' export { sandboxProvisioner } from './lib/sandbox/provisioner/sandbox-provisioner' export { SandBoxCacheType } from './lib/sandbox/provisioner/sandbox-cache-key' export * from './lib/sandbox/sandbox-manager' export * from './lib/sandbox' -export * from './lib/log-serializer' \ No newline at end of file +export * from './lib/utils/log-serializer' \ No newline at end of file diff --git a/packages/server/worker/src/lib/api/server-api.service.ts b/packages/server/worker/src/lib/api/server-api.service.ts new file mode 100644 index 0000000000..de5957f002 --- /dev/null +++ b/packages/server/worker/src/lib/api/server-api.service.ts @@ -0,0 +1,34 @@ + +import { PopulatedFlow } from '@activepieces/shared' +import axios, { isAxiosError } from 'axios' + +const SERVER_URL = 'http://127.0.0.1:3000' + +export const serverApiService = (workerToken: string) => { + const client = axios.create({ + baseURL: SERVER_URL, + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${workerToken}`, + }, + }) + + return { + async getFlowWithExactPieces(flowVersionId: string): Promise { + try { + const response = await client.get('/v1/worker/flows', { + params: { + versionId: flowVersionId, + }, + }) + return response.data + } + catch (error) { + if (isAxiosError(error) && error.response && error.response.status === 404) { + return null + } + throw error + } + }, + } +} \ No newline at end of file diff --git a/packages/server/worker/src/lib/sandbox/files/cached-sandbox.ts b/packages/server/worker/src/lib/sandbox/files/cached-sandbox.ts index 082b20b832..e49d6f5d64 100644 --- a/packages/server/worker/src/lib/sandbox/files/cached-sandbox.ts +++ b/packages/server/worker/src/lib/sandbox/files/cached-sandbox.ts @@ -4,9 +4,9 @@ import { enrichErrorContext, logger, packageManager, system, SystemProp } from ' import { PiecePackage, SourceCode } from '@activepieces/shared' import { Mutex } from 'async-mutex' import dayjs from 'dayjs' -import { codeBuilder } from '../../code-worker/code-builder' -import { engineInstaller } from '../../engine/engine-installer' import { pieceManager } from '../../piece-manager' +import { codeBuilder } from '../../utils/code-builder' +import { engineInstaller } from '../../utils/engine-installer' import { CachedSandboxState } from './cached-sandbox-state' export class CachedSandbox { diff --git a/packages/server/worker/src/lib/queue/ap-memory-queue.ts b/packages/server/worker/src/lib/utils/ap-memory-queue.ts similarity index 100% rename from packages/server/worker/src/lib/queue/ap-memory-queue.ts rename to packages/server/worker/src/lib/utils/ap-memory-queue.ts diff --git a/packages/server/worker/src/lib/code-worker/code-builder.ts b/packages/server/worker/src/lib/utils/code-builder.ts similarity index 100% rename from packages/server/worker/src/lib/code-worker/code-builder.ts rename to packages/server/worker/src/lib/utils/code-builder.ts diff --git a/packages/server/worker/src/lib/engine/engine-installer.ts b/packages/server/worker/src/lib/utils/engine-installer.ts similarity index 100% rename from packages/server/worker/src/lib/engine/engine-installer.ts rename to packages/server/worker/src/lib/utils/engine-installer.ts diff --git a/packages/server/worker/src/lib/log-serializer.ts b/packages/server/worker/src/lib/utils/log-serializer.ts similarity index 100% rename from packages/server/worker/src/lib/log-serializer.ts rename to packages/server/worker/src/lib/utils/log-serializer.ts