Skip to content

Commit d017a14

Browse files
albegalirix0rrr
authored andcommitted
feat(stepfunctions): waitForTaskToken for Lambda, SQS, SNS (#2686)
This PR allows one to work with Task states that implement the callback service integration pattern. Introduces a new class for integrating with Lambda in the new invocation style, since there are a number of subtle differences with the old invocation style. The supported task types are: * `RunLambdaTask` (AWS Lambda) * `SendToQueue` (AWS SQS) * `PublishToTopic` (AWS SNS) Closes #2658, closes #2735.
1 parent 65014ab commit d017a14

File tree

9 files changed

+591
-12
lines changed

9 files changed

+591
-12
lines changed

packages/@aws-cdk/aws-stepfunctions-tasks/lib/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './invoke-function';
2+
export * from './run-lambda-task';
23
export * from './invoke-activity';
34
export * from './run-ecs-task-base'; // Remove this once we can
45
export * from './run-ecs-task-base-types';

packages/@aws-cdk/aws-stepfunctions-tasks/lib/invoke-function.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,25 @@ import iam = require('@aws-cdk/aws-iam');
22
import lambda = require('@aws-cdk/aws-lambda');
33
import sfn = require('@aws-cdk/aws-stepfunctions');
44

5+
/**
6+
* Properties for InvokeFunction
7+
*/
8+
export interface InvokeFunctionProps {
9+
/**
10+
* The JSON that you want to provide to your Lambda function as input.
11+
*
12+
* @default - The JSON data indicated by the task's InputPath is used as payload
13+
*/
14+
readonly payload?: { [key: string]: any };
15+
}
16+
517
/**
618
* A StepFunctions Task to invoke a Lambda function.
719
*
8-
* A Function can be used directly as a Resource, but this class mirrors
9-
* integration with other AWS services via a specific class instance.
20+
* OUTPUT: the output of this task is the return value of the Lambda Function.
1021
*/
1122
export class InvokeFunction implements sfn.IStepFunctionsTask {
12-
constructor(private readonly lambdaFunction: lambda.IFunction) {
23+
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: InvokeFunctionProps = {}) {
1324
}
1425

1526
public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
@@ -22,6 +33,7 @@ export class InvokeFunction implements sfn.IStepFunctionsTask {
2233
metricPrefixSingular: 'LambdaFunction',
2334
metricPrefixPlural: 'LambdaFunctions',
2435
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
36+
parameters: this.props.payload
2537
};
2638
}
27-
}
39+
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ export interface PublishToTopicProps {
2727
* Message subject
2828
*/
2929
readonly subject?: string;
30+
31+
/**
32+
* Whether to pause the workflow until a task token is returned
33+
*
34+
* @default false
35+
*/
36+
readonly waitForTaskToken?: boolean;
3037
}
3138

3239
/**
@@ -36,12 +43,20 @@ export interface PublishToTopicProps {
3643
* integration with other AWS services via a specific class instance.
3744
*/
3845
export class PublishToTopic implements sfn.IStepFunctionsTask {
46+
47+
private readonly waitForTaskToken: boolean;
48+
3949
constructor(private readonly topic: sns.ITopic, private readonly props: PublishToTopicProps) {
50+
this.waitForTaskToken = props.waitForTaskToken === true;
51+
52+
if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.message.value)) {
53+
throw new Error('Task Token is missing in message (pass Context.taskToken somewhere in message)');
54+
}
4055
}
4156

4257
public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
4358
return {
44-
resourceArn: 'arn:aws:states:::sns:publish',
59+
resourceArn: 'arn:aws:states:::sns:publish' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
4560
policyStatements: [new iam.PolicyStatement({
4661
actions: ['sns:Publish'],
4762
resources: [this.topic.topicArn]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import iam = require('@aws-cdk/aws-iam');
2+
import lambda = require('@aws-cdk/aws-lambda');
3+
import sfn = require('@aws-cdk/aws-stepfunctions');
4+
import { FieldUtils } from '../../aws-stepfunctions/lib/fields';
5+
6+
/**
7+
* Properties for RunLambdaTask
8+
*/
9+
export interface RunLambdaTaskProps {
10+
/**
11+
* The JSON that you want to provide to your Lambda function as input.
12+
*/
13+
readonly payload?: { [key: string]: any };
14+
15+
/**
16+
* Whether to pause the workflow until a task token is returned
17+
*
18+
* If this is set to true, the Context.taskToken value must be included
19+
* somewhere in the payload and the Lambda must call
20+
* `SendTaskSuccess/SendTaskFailure` using that token.
21+
*
22+
* @default false
23+
*/
24+
readonly waitForTaskToken?: boolean;
25+
26+
/**
27+
* Invocation type of the Lambda function
28+
*
29+
* @default RequestResponse
30+
*/
31+
readonly invocationType?: InvocationType;
32+
33+
/**
34+
* Client context to pass to the function
35+
*
36+
* @default - No context
37+
*/
38+
readonly clientContext?: string;
39+
}
40+
41+
/**
42+
* Invoke a Lambda function as a Task
43+
*
44+
* OUTPUT: the output of this task is either the return value of Lambda's
45+
* Invoke call, or whatever the Lambda Function posted back using
46+
* `SendTaskSuccess/SendTaskFailure` in `waitForTaskToken` mode.
47+
*
48+
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-lambda.html
49+
*/
50+
export class RunLambdaTask implements sfn.IStepFunctionsTask {
51+
private readonly waitForTaskToken: boolean;
52+
53+
constructor(private readonly lambdaFunction: lambda.IFunction, private readonly props: RunLambdaTaskProps = {}) {
54+
this.waitForTaskToken = !!props.waitForTaskToken;
55+
56+
if (this.waitForTaskToken && !FieldUtils.containsTaskToken(props.payload)) {
57+
throw new Error('Task Token is missing in payload (pass Context.taskToken somewhere in payload)');
58+
}
59+
}
60+
61+
public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
62+
const resourceArn = 'arn:aws:states:::lambda:invoke' + (this.waitForTaskToken ? '.waitForTaskToken' : '');
63+
64+
return {
65+
resourceArn,
66+
policyStatements: [new iam.PolicyStatement({
67+
resources: [this.lambdaFunction.functionArn],
68+
actions: ["lambda:InvokeFunction"],
69+
})],
70+
metricPrefixSingular: 'LambdaFunction',
71+
metricPrefixPlural: 'LambdaFunctions',
72+
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
73+
parameters: {
74+
FunctionName: this.lambdaFunction.functionName,
75+
Payload: this.props.payload,
76+
InvocationType: this.props.invocationType,
77+
ClientContext: this.props.clientContext,
78+
}
79+
};
80+
}
81+
}
82+
83+
/**
84+
* Invocation type of a Lambda
85+
*/
86+
export enum InvocationType {
87+
/**
88+
* Invoke synchronously
89+
*
90+
* The API response includes the function response and additional data.
91+
*/
92+
RequestResponse = 'RequestResponse',
93+
94+
/**
95+
* Invoke asynchronously
96+
*
97+
* Send events that fail multiple times to the function's dead-letter queue (if it's configured).
98+
* The API response only includes a status code.
99+
*/
100+
Event = 'Event',
101+
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ export interface SendToQueueProps {
3636
* @default No group ID
3737
*/
3838
readonly messageGroupId?: string;
39+
40+
/**
41+
* Whether to pause the workflow until a task token is returned
42+
*
43+
* @default false
44+
*/
45+
readonly waitForTaskToken?: boolean;
3946
}
4047

4148
/**
@@ -45,12 +52,20 @@ export interface SendToQueueProps {
4552
* integration with other AWS services via a specific class instance.
4653
*/
4754
export class SendToQueue implements sfn.IStepFunctionsTask {
55+
56+
private readonly waitForTaskToken: boolean;
57+
4858
constructor(private readonly queue: sqs.IQueue, private readonly props: SendToQueueProps) {
59+
this.waitForTaskToken = props.waitForTaskToken === true;
60+
61+
if (this.waitForTaskToken && !sfn.FieldUtils.containsTaskToken(props.messageBody.value)) {
62+
throw new Error('Task Token is missing in messageBody (pass Context.taskToken somewhere in messageBody)');
63+
}
4964
}
5065

5166
public bind(_task: sfn.Task): sfn.StepFunctionsTaskConfig {
5267
return {
53-
resourceArn: 'arn:aws:states:::sqs:sendMessage',
68+
resourceArn: 'arn:aws:states:::sqs:sendMessage' + (this.waitForTaskToken ? '.waitForTaskToken' : ''),
5469
policyStatements: [new iam.PolicyStatement({
5570
actions: ['sqs:SendMessage'],
5671
resources: [this.queue.queueArn]

0 commit comments

Comments
 (0)