Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: isolate worker form backend #4595

Merged
merged 11 commits into from
May 14, 2024
2 changes: 2 additions & 0 deletions packages/server/api/src/app/flows/flow.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { accessTokenManager } from '../authentication/lib/access-token-manager'
import { websocketService } from '../websockets/websockets.service'
import { engineResponseWatcher } from '../workers/flow-worker/engine-response-watcher'
import { flowVersionController } from './flow/flow-version.controller'
import { flowWorkerController } from './flow/flow-worker.controller'
import { flowController } from './flow/flow.controller'
import { flowRunService } from './flow-run/flow-run-service'
import { folderController } from './folder/folder.controller'
Expand All @@ -12,6 +13,7 @@ import { logger } from '@activepieces/server-shared'
import { CreateStepRunRequestBody, isFlowStateTerminal, StepRunResponse, TestFlowRunRequestBody, WebsocketClientEvent, WebsocketServerEvent } from '@activepieces/shared'

export const flowModule: FastifyPluginAsyncTypebox = async (app) => {
await app.register(flowWorkerController, { prefix: '/v1/worker/flows' })
await app.register(flowVersionController, { prefix: '/v1/flows' })
await app.register(flowController, { prefix: '/v1/flows' })
await app.register(folderController, { prefix: '/v1/folders' })
Expand Down
42 changes: 42 additions & 0 deletions packages/server/api/src/app/flows/flow/flow-worker.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { FastifyPluginAsyncTypebox, Type } from '@fastify/type-provider-typebox'
import { StatusCodes } from 'http-status-codes'
import { entitiesMustBeOwnedByCurrentProject } from '../../authentication/authorization'
import { flowVersionService } from '../flow-version/flow-version.service'
import { flowService } from './flow.service'
import { PopulatedFlow, PrincipalType } from '@activepieces/shared'

export const flowWorkerController: FastifyPluginAsyncTypebox = async (fastify) => {
fastify.addHook('preSerialization', entitiesMustBeOwnedByCurrentProject)

fastify.get('/', GetLockedVersionRequest, async (request) => {
const flowVersion = await flowVersionService.getOneOrThrow(request.query.versionId)
// Check if the flow version is owned by the current project
const flow = await flowService.getOneOrThrow({
id: flowVersion.flowId,
projectId: request.principal.projectId,
})
const lockedVersion = await flowVersionService.lockPieceVersions({
flowVersion,
projectId: request.principal.projectId,
})
return {
...flow,
version: lockedVersion,
}
},
)
}

const GetLockedVersionRequest = {
config: {
allowedPrincipals: [PrincipalType.WORKER],
},
schema: {
querystring: Type.Object({
versionId: Type.String(),
}),
response: {
[StatusCodes.OK]: PopulatedFlow,
},
},
}
2 changes: 1 addition & 1 deletion packages/server/api/src/app/helper/engine-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export type EngineHelperResponse<Result extends EngineHelperResult> = {
standardOutput: string
}

const generateWorkerToken = ({
export const generateWorkerToken = ({
projectId,
}: GenerateWorkerTokenParams): Promise<string> => {
return accessTokenManager.generateToken({
Expand Down
29 changes: 12 additions & 17 deletions packages/server/api/src/app/workers/flow-worker/flow-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import {
flowRunService,
HookType,
} from '../../flows/flow-run/flow-run-service'
import { flowVersionService } from '../../flows/flow-version/flow-version.service'
import { engineHelper } from '../../helper/engine-helper'
import { engineHelper, generateWorkerToken } from '../../helper/engine-helper'
import { getPiecePackage } from '../../pieces/piece-metadata-service'
import { EngineHttpResponse, engineResponseWatcher } from './engine-response-watcher'
import { flowWorkerHooks } from './flow-worker-hooks'
import { OneTimeJobData } from './job-data'
import { exceptionHandler, logger } from '@activepieces/server-shared'
import { Action, ActionType,
import {
Action, ActionType,
ActivepiecesError,
assertNotNullOrUndefined,
BeginExecuteFlowOperation,
Expand Down Expand Up @@ -40,7 +40,7 @@ import { Action, ActionType,
Trigger,
TriggerType,
} from '@activepieces/shared'
import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner } from 'server-worker'
import { logSerializer, Sandbox, SandBoxCacheType, sandboxProvisioner, serverApiService } from 'server-worker'

type FinishExecutionParams = {
flowRunId: FlowRunId
Expand Down Expand Up @@ -183,36 +183,31 @@ async function executeFlow(jobData: OneTimeJobData): Promise<void> {
)

const startTime = Date.now()

const flowVersionWithLockedPieces = await flowVersionService.getOne(
jobData.flowVersionId,
)

if (isNil(flowVersionWithLockedPieces)) {
const workerToken = await generateWorkerToken({
projectId: jobData.projectId,
})
const serverApi = serverApiService(workerToken)
const flow = await serverApi.getFlowWithExactPieces(jobData.flowVersionId)
if (isNil(flow)) {
logger.info({
message: 'Flow version not found, skipping execution',
flowVersionId: jobData.flowVersionId,
})
return
}
const flowVersion = await flowVersionService.lockPieceVersions({
projectId: jobData.projectId,
flowVersion: flowVersionWithLockedPieces,
})

await flowWorkerHooks
.getHooks()
.preExecute({ projectId: jobData.projectId, runId: jobData.runId })

try {
const { input, logFileId } = await loadInputAndLogFileId({
flowVersion,
flowVersion: flow.version,
jobData,
})

const sandbox = await getSandbox({
projectId: jobData.projectId,
flowVersion,
flowVersion: flow.version,
runEnvironment: jobData.environment,
})

Expand Down
194 changes: 194 additions & 0 deletions packages/server/api/test/integration/ce/flows/flow-consume.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import { FastifyInstance } from 'fastify'
import { setupApp } from '../../../../src/app/app'
import { databaseConnection } from '../../../../src/app/database/database-connection'
import { flowWorker } from '../../../../src/app/workers/flow-worker/flow-worker'
import {
createMockFlow,
createMockFlowRun,
createMockFlowVersion,
createMockPlatform,
createMockProject,
createMockUser,
} from '../../../helpers/mocks'
import { fileCompressor } from '@activepieces/server-shared'
import {
ActionType,
ExecutionType,
FlowRunStatus,
FlowStatus,
FlowVersionState,
PackageType,
PieceType,
RunEnvironment,
TriggerType,
} from '@activepieces/shared'

let app: FastifyInstance | null = null

beforeAll(async () => {
await databaseConnection.initialize()
app = await setupApp()
await app.listen({
host: '0.0.0.0',
port: 3000,
})
})

afterAll(async () => {
await databaseConnection.destroy()
await app?.close()
})

describe('flow execution', () => {
it('should execute simple flow with code and data mapper', async () => {
const mockUser = createMockUser()
await databaseConnection.getRepository('user').save([mockUser])

const mockPlatform = createMockPlatform({ ownerId: mockUser.id })
await databaseConnection.getRepository('platform').save([mockPlatform])

const mockProject = createMockProject({ ownerId: mockUser.id, platformId: mockPlatform.id })
await databaseConnection.getRepository('project').save([mockProject])

const mockFlow = createMockFlow({
projectId: mockProject.id,
status: FlowStatus.ENABLED,
})
await databaseConnection.getRepository('flow').save([mockFlow])

const mockFlowVersion = createMockFlowVersion({
flowId: mockFlow.id,
updatedBy: mockUser.id,
state: FlowVersionState.LOCKED,
trigger: {
type: TriggerType.PIECE,
settings: {
pieceName: '@activepieces/piece-schedule',
pieceVersion: '0.1.0',
input: {
run_on_weekends: false,
},
triggerName: 'everyHourTrigger',
'pieceType': PieceType.OFFICIAL,
'packageType': PackageType.REGISTRY,
inputUiInfo: {},
},
valid: true,
name: 'webhook',
displayName: 'Webhook',
nextAction: {
name: 'echo_step',
displayName: 'Echo Step',
type: ActionType.CODE,
settings: {
inputUiInfo: {},
input: {
key: '{{ 1 + 2 }}',
},
sourceCode: {
packageJson: '{}',
code: `
export const code = async (inputs) => {
return inputs;
};
`,
},
},
nextAction: {
name: 'datamapper',
displayName: 'Datamapper',
type: ActionType.PIECE,
settings: {
inputUiInfo: {},
pieceName: '@activepieces/piece-data-mapper',
pieceVersion: '0.3.0',
packageType: 'REGISTRY',
pieceType: 'OFFICIAL',
actionName: 'advanced_mapping',
input: {
mapping: {
key: '{{ 1 + 2 }}',
},
},
},
valid: true,
},
valid: true,
},
},
})
await databaseConnection
.getRepository('flow_version')
.save([mockFlowVersion])

const mockFlowRun = createMockFlowRun({
flowVersionId: mockFlowVersion.id,
projectId: mockProject.id,
flowId: mockFlow.id,
status: FlowRunStatus.RUNNING,
})
await databaseConnection.getRepository('flow_run').save([mockFlowRun])

await flowWorker.executeFlow({
flowVersionId: mockFlowVersion.id,
projectId: mockProject.id,
environment: RunEnvironment.PRODUCTION,
runId: mockFlowRun.id,
payload: {},
executionType: ExecutionType.BEGIN,
})

const flowRun = await databaseConnection
.getRepository('flow_run')
.findOneByOrFail({
id: mockFlowRun.id,
})
expect(flowRun.status).toEqual(FlowRunStatus.SUCCEEDED)

const file = await databaseConnection
.getRepository('file')
.findOneByOrFail({
id: flowRun.logsFileId,
})
const decompressedData = await fileCompressor.decompress({
data: file.data,
compression: file.compression,
})
expect(
JSON.parse(decompressedData.toString('utf-8')).executionState,
).toEqual({
steps: {
webhook: {
type: 'PIECE_TRIGGER',
status: 'SUCCEEDED',
input: {},
output: {},
},
echo_step: {
type: 'CODE',
status: 'SUCCEEDED',
input: {
key: 3,
},
output: {
key: 3,
},
duration: expect.any(Number),
},
datamapper: {
type: 'PIECE',
status: 'SUCCEEDED',
input: {
mapping: {
key: 3,
},
},
output: {
key: 3,
},
duration: expect.any(Number),
},
},
})
}, 60000)
})
Loading
Loading