Skip to content

Commit

Permalink
feat: appflow stage refactor (#230)
Browse files Browse the repository at this point in the history
* updating startflow

* iam constriction

* removing unneeded output path
  • Loading branch information
malachi-constant committed Jan 24, 2023
1 parent f8675f5 commit 1c84169
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 33 deletions.
17 changes: 14 additions & 3 deletions API.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 26 additions & 25 deletions src/stages/appflow-ingestion.ts
@@ -1,4 +1,5 @@
import * as path from "path";
import * as cdk from "aws-cdk-lib";
import { Duration } from "aws-cdk-lib";
import * as appflow from "aws-cdk-lib/aws-appflow";
import * as events from "aws-cdk-lib/aws-events";
Expand All @@ -21,12 +22,18 @@ export class AppFlowIngestionStage extends StateMachineStage {
readonly targets?: events.IRuleTarget[];
readonly eventPattern?: events.EventPattern;
readonly stateMachine: sfn.StateMachine;
readonly flowObject: sfn.CustomState;
readonly flowObject: tasks.CallAwsService;
readonly flowName: string;

constructor(scope: Construct, id: string, props: AppFlowIngestionStageProps) {
super(scope, id, props);

const { flowName, flowExecutionStatusCheckPeriod, destinationFlowConfig, sourceFlowConfig, flowTasks } = props;
const flowExecutionRecords = this.createCheckFlowExecutionTask();
const flowObjectExecutionStatus = new sfn.Choice(this, "Check Flow Execution Status");
const flowObjectExecutionStatusWait = new sfn.Wait(this, "Wait Before Checking Flow Status", {
time: sfn.WaitTime.duration(flowExecutionStatusCheckPeriod ?? Duration.seconds(15)),
});

if (!flowName) {
// Check required props for CfnFlow create and except if not provided
Expand All @@ -44,18 +51,11 @@ export class AppFlowIngestionStage extends StateMachineStage {
triggerType: "OnDemand",
},
});
this.flowObject = this.createStartFlowCustomTask(flow.flowName);
this.flowName = flow.flowName;
} else {
this.flowObject = this.createStartFlowCustomTask(flowName);
this.flowName = flowName;
}

// Create check flow execution status step function task
const flowExecutionRecords = this.createCheckFlowExecutionTask();
// Create step function loop to check flow execution status
const flowObjectExecutionStatus = new sfn.Choice(this, "check-flow-execution-status");
const flowObjectExecutionStatusWait = new sfn.Wait(this, "wait-before-checking-flow-status-again", {
time: sfn.WaitTime.duration(flowExecutionStatusCheckPeriod ?? Duration.seconds(15)),
});
this.flowObject = this.createStartFlowCustomTask(this.flowName, flowObjectExecutionStatusWait);

const definition = this.flowObject
.next(flowObjectExecutionStatusWait)
Expand Down Expand Up @@ -87,22 +87,23 @@ export class AppFlowIngestionStage extends StateMachineStage {
);
}

private createStartFlowCustomTask(flowName: string): sfn.CustomState {
const stateJson = {
Type: "Task",
Resource: "arn:aws:states:::aws-sdk:appflow:startFlow",
Parameters: {
private createStartFlowCustomTask(flowName: string, waitHandler?: sfn.Wait): tasks.CallAwsService {
const stack = cdk.Stack.of(this);
const task = new tasks.CallAwsService(this, "Start Flow Execution", {
service: "appflow",
action: "startFlow",
iamResources: [`arn:${stack.partition}:appflow:${stack.region}:${stack.account}:flow/${flowName}`],
parameters: {
FlowName: flowName,
},
Catch: [
{
ErrorEquals: ["Appflow.ConflictException"],
Next: "wait-before-checking-flow-status-again",
},
],
};
});
if (waitHandler) {
task.addCatch(waitHandler, {
errors: ["Appflow.ConflictException"],
});
}

return new sfn.CustomState(this, "start-flow-execution", { stateJson });
return task;
}

private createCheckFlowExecutionTask(): tasks.LambdaInvoke {
Expand All @@ -128,7 +129,7 @@ export class AppFlowIngestionStage extends StateMachineStage {
);

// Create check flow execution status step function task
return new tasks.LambdaInvoke(this, "get-flow-execution-status", {
return new tasks.LambdaInvoke(this, "Get Flow Execution Status", {
lambdaFunction: statusLambda,
resultSelector: { "FlowExecutionStatus.$": "$.Payload" },
});
Expand Down
8 changes: 3 additions & 5 deletions test/appflow-ingestion-stage.test.ts
Expand Up @@ -17,8 +17,8 @@ test("AppFlow Ingestion stage creates State Machine, Lambda & Alarm", () => {
"Fn::Join": [
"",
Match.arrayWith([
Match.stringLikeRegexp("start-flow-execution"),
Match.stringLikeRegexp("check-flow-execution-status"),
Match.stringLikeRegexp("Start Flow Execution"),
Match.stringLikeRegexp("Check Flow Execution Status"),
]),
],
},
Expand Down Expand Up @@ -76,9 +76,7 @@ test("AppFlow Ingestion stage creates flow", () => {
DefinitionString: {
"Fn::Join": [
"",
Match.arrayWith([
Match.stringLikeRegexp('"wait-before-checking-flow-status-again":{"Type":"Wait","Seconds":10'),
]),
Match.arrayWith([Match.stringLikeRegexp('"Wait Before Checking Flow Status":{"Type":"Wait","Seconds":10')]),
],
},
});
Expand Down

0 comments on commit 1c84169

Please sign in to comment.