Skip to content

Commit

Permalink
Merge pull request #3803 from activepieces/fix/multiple-delays
Browse files Browse the repository at this point in the history
fix: multiple delays should work
  • Loading branch information
abuaboud committed Feb 4, 2024
2 parents a1131a1 + c950f57 commit a2a80db
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 29 deletions.
5 changes: 0 additions & 5 deletions packages/engine/src/lib/handler/context/engine-constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ export class EngineConstants {
public readonly flowRunId: string,
public readonly serverUrl: string,
public readonly retryConstants: RetryConstants,
public readonly executionType: ExecutionType,
public readonly workerToken: string,
public readonly projectId: ProjectId,
public readonly variableService: VariableService,
Expand All @@ -55,7 +54,6 @@ export class EngineConstants {
input.flowRunId,
input.serverUrl,
DEFAULT_RETRY_CONSTANTS,
input.executionType,
input.workerToken,
input.projectId,
new VariableService({
Expand All @@ -74,7 +72,6 @@ export class EngineConstants {
'test-run',
input.serverUrl,
DEFAULT_RETRY_CONSTANTS,
ExecutionType.BEGIN,
input.workerToken,
input.projectId,
new VariableService({
Expand All @@ -92,7 +89,6 @@ export class EngineConstants {
'execute-property',
input.serverUrl,
DEFAULT_RETRY_CONSTANTS,
ExecutionType.BEGIN,
input.workerToken,
input.projectId,
new VariableService({
Expand All @@ -110,7 +106,6 @@ export class EngineConstants {
'execute-trigger',
input.serverUrl,
DEFAULT_RETRY_CONSTANTS,
ExecutionType.BEGIN,
input.workerToken,
input.projectId,
new VariableService({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ActionType, ExecutionOutput, ExecutionOutputStatus, PauseMetadata, StepOutput, StepOutputStatus, StopResponse, isNil } from '@activepieces/shared'
import { ActionType, ExecutionOutput, ExecutionOutputStatus, LoopStepOutput, PauseMetadata, StepOutput, StepOutputStatus, StopResponse, assertEqual, isNil } from '@activepieces/shared'
import { StepExecutionPath } from './step-execution-path'
import { loggingUtils } from '../../helper/logging-utils'

Expand Down Expand Up @@ -43,6 +43,16 @@ export class FlowExecutorContext {
return new FlowExecutorContext()
}

public getLoopStepOutput({ stepName }: { stepName: string }): LoopStepOutput | undefined {
const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
const stepOutput = stateAtPath[stepName]
if (isNil(stepOutput)) {
return undefined
}
assertEqual(stepOutput.type, ActionType.LOOP_ON_ITEMS, 'stepout', 'LoopStepOutput')
return stepOutput as LoopStepOutput
}

public isCompleted({ stepName }: { stepName: string }): boolean {
const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
const stepOutput = stateAtPath[stepName]
Expand All @@ -52,6 +62,15 @@ export class FlowExecutorContext {
return stepOutput.status !== StepOutputStatus.PAUSED
}

public isPaused({ stepName }: { stepName: string }): boolean {
const stateAtPath = getStateAtPath({ currentPath: this.currentPath, steps: this.steps })
const stepOutput = stateAtPath[stepName]
if (isNil(stepOutput)) {
return false
}
return stepOutput.status === StepOutputStatus.PAUSED
}

public setDuration(duration: number): FlowExecutorContext {
return new FlowExecutorContext({
...this,
Expand Down
14 changes: 9 additions & 5 deletions packages/engine/src/lib/handler/loop-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@ export const loopExecutor: BaseExecutor<LoopOnItemsAction> = {
},
executionState,
})

let stepOutput = LoopStepOutput.init({
const previousStepOutput = executionState.getLoopStepOutput({ stepName: action.name })
let stepOutput = previousStepOutput ?? LoopStepOutput.init({
input: censoredInput,
})

let newExecutionContext = executionState.upsertStep(action.name, stepOutput)
const firstLoopAction = action.firstLoopAction


for (let i = 0; i < resolvedInput.items.length; ++i) {
const newCurrentPath = newExecutionContext.currentPath.loopIteration({ loopName: action.name, iteration: i })
stepOutput = stepOutput.addIteration({ index: i + 1, item: resolvedInput.items[i] })

newExecutionContext = newExecutionContext.upsertStep(action.name, stepOutput).setCurrentPath(newCurrentPath)
stepOutput = stepOutput.setItemAndIndex({ item: resolvedInput.items[i], index: i + 1 })
const addEmptyIteration = !stepOutput.hasIteration(i)
if (addEmptyIteration) {
stepOutput = stepOutput.addIteration()
newExecutionContext = newExecutionContext.upsertStep(action.name, stepOutput)
}
newExecutionContext = newExecutionContext.setCurrentPath(newCurrentPath)
if (!isNil(firstLoopAction) && !constants.testSingleStepMode) {
newExecutionContext = await flowExecutor.execute({
action: firstLoopAction,
Expand Down
11 changes: 5 additions & 6 deletions packages/engine/src/lib/handler/piece-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AUTHENTICATION_PROPERTY_NAME, GenericStepOutput, ActionType, ExecutionOutputStatus, PieceAction, StepOutputStatus, assertNotNullOrUndefined, isNil } from '@activepieces/shared'
import { AUTHENTICATION_PROPERTY_NAME, GenericStepOutput, ActionType, ExecutionOutputStatus, PieceAction, StepOutputStatus, assertNotNullOrUndefined, isNil, ExecutionType } from '@activepieces/shared'
import { ActionHandler, BaseExecutor } from './base-executor'
import { ExecutionVerdict, FlowExecutorContext } from './context/flow-execution-context'
import { variableService } from '../services/variable-service'
Expand Down Expand Up @@ -69,9 +69,10 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
paused: false,
tags: [],
}

const isPaused = executionState.isPaused({ stepName: action.name })
const context: ActionContext = {
executionType: constants.executionType,
executionType: isPaused ? ExecutionType.RESUME : ExecutionType.BEGIN,
resumePayload: constants.resumePayload,
store: createContextStore({
prefix: '',
flowId: constants.flowId,
Expand Down Expand Up @@ -102,7 +103,6 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
stop: createStopHook(hookResponse),
pause: createPauseHook(hookResponse),
},
resumePayload: constants.resumePayload,
project: {
id: constants.projectId,
externalId: constants.externalProjectId,
Expand All @@ -121,8 +121,7 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
}
if (hookResponse.paused) {
assertNotNullOrUndefined(hookResponse.pauseResponse, 'pauseResponse')
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output)
.setStatus(StepOutputStatus.PAUSED))
return newExecutionContext.upsertStep(action.name, stepOutput.setOutput(output).setStatus(StepOutputStatus.PAUSED))
.setVerdict(ExecutionVerdict.PAUSED, {
reason: ExecutionOutputStatus.PAUSED,
pauseMetadata: hookResponse.pauseResponse.pauseMetadata,
Expand Down
2 changes: 1 addition & 1 deletion packages/engine/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ function getFlowExecutionState(input: ExecuteFlowOperation): FlowExecutorContext
case ExecutionType.RESUME: {
let flowContext = FlowExecutorContext.empty().increaseTask(input.tasks)
for (const [step, output] of Object.entries(input.executionState.steps)) {
if (output.status === StepOutputStatus.SUCCEEDED) {
if ([StepOutputStatus.SUCCEEDED, StepOutputStatus.PAUSED].includes(output.status)) {
flowContext = flowContext.upsertStep(step, output)
}
}
Expand Down
5 changes: 1 addition & 4 deletions packages/engine/test/handler/flow-rerun.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { flowExecutor } from '../../src/lib/handler/flow-executor'
import { ExecutionVerdict, FlowExecutorContext } from '../../src/lib/handler/context/flow-execution-context'
import { buildPieceAction, generateMockEngineConstants } from './test-helper'
import { ExecutionType } from '@activepieces/shared'

const failedHttpAction = buildPieceAction({
name: 'send_http',
Expand Down Expand Up @@ -48,9 +47,7 @@ describe('flow retry', () => {
})

const retryFromFailed = await flowExecutor.execute({
action: successHttpAction, executionState: context, constants: generateMockEngineConstants({
executionType: ExecutionType.RESUME,
}),
action: successHttpAction, executionState: context, constants: generateMockEngineConstants({}),
})
expect(failedResult.verdict).toBe(ExecutionVerdict.FAILED)
expect(retryFromFailed.verdict).toBe(ExecutionVerdict.RUNNING)
Expand Down
64 changes: 61 additions & 3 deletions packages/engine/test/handler/flow-with-pause.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BranchOperator, ExecutionOutputStatus, ExecutionType, LoopStepOutput } from '@activepieces/shared'
import { BranchOperator, ExecutionOutputStatus, LoopStepOutput } from '@activepieces/shared'
import { ExecutionVerdict, FlowExecutorContext } from '../../src/lib/handler/context/flow-execution-context'
import { flowExecutor } from '../../src/lib/handler/flow-executor'
import { buildActionWithOneCondition, buildCodeAction, buildPieceAction, buildSimpleLoopAction, generateMockEngineConstants } from './test-helper'
Expand All @@ -16,6 +16,29 @@ const simplePauseFlow = buildPieceAction({
}),
})

const flawWithTwoPause = buildPieceAction({
name: 'approval',
pieceName: '@activepieces/piece-approval',
actionName: 'wait_for_approval',
input: {},
nextAction: buildCodeAction({
name: 'echo_step',
input: {},
nextAction: buildPieceAction({
name: 'approval-1',
pieceName: '@activepieces/piece-approval',
actionName: 'wait_for_approval',
input: {},
nextAction: buildCodeAction({
name: 'echo_step_1',
input: {},
}),
}),

}),
})


const pauseFlowWithLoopAndBranch = buildSimpleLoopAction({
name: 'loop',
loopItems: '{{ [1] }}',
Expand Down Expand Up @@ -54,7 +77,6 @@ describe('flow with pause', () => {
resumePayload: {
action: 'approve',
},
executionType: ExecutionType.RESUME,
}),
})
expect(resumeResult.verdict).toBe(ExecutionVerdict.RUNNING)
Expand All @@ -63,6 +85,43 @@ describe('flow with pause', () => {
expect(Object.keys(loopOut.output?.iterations[0] ?? {})).toEqual(['branch', 'approval', 'echo_step'])
})

it('should pause and resume with two different steps in same flow successfully', async () => {
const pauseResult1 = await flowExecutor.execute({
action: flawWithTwoPause,
executionState: FlowExecutorContext.empty(),
constants: generateMockEngineConstants(),
})
const resumeResult1 = await flowExecutor.execute({
action: flawWithTwoPause,
executionState: pauseResult1,
constants: generateMockEngineConstants({
resumePayload: {
action: 'approve',
},
}),
})
expect(resumeResult1.verdict).toBe(ExecutionVerdict.PAUSED)
expect(resumeResult1.verdictResponse).toEqual({
'pauseMetadata': {
'actions': ['approve', 'disapprove'],
'type': 'WEBHOOK',
},
'reason': ExecutionOutputStatus.PAUSED,
})
const resumeResult2 = await flowExecutor.execute({
action: flawWithTwoPause,
executionState: resumeResult1.setVerdict(ExecutionVerdict.RUNNING, undefined),
constants: generateMockEngineConstants({
resumePayload: {
action: 'approve',
},
}),
})
expect(resumeResult2.verdict).toBe(ExecutionVerdict.RUNNING)

})


it('should pause and resume successfully', async () => {
const pauseResult = await flowExecutor.execute({
action: simplePauseFlow,
Expand All @@ -86,7 +145,6 @@ describe('flow with pause', () => {
resumePayload: {
action: 'approve',
},
executionType: ExecutionType.RESUME,
}),
})
expect(resumeResult.verdict).toBe(ExecutionVerdict.RUNNING)
Expand Down
3 changes: 1 addition & 2 deletions packages/engine/test/handler/test-helper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Action, ActionErrorHandlingOptions, ActionType, BranchAction, BranchCondition, CodeAction, ExecutionType, LoopOnItemsAction, PackageType, PieceAction, PieceType } from '@activepieces/shared'
import { Action, ActionErrorHandlingOptions, ActionType, BranchAction, BranchCondition, CodeAction, LoopOnItemsAction, PackageType, PieceAction, PieceType } from '@activepieces/shared'
import { VariableService } from '../../src/lib/services/variable-service'
import { EngineConstants } from '../../src/lib/handler/context/engine-constants'

Expand All @@ -12,7 +12,6 @@ export const generateMockEngineConstants = (params?: Partial<EngineConstants>):
retryExponential: 1,
retryInterval: 1,
},
params?.executionType ?? ExecutionType.BEGIN,
params?.workerToken ?? 'workerToken',
params?.projectId ?? 'projectId',
params?.variableService ?? new VariableService({
Expand Down
2 changes: 1 addition & 1 deletion packages/shared/package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "@activepieces/shared",
"version": "0.10.68",
"version": "0.10.69",
"type": "commonjs"
}
18 changes: 17 additions & 1 deletion packages/shared/src/lib/flow-run/execution/step-output.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { TriggerType } from '../../flows/triggers/trigger'
import { ActionType } from '../../flows/actions/action'
import { isNil } from '../../common'

export enum StepOutputStatus {
FAILED = 'FAILED',
Expand Down Expand Up @@ -120,12 +121,27 @@ export class LoopStepOutput extends GenericStepOutput<ActionType.LOOP_ON_ITEMS,
})
}

addIteration({ item, index }: { item: unknown, index: number }): LoopStepOutput {
hasIteration(iteration: number): boolean {
return !isNil(this.output?.iterations[iteration])
}

setItemAndIndex({ item, index }: { item: unknown, index: number }): LoopStepOutput {
return new LoopStepOutput({
...this,
output: {
item,
index,
iterations: this.output?.iterations ?? [],
},
})
}

addIteration(): LoopStepOutput {
return new LoopStepOutput({
...this,
output: {
item: this.output?.item,
index: this.output?.index,
iterations: [...(this.output?.iterations ?? []), {}],
},
})
Expand Down

0 comments on commit a2a80db

Please sign in to comment.