Skip to content

Commit

Permalink
Merge pull request #4595 from activepieces/feat/isolate-worker
Browse files Browse the repository at this point in the history
feat: isolate worker form backend
  • Loading branch information
abuaboud committed May 14, 2024
2 parents 88ae285 + bbfdae3 commit 2fcaf15
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 180 deletions.
2 changes: 2 additions & 0 deletions packages/server/api/src/app/flows/flow.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { accessTokenManager } from '../authentication/lib/access-token-manager'
import { websocketService } from '../websockets/websockets.service'
import { engineResponseWatcher } from '../workers/flow-worker/engine-response-watcher'
import { flowVersionController } from './flow/flow-version.controller'
import { flowWorkerController } from './flow/flow-worker.controller'
import { flowController } from './flow/flow.controller'
import { flowRunService } from './flow-run/flow-run-service'
import { folderController } from './folder/folder.controller'
Expand All @@ -12,6 +13,7 @@ import { logger } from '@activepieces/server-shared'
import { CreateStepRunRequestBody, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared'

export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(flowWorkerController, { prefix: '/v1/worker/flows' })
await app.register(flowVersionController, { prefix: '/v1/flows' })
await app.register(flowController, { prefix: '/v1/flows' })
await app.register(folderController, { prefix: '/v1/folders' })
Expand Down
42 changes: 42 additions & 0 deletions packages/server/api/src/app/flows/flow/flow-worker.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { FastifyPluginAsyncTypebox, Type } from '@fastify/type-provider-typebox'
import { StatusCodes } from 'http-status-codes'
import { entitiesMustBeOwnedByCurrentProject } from '../../authentication/authorization'
import { flowVersionService } from '../flow-version/flow-version.service'
import { flowService } from './flow.service'
import { PopulatedFlow, PrincipalType } from '@activepieces/shared'

export const flowWorkerController: FastifyPluginAsyncTypebox = async (fastify) => {
fastify.addHook('preSerialization', entitiesMustBeOwnedByCurrentProject)

fastify.get('/', GetLockedVersionRequest, async (request) => {
const flowVersion = await flowVersionService.getOneOrThrow(request.query.versionId)
// Check if the flow version is owned by the current project
const flow = await flowService.getOneOrThrow({
id: flowVersion.flowId,
projectId: request.principal.projectId,
})
const lockedVersion = await flowVersionService.lockPieceVersions({
flowVersion,
projectId: request.principal.projectId,
})
return {
...flow,
version: lockedVersion,
}
},
)
}

const GetLockedVersionRequest = {
config: {
allowedPrincipals: [PrincipalType.WORKER],
},
schema: {
querystring: Type.Object({
versionId: Type.String(),
}),
response: {
[StatusCodes.OK]: PopulatedFlow,
},
},
}
2 changes: 1 addition & 1 deletion packages/server/api/src/app/helper/engine-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export type EngineHelperResponse<Result extends EngineHelperResult> = {
standardOutput: string
}

const generateWorkerToken = ({
export const generateWorkerToken = ({
projectId,
}: GenerateWorkerTokenParams): Promise<string> => {
return accessTokenManager.generateToken({
Expand Down
29 changes: 12 additions & 17 deletions packages/server/api/src/app/workers/flow-worker/flow-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import {
flowRunService,
HookType,
} from '../../flows/flow-run/flow-run-service'
import { flowVersionService } from '../../flows/flow-version/flow-version.service'
import { engineHelper } from '../../helper/engine-helper'
import { engineHelper, generateWorkerToken } from '../../helper/engine-helper'
import { getPiecePackage } from '../../pieces/piece-metadata-service'
import { EngineHttpResponse, engineResponseWatcher } from './engine-response-watcher'
import { flowWorkerHooks } from './flow-worker-hooks'
import { OneTimeJobData } from './job-data'
import { exceptionHandler, logger } from '@activepieces/server-shared'
import { Action, ActionType,
import {
Action, ActionType,
ActivepiecesError,
assertNotNullOrUndefined,
BeginExecuteFlowOperation,
Expand Down Expand Up @@ -40,7 +40,7 @@ import { Action, ActionType,
Trigger,
TriggerType,
} from '@activepieces/shared'
import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner } from 'server-worker'
import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker'

type FinishExecutionParams = {
flowRunId: FlowRunId
Expand Down Expand Up @@ -183,36 +183,31 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
)

const startTime = Date.now()

const flowVersionWithLockedPieces = await flowVersionService.getOne(
jobData.flowVersionId,
)

if (isNil(flowVersionWithLockedPieces)) {
const workerToken = await generateWorkerToken({
projectId: jobData.projectId,
})
const serverApi = serverApiService(workerToken)
const flow = await serverApi.getFlowWithExactPieces(jobData.flowVersionId)
if (isNil(flow)) {
logger.info({
message: 'Flow version not found, skipping execution',
flowVersionId: jobData.flowVersionId,
})
return
}
const flowVersion = await flowVersionService.lockPieceVersions({
projectId: jobData.projectId,
flowVersion: flowVersionWithLockedPieces,
})

await flowWorkerHooks
.getHooks()
.preExecute({ projectId: jobData.projectId, runId: jobData.runId })

try {
const { input, logFileId } = await loadInputAndLogFileId({
flowVersion,
flowVersion: flow.version,
jobData,
})

const sandbox = await getSandbox({
projectId: jobData.projectId,
flowVersion,
flowVersion: flow.version,
runEnvironment: jobData.environment,
})

Expand Down
194 changes: 194 additions & 0 deletions packages/server/api/test/integration/ce/flows/flow-consume.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import { FastifyInstance } from 'fastify'
import { setupApp } from '../../../../src/app/app'
import { databaseConnection } from '../../../../src/app/database/database-connection'
import { flowWorker } from '../../../../src/app/workers/flow-worker/flow-worker'
import {
createMockFlow,
createMockFlowRun,
createMockFlowVersion,
createMockPlatform,
createMockProject,
createMockUser,
} from '../../../helpers/mocks'
import { fileCompressor } from '@activepieces/server-shared'
import {
ActionType,
ExecutionType,
FlowRunStatus,
FlowStatus,
FlowVersionState,
PackageType,
PieceType,
RunEnvironment,
TriggerType,
} from '@activepieces/shared'

let app: FastifyInstance | null = null

beforeAll(async () => {
await databaseConnection.initialize()
app = await setupApp()
await app.listen({
host: '0.0.0.0',
port: 3000,
})
})

afterAll(async () => {
await databaseConnection.destroy()
await app?.close()
})

describe('flow execution', () => {
it('should execute simple flow with code and data mapper', async () => {
const mockUser = createMockUser()
await databaseConnection.getRepository('user').save([mockUser])

const mockPlatform = createMockPlatform({ ownerId: mockUser.id })
await databaseConnection.getRepository('platform').save([mockPlatform])

const mockProject = createMockProject({ ownerId: mockUser.id, platformId: mockPlatform.id })
await databaseConnection.getRepository('project').save([mockProject])

const mockFlow = createMockFlow({
projectId: mockProject.id,
status: FlowStatus.ENABLED,
})
await databaseConnection.getRepository('flow').save([mockFlow])

const mockFlowVersion = createMockFlowVersion({
flowId: mockFlow.id,
updatedBy: mockUser.id,
state: FlowVersionState.LOCKED,
trigger: {
type: TriggerType.PIECE,
settings: {
pieceName: '@activepieces/piece-schedule',
pieceVersion: '0.1.0',
input: {
run_on_weekends: false,
},
triggerName: 'everyHourTrigger',
'pieceType': PieceType.OFFICIAL,
'packageType': PackageType.REGISTRY,
inputUiInfo: {},
},
valid: true,
name: 'webhook',
displayName: 'Webhook',
nextAction: {
name: 'echo_step',
displayName: 'Echo Step',
type: ActionType.CODE,
settings: {
inputUiInfo: {},
input: {
key: '{{ 1 + 2 }}',
},
sourceCode: {
packageJson: '{}',
code: `
export const code = async (inputs) => {
return inputs;
};
`,
},
},
nextAction: {
name: 'datamapper',
displayName: 'Datamapper',
type: ActionType.PIECE,
settings: {
inputUiInfo: {},
pieceName: '@activepieces/piece-data-mapper',
pieceVersion: '0.3.0',
packageType: 'REGISTRY',
pieceType: 'OFFICIAL',
actionName: 'advanced_mapping',
input: {
mapping: {
key: '{{ 1 + 2 }}',
},
},
},
valid: true,
},
valid: true,
},
},
})
await databaseConnection
.getRepository('flow_version')
.save([mockFlowVersion])

const mockFlowRun = createMockFlowRun({
flowVersionId: mockFlowVersion.id,
projectId: mockProject.id,
flowId: mockFlow.id,
status: FlowRunStatus.RUNNING,
})
await databaseConnection.getRepository('flow_run').save([mockFlowRun])

await flowWorker.executeFlow({
flowVersionId: mockFlowVersion.id,
projectId: mockProject.id,
environment: RunEnvironment.PRODUCTION,
runId: mockFlowRun.id,
payload: {},
executionType: ExecutionType.BEGIN,
})

const flowRun = await databaseConnection
.getRepository('flow_run')
.findOneByOrFail({
id: mockFlowRun.id,
})
expect(flowRun.status).toEqual(FlowRunStatus.SUCCEEDED)

const file = await databaseConnection
.getRepository('file')
.findOneByOrFail({
id: flowRun.logsFileId,
})
const decompressedData = await fileCompressor.decompress({
data: file.data,
compression: file.compression,
})
expect(
JSON.parse(decompressedData.toString('utf-8')).executionState,
).toEqual({
steps: {
webhook: {
type: 'PIECE_TRIGGER',
status: 'SUCCEEDED',
input: {},
output: {},
},
echo_step: {
type: 'CODE',
status: 'SUCCEEDED',
input: {
key: 3,
},
output: {
key: 3,
},
duration: expect.any(Number),
},
datamapper: {
type: 'PIECE',
status: 'SUCCEEDED',
input: {
mapping: {
key: 3,
},
},
output: {
key: 3,
},
duration: expect.any(Number),
},
},
})
}, 60000)
})

0 comments on commit 2fcaf15

Please sign in to comment.