From 1c84169efa70153214b538df44fab4c2e731da72 Mon Sep 17 00:00:00 2001 From: Lucas Hanson Date: Tue, 24 Jan 2023 13:34:50 -0800 Subject: [PATCH] feat: appflow stage refactor (#230) * updating startflow * iam constriction * removing unneeded output path --- API.md | 17 ++++++++-- src/stages/appflow-ingestion.ts | 51 ++++++++++++++-------------- test/appflow-ingestion-stage.test.ts | 8 ++--- 3 files changed, 43 insertions(+), 33 deletions(-) diff --git a/API.md b/API.md index 6ee26449..9be60453 100644 --- a/API.md +++ b/API.md @@ -111,7 +111,8 @@ Any object. | alarmsEnabled | boolean | *No description.* | | cloudwatchAlarms | aws-cdk-lib.aws_cloudwatch.Alarm[] | *No description.* | | stateMachine | aws-cdk-lib.aws_stepfunctions.StateMachine | *No description.* | -| flowObject | aws-cdk-lib.aws_stepfunctions.CustomState | *No description.* | +| flowName | string | *No description.* | +| flowObject | aws-cdk-lib.aws_stepfunctions_tasks.CallAwsService | *No description.* | --- @@ -197,13 +198,23 @@ public readonly stateMachine: StateMachine; --- +##### `flowName`Required + +```typescript +public readonly flowName: string; +``` + +- *Type:* string + +--- + ##### `flowObject`Required ```typescript -public readonly flowObject: CustomState; +public readonly flowObject: CallAwsService; ``` -- *Type:* aws-cdk-lib.aws_stepfunctions.CustomState +- *Type:* aws-cdk-lib.aws_stepfunctions_tasks.CallAwsService --- diff --git a/src/stages/appflow-ingestion.ts b/src/stages/appflow-ingestion.ts index 623145c5..8d96aa52 100644 --- a/src/stages/appflow-ingestion.ts +++ b/src/stages/appflow-ingestion.ts @@ -1,4 +1,5 @@ import * as path from "path"; +import * as cdk from "aws-cdk-lib"; import { Duration } from "aws-cdk-lib"; import * as appflow from "aws-cdk-lib/aws-appflow"; import * as events from "aws-cdk-lib/aws-events"; @@ -21,12 +22,18 @@ export class AppFlowIngestionStage extends StateMachineStage { readonly targets?: events.IRuleTarget[]; readonly eventPattern?: events.EventPattern; readonly stateMachine: sfn.StateMachine; - readonly flowObject: sfn.CustomState; + readonly flowObject: tasks.CallAwsService; + readonly flowName: string; constructor(scope: Construct, id: string, props: AppFlowIngestionStageProps) { super(scope, id, props); const { flowName, flowExecutionStatusCheckPeriod, destinationFlowConfig, sourceFlowConfig, flowTasks } = props; + const flowExecutionRecords = this.createCheckFlowExecutionTask(); + const flowObjectExecutionStatus = new sfn.Choice(this, "Check Flow Execution Status"); + const flowObjectExecutionStatusWait = new sfn.Wait(this, "Wait Before Checking Flow Status", { + time: sfn.WaitTime.duration(flowExecutionStatusCheckPeriod ?? Duration.seconds(15)), + }); if (!flowName) { // Check required props for CfnFlow create and except if not provided @@ -44,18 +51,11 @@ export class AppFlowIngestionStage extends StateMachineStage { triggerType: "OnDemand", }, }); - this.flowObject = this.createStartFlowCustomTask(flow.flowName); + this.flowName = flow.flowName; } else { - this.flowObject = this.createStartFlowCustomTask(flowName); + this.flowName = flowName; } - - // Create check flow execution status step function task - const flowExecutionRecords = this.createCheckFlowExecutionTask(); - // Create step function loop to check flow execution status - const flowObjectExecutionStatus = new sfn.Choice(this, "check-flow-execution-status"); - const flowObjectExecutionStatusWait = new sfn.Wait(this, "wait-before-checking-flow-status-again", { - time: sfn.WaitTime.duration(flowExecutionStatusCheckPeriod ?? Duration.seconds(15)), - }); + this.flowObject = this.createStartFlowCustomTask(this.flowName, flowObjectExecutionStatusWait); const definition = this.flowObject .next(flowObjectExecutionStatusWait) @@ -87,22 +87,23 @@ export class AppFlowIngestionStage extends StateMachineStage { ); } - private createStartFlowCustomTask(flowName: string): sfn.CustomState { - const stateJson = { - Type: "Task", - Resource: "arn:aws:states:::aws-sdk:appflow:startFlow", - Parameters: { + private createStartFlowCustomTask(flowName: string, waitHandler?: sfn.Wait): tasks.CallAwsService { + const stack = cdk.Stack.of(this); + const task = new tasks.CallAwsService(this, "Start Flow Execution", { + service: "appflow", + action: "startFlow", + iamResources: [`arn:${stack.partition}:appflow:${stack.region}:${stack.account}:flow/${flowName}`], + parameters: { FlowName: flowName, }, - Catch: [ - { - ErrorEquals: ["Appflow.ConflictException"], - Next: "wait-before-checking-flow-status-again", - }, - ], - }; + }); + if (waitHandler) { + task.addCatch(waitHandler, { + errors: ["Appflow.ConflictException"], + }); + } - return new sfn.CustomState(this, "start-flow-execution", { stateJson }); + return task; } private createCheckFlowExecutionTask(): tasks.LambdaInvoke { @@ -128,7 +129,7 @@ export class AppFlowIngestionStage extends StateMachineStage { ); // Create check flow execution status step function task - return new tasks.LambdaInvoke(this, "get-flow-execution-status", { + return new tasks.LambdaInvoke(this, "Get Flow Execution Status", { lambdaFunction: statusLambda, resultSelector: { "FlowExecutionStatus.$": "$.Payload" }, }); diff --git a/test/appflow-ingestion-stage.test.ts b/test/appflow-ingestion-stage.test.ts index 6ba9d520..5daaf5fd 100644 --- a/test/appflow-ingestion-stage.test.ts +++ b/test/appflow-ingestion-stage.test.ts @@ -17,8 +17,8 @@ test("AppFlow Ingestion stage creates State Machine, Lambda & Alarm", () => { "Fn::Join": [ "", Match.arrayWith([ - Match.stringLikeRegexp("start-flow-execution"), - Match.stringLikeRegexp("check-flow-execution-status"), + Match.stringLikeRegexp("Start Flow Execution"), + Match.stringLikeRegexp("Check Flow Execution Status"), ]), ], }, @@ -76,9 +76,7 @@ test("AppFlow Ingestion stage creates flow", () => { DefinitionString: { "Fn::Join": [ "", - Match.arrayWith([ - Match.stringLikeRegexp('"wait-before-checking-flow-status-again":{"Type":"Wait","Seconds":10'), - ]), + Match.arrayWith([Match.stringLikeRegexp('"Wait Before Checking Flow Status":{"Type":"Wait","Seconds":10')]), ], }, });