Skip to content

Commit fa48e89

Browse files
wqzowwmergify[bot]
authored andcommitted
refactor(stepfunctions-tasks): make integrationPattern an enum (#3115)
* refactor(aws-stepfunctions-tasks): implement service integration patterns for tasks Step Functions allows users to call different integrated services in different ways. They are also called service integration patterns, including Request Response, Run a Job and Wait for Callback. Users must choose exactly one of them and specify it in the "Resource" field. This commit introduces a new member variable, serviceIntegrationPattern, in the interface of properties within each existing integrated service. This helps to avoid using multiple boolean variables in the service such as ECS, which supports different service integration patterns. It is also beneficial for code maintenances: if Step Functions adds new integrated services or updates the existing integration patterns in the future, keeping pace with these changes will be simply updating this variable of enum type. BREAKING CHANGE: To define a callback task, users should specify "serviceIntegrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN" instead of "waitForTaskToken: true". For a sync task, users should use "serviceIntegrationPattern: sfn.ServiceIntegrationPattern.SYNC" in the place of "synchronous: true". In addition, this commit enables users to define callback task with ECS. **@aws-cdk/aws-stepfunctions-tasks** Closes #3114 * serviceIntegrationPattern -> integrationPattern
1 parent c95eab6 commit fa48e89

20 files changed

+300
-60
lines changed

packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import iam = require('@aws-cdk/aws-iam');
22
import sns = require('@aws-cdk/aws-sns');
33
import sfn = require('@aws-cdk/aws-stepfunctions');
4+
import { resourceArnSuffix } from './resource-arn-suffix';
45

56
/**
67
* Properties for PublishTask
@@ -29,11 +30,13 @@ export interface PublishToTopicProps {
2930
readonly subject?: string;
3031

3132
/**
32-
* Whether to pause the workflow until a task token is returned
33+
* The service integration pattern indicates different ways to call Publish to SNS.
3334
*
34-
* @default false
35+
* The valid value is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN.
36+
*
37+
* @default FIRE_AND_FORGET
3538
*/
36-
readonly waitForTaskToken?: boolean;
39+
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
3740
}
3841

3942
/**
@@ -44,19 +47,30 @@ export interface PublishToTopicProps {
4447
*/
4548
export class PublishToTopic implements sfn.IStepFunctionsTask {
4649

47-
private readonly waitForTaskToken: boolean;
50+
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
4851

4952
constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
50-
this.waitForTaskToken = props.waitForTaskToken === true;
53+
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;
54+
55+
const supportedPatterns = [
56+
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
57+
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
58+
];
5159

52-
if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) {
53-
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
60+
if (!supportedPatterns.includes(this.integrationPattern)) {
61+
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SNS.`);
62+
}
63+
64+
if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN) {
65+
if (!sfn.FieldUtils.containsTaskToken(props.message)) {
66+
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
67+
}
5468
}
5569
}
5670

5771
public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
5872
return {
59-
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
73+
resourceArn: 'arn:aws:states:::sns:publish' + resourceArnSuffix.get(this.integrationPattern),
6074
policyStatements: [new iam.PolicyStatement({
6175
actions: ['sns:Publish'],
6276
resources: [this.topic.topicArn]
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import sfn = require('@aws-cdk/aws-stepfunctions');
2+
3+
/**
4+
* Suffixes corresponding to different service integration patterns
5+
*
6+
* Key is the service integration pattern, value is the resource ARN suffix.
7+
*
8+
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html
9+
*/
10+
const resourceArnSuffix = new Map<sfn.ServiceIntegrationPattern, string>();
11+
resourceArnSuffix.set(sfn.ServiceIntegrationPattern.FIRE_AND_FORGET, "");
12+
resourceArnSuffix.set(sfn.ServiceIntegrationPattern.SYNC, ".sync");
13+
resourceArnSuffix.set(sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN, ".waitForTaskToken");
14+
15+
export { resourceArnSuffix };

packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import iam = require('@aws-cdk/aws-iam');
44
import sfn = require('@aws-cdk/aws-stepfunctions');
55
import cdk = require('@aws-cdk/core');
66
import { Stack } from '@aws-cdk/core';
7+
import { resourceArnSuffix } from './resource-arn-suffix';
78
import { ContainerOverride } from './run-ecs-task-base-types';
89

910
/**
@@ -32,11 +33,13 @@ export interface CommonEcsRunTaskProps {
3233
readonly containerOverrides?: ContainerOverride[];
3334

3435
/**
35-
* Whether to wait for the task to complete and return the response
36+
* The service integration pattern indicates different ways to call RunTask in ECS.
3637
*
37-
* @default true
38+
* The valid value for Lambda is FIRE_AND_FORGET, SYNC and WAIT_FOR_TASK_TOKEN.
39+
*
40+
* @default FIRE_AND_FORGET
3841
*/
39-
readonly synchronous?: boolean;
42+
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
4043
}
4144

4245
/**
@@ -60,10 +63,25 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask
6063

6164
private securityGroup?: ec2.ISecurityGroup;
6265
private networkConfiguration?: any;
63-
private readonly sync: boolean;
66+
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
6467

6568
constructor(private readonly props: EcsRunTaskBaseProps) {
66-
this.sync = props.synchronous !== false;
69+
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;
70+
71+
const supportedPatterns = [
72+
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
73+
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
74+
sfn.ServiceIntegrationPattern.SYNC
75+
];
76+
77+
if (!supportedPatterns.includes(this.integrationPattern)) {
78+
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call ECS.`);
79+
}
80+
81+
if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
82+
&& !sfn.FieldUtils.containsTaskToken(props.containerOverrides)) {
83+
throw new Error('Task Token is missing in containerOverrides (pass Context.taskToken somewhere in containerOverrides)');
84+
}
6785

6886
for (const override of this.props.containerOverrides || []) {
6987
const name = override.containerName;
@@ -86,7 +104,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask
86104
}
87105

88106
return {
89-
resourceArn: 'arn:aws:states:::ecs:runTask' + (this.sync ? '.sync' : ''),
107+
resourceArn: 'arn:aws:states:::ecs:runTask' + resourceArnSuffix.get(this.integrationPattern),
90108
parameters: {
91109
Cluster: this.props.cluster.clusterArn,
92110
TaskDefinition: this.props.taskDefinition.taskDefinitionArn,
@@ -139,7 +157,7 @@ export class EcsRunTaskBase implements ec2.IConnectable, sfn.IStepFunctionsTask
139157
}),
140158
];
141159

142-
if (this.sync) {
160+
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
143161
policyStatements.push(new iam.PolicyStatement({
144162
actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
145163
resources: [stack.formatArn({

packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import iam = require('@aws-cdk/aws-iam');
22
import lambda = require('@aws-cdk/aws-lambda');
33
import sfn = require('@aws-cdk/aws-stepfunctions');
4-
import { FieldUtils } from '../../aws-stepfunctions/lib/fields';
4+
import { resourceArnSuffix } from './resource-arn-suffix';
55

66
/**
77
* Properties for RunLambdaTask
@@ -13,15 +13,18 @@ export interface RunLambdaTaskProps {
1313
readonly payload?: { [key: string]: any };
1414

1515
/**
16-
* Whether to pause the workflow until a task token is returned
16+
* The service integration pattern indicates different ways to invoke Lambda function.
1717
*
18-
* If this is set to true, the Context.taskToken value must be included
18+
* The valid value for Lambda is either FIRE_AND_FORGET or WAIT_FOR_TASK_TOKEN,
19+
* it determines whether to pause the workflow until a task token is returned.
20+
*
21+
* If this is set to WAIT_FOR_TASK_TOKEN, the Context.taskToken value must be included
1922
* somewhere in the payload and the Lambda must call
2023
* `SendTaskSuccess/SendTaskFailure` using that token.
2124
*
22-
* @default false
25+
* @default FIRE_AND_FORGET
2326
*/
24-
readonly waitForTaskToken?: boolean;
27+
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
2528

2629
/**
2730
* Invocation type of the Lambda function
@@ -55,18 +58,28 @@ export interface RunLambdaTaskProps {
5558
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
5659
*/
5760
export class RunLambdaTask implements sfn.IStepFunctionsTask {
58-
private readonly waitForTaskToken: boolean;
61+
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
5962

6063
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) {
61-
this.waitForTaskToken = !!props.waitForTaskToken;
64+
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;
65+
66+
const supportedPatterns = [
67+
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
68+
sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
69+
];
70+
71+
if (!supportedPatterns.includes(this.integrationPattern)) {
72+
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call Lambda.`);
73+
}
6274

63-
if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
75+
if (this.integrationPattern === sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN
76+
&& !sfn.FieldUtils.containsTaskToken(props.payload)) {
6477
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
6578
}
6679
}
6780

6881
public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
69-
const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : '');
82+
const resourceArn = 'arn:aws:states:::lambda:invoke' + resourceArnSuffix.get(this.integrationPattern);
7083

7184
return {
7285
resourceArn,
@@ -106,4 +119,9 @@ export enum InvocationType {
106119
* The API response only includes a status code.
107120
*/
108121
EVENT = 'Event',
122+
123+
/**
124+
* TValidate parameter values and verify that the user or role has permission to invoke the function.
125+
*/
126+
DRY_RUN = 'DryRun'
109127
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2');
22
import iam = require('@aws-cdk/aws-iam');
33
import sfn = require('@aws-cdk/aws-stepfunctions');
44
import { Construct, Duration, Stack } from '@aws-cdk/core';
5+
import { resourceArnSuffix } from './resource-arn-suffix';
56
import { AlgorithmSpecification, Channel, InputMode, OutputDataConfig, ResourceConfig,
67
S3DataType, StoppingCondition, VpcConfig, } from './sagemaker-task-base-types';
78

@@ -26,11 +27,13 @@ export interface SagemakerTrainTaskProps {
2627
readonly role?: iam.IRole;
2728

2829
/**
29-
* Specify if the task is synchronous or asychronous.
30+
* The service integration pattern indicates different ways to call SageMaker APIs.
3031
*
31-
* @default false
32+
* The valid value is either FIRE_AND_FORGET or SYNC.
33+
*
34+
* @default FIRE_AND_FORGET
3235
*/
33-
readonly synchronous?: boolean;
36+
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
3437

3538
/**
3639
* Identifies the training algorithm to use.
@@ -114,7 +117,19 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn
114117
*/
115118
private readonly stoppingCondition: StoppingCondition;
116119

120+
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
121+
117122
constructor(scope: Construct, private readonly props: SagemakerTrainTaskProps) {
123+
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;
124+
125+
const supportedPatterns = [
126+
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
127+
sfn.ServiceIntegrationPattern.SYNC
128+
];
129+
130+
if (!supportedPatterns.includes(this.integrationPattern)) {
131+
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`);
132+
}
118133

119134
// set the default resource config if not defined.
120135
this.resourceConfig = props.resourceConfig || {
@@ -194,7 +209,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn
194209

195210
public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
196211
return {
197-
resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''),
212+
resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + resourceArnSuffix.get(this.integrationPattern),
198213
parameters: this.renderParameters(),
199214
policyStatements: this.makePolicyStatements(task),
200215
};
@@ -322,7 +337,7 @@ export class SagemakerTrainTask implements iam.IGrantable, ec2.IConnectable, sfn
322337
})
323338
];
324339

325-
if (this.props.synchronous) {
340+
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
326341
policyStatements.push(new iam.PolicyStatement({
327342
actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
328343
resources: [stack.formatArn({

packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import ec2 = require('@aws-cdk/aws-ec2');
22
import iam = require('@aws-cdk/aws-iam');
33
import sfn = require('@aws-cdk/aws-stepfunctions');
44
import { Construct, Stack } from '@aws-cdk/core';
5+
import { resourceArnSuffix } from './resource-arn-suffix';
56
import { BatchStrategy, S3DataType, TransformInput, TransformOutput, TransformResources } from './sagemaker-task-base-types';
67

78
/**
@@ -20,9 +21,13 @@ export interface SagemakerTransformProps {
2021
readonly role?: iam.IRole;
2122

2223
/**
23-
* Specify if the task is synchronous or asychronous.
24+
* The service integration pattern indicates different ways to call SageMaker APIs.
25+
*
26+
* The valid value is either FIRE_AND_FORGET or SYNC.
27+
*
28+
* @default FIRE_AND_FORGET
2429
*/
25-
readonly synchronous?: boolean;
30+
readonly integrationPattern?: sfn.ServiceIntegrationPattern;
2631

2732
/**
2833
* Number of records to include in a mini-batch for an HTTP inference request.
@@ -94,7 +99,19 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
9499
*/
95100
private readonly transformResources: TransformResources;
96101

102+
private readonly integrationPattern: sfn.ServiceIntegrationPattern;
103+
97104
constructor(scope: Construct, private readonly props: SagemakerTransformProps) {
105+
this.integrationPattern = props.integrationPattern || sfn.ServiceIntegrationPattern.FIRE_AND_FORGET;
106+
107+
const supportedPatterns = [
108+
sfn.ServiceIntegrationPattern.FIRE_AND_FORGET,
109+
sfn.ServiceIntegrationPattern.SYNC
110+
];
111+
112+
if (!supportedPatterns.includes(this.integrationPattern)) {
113+
throw new Error(`Invalid Service Integration Pattern: ${this.integrationPattern} is not supported to call SageMaker.`);
114+
}
98115

99116
// set the sagemaker role or create new one
100117
this.role = props.role || new iam.Role(scope, 'SagemakerRole', {
@@ -124,7 +141,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
124141

125142
public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
126143
return {
127-
resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''),
144+
resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + resourceArnSuffix.get(this.integrationPattern),
128145
parameters: this.renderParameters(),
129146
policyStatements: this.makePolicyStatements(task),
130147
};
@@ -216,7 +233,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
216233
})
217234
];
218235

219-
if (this.props.synchronous) {
236+
if (this.integrationPattern === sfn.ServiceIntegrationPattern.SYNC) {
220237
policyStatements.push(new iam.PolicyStatement({
221238
actions: ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
222239
resources: [stack.formatArn({

0 commit comments

Comments
 (0)