Skip to content

Commit 58a80ab

Browse files
wqzowwrix0rrr
authored andcommitted
fix(stepfunctions): fix passing of Token in RunLambdaTask (#2939)
Fixes #2937.
1 parent f30bdd3 commit 58a80ab

14 files changed

+221
-78
lines changed

packages/@aws-cdk/aws-stepfunctions-tasks/lib/publish-to-topic.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,9 @@ export class PublishToTopic implements sfn.IStepFunctionsTask {
6363
})],
6464
parameters: {
6565
TopicArn: this.topic.topicArn,
66-
...sfn.FieldUtils.renderObject({
67-
Message: this.props.message.value,
68-
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
69-
Subject: this.props.subject,
70-
})
66+
Message: this.props.message.value,
67+
MessageStructure: this.props.messagePerSubscriptionType ? "json" : undefined,
68+
Subject: this.props.subject,
7169
}
7270
};
7371
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-ecs-task-base.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ function renderOverrides(containerOverrides?: ContainerOverride[]) {
166166

167167
const ret = new Array<any>();
168168
for (const override of containerOverrides) {
169-
ret.push(sfn.FieldUtils.renderObject({
169+
ret.push({
170170
Name: override.containerName,
171171
Command: override.command,
172172
Cpu: override.cpu,
@@ -176,7 +176,7 @@ function renderOverrides(containerOverrides?: ContainerOverride[]) {
176176
Name: e.name,
177177
Value: e.value,
178178
}))
179-
}));
179+
});
180180
}
181181

182182
return { ContainerOverrides: ret };

packages/@aws-cdk/aws-stepfunctions-tasks/lib/run-lambda-task.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ export interface RunLambdaTaskProps {
3636
* @default - No context
3737
*/
3838
readonly clientContext?: string;
39+
40+
/**
41+
* Version or alias of the function to be invoked
42+
*
43+
* @default - No qualifier
44+
*/
45+
readonly qualifier?: string;
3946
}
4047

4148
/**
@@ -75,6 +82,7 @@ export class RunLambdaTask implements sfn.IStepFunctionsTask {
7582
Payload: this.props.payload,
7683
InvocationType: this.props.invocationType,
7784
ClientContext: this.props.clientContext,
85+
Qualifier: this.props.qualifier
7886
}
7987
};
8088
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-train-task.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ export class SagemakerTrainTask implements ec2.IConnectable, sfn.IStepFunctionsT
151151
public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
152152
return {
153153
resourceArn: 'arn:aws:states:::sagemaker:createTrainingJob' + (this.props.synchronous ? '.sync' : ''),
154-
parameters: sfn.FieldUtils.renderObject(this.renderParameters()),
154+
parameters: this.renderParameters(),
155155
policyStatements: this.makePolicyStatements(task),
156156
};
157157
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/sagemaker-transform-task.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ export class SagemakerTransformTask implements sfn.IStepFunctionsTask {
125125
public bind(task: sfn.Task): sfn.StepFunctionsTaskConfig {
126126
return {
127127
resourceArn: 'arn:aws:states:::sagemaker:createTransformJob' + (this.props.synchronous ? '.sync' : ''),
128-
parameters: sfn.FieldUtils.renderObject(this.renderParameters()),
128+
parameters: this.renderParameters(),
129129
policyStatements: this.makePolicyStatements(task),
130130
};
131131
}

packages/@aws-cdk/aws-stepfunctions-tasks/lib/send-to-queue.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,10 @@ export class SendToQueue implements sfn.IStepFunctionsTask {
7373
})],
7474
parameters: {
7575
QueueUrl: this.queue.queueUrl,
76-
...sfn.FieldUtils.renderObject({
77-
MessageBody: this.props.messageBody.value,
78-
DelaySeconds: this.props.delay && this.props.delay.toSeconds(),
79-
MessageDeduplicationId: this.props.messageDeduplicationId,
80-
MessageGroupId: this.props.messageGroupId,
81-
})
76+
MessageBody: this.props.messageBody.value,
77+
DelaySeconds: this.props.delay && this.props.delay.toSeconds(),
78+
MessageDeduplicationId: this.props.messageDeduplicationId,
79+
MessageGroupId: this.props.messageGroupId,
8280
}
8381
};
8482
}

packages/@aws-cdk/aws-stepfunctions-tasks/test/integ.invoke-function.expected.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@
272272
{
273273
"Ref": "CallbackHandler4434C38D"
274274
},
275-
"\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
275+
"\",\"Payload\":{\"token.$\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\",\"ResultPath\":\"$.status\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"DescribeJob returned FAILED\",\"Cause\":\"AWS Batch Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
276276
]
277277
]
278278
},

packages/@aws-cdk/aws-stepfunctions-tasks/test/invoke-function.test.ts

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ beforeEach(() => {
1515
});
1616
});
1717

18-
test('Lambda function can be used in a Task', () => {
18+
test('Invoke lambda with function ARN', () => {
1919
// WHEN
2020
const task = new sfn.Task(stack, 'Task', { task: new tasks.InvokeFunction(fn) });
2121
new sfn.StateMachine(stack, 'SM', {
@@ -39,7 +39,7 @@ test('Lambda function payload ends up in Parameters', () => {
3939
definition: new sfn.Task(stack, 'Task', {
4040
task: new tasks.InvokeFunction(fn, {
4141
payload: {
42-
foo: 'bar'
42+
foo: sfn.Data.stringAt('$.bar')
4343
}
4444
})
4545
})
@@ -48,45 +48,10 @@ test('Lambda function payload ends up in Parameters', () => {
4848
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
4949
DefinitionString: {
5050
"Fn::Join": ["", [
51-
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo\":\"bar\"},\"Type\":\"Task\",\"Resource\":\"",
51+
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"foo.$\":\"$.bar\"},\"Type\":\"Task\",\"Resource\":\"",
5252
{ "Fn::GetAtt": ["Fn9270CBC0", "Arn"] },
5353
"\"}}}"
5454
]]
5555
},
5656
});
5757
});
58-
59-
test('Lambda function can be used in a Task with Task Token', () => {
60-
const task = new sfn.Task(stack, 'Task', {
61-
task: new tasks.RunLambdaTask(fn, {
62-
waitForTaskToken: true,
63-
payload: {
64-
token: sfn.Context.taskToken
65-
}
66-
})
67-
});
68-
new sfn.StateMachine(stack, 'SM', {
69-
definition: task
70-
});
71-
72-
// THEN
73-
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
74-
DefinitionString: {
75-
"Fn::Join": ["", [
76-
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
77-
{ Ref: "Fn9270CBC0" },
78-
"\",\"Payload\":{\"token\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}"
79-
]]
80-
},
81-
});
82-
});
83-
84-
test('Task throws if waitForTaskToken is supplied but task token is not included', () => {
85-
expect(() => {
86-
new sfn.Task(stack, 'Task', {
87-
task: new tasks.RunLambdaTask(fn, {
88-
waitForTaskToken: true
89-
})
90-
});
91-
}).toThrow(/Task Token is missing in payload/i);
92-
});

packages/@aws-cdk/aws-stepfunctions-tasks/test/publish-to-topic.test.ts

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ import sfn = require('@aws-cdk/aws-stepfunctions');
33
import cdk = require('@aws-cdk/cdk');
44
import tasks = require('../lib');
55

6-
test('publish to SNS', () => {
6+
test('Publish literal message to SNS topic', () => {
77
// GIVEN
88
const stack = new cdk.Stack();
99
const topic = new sns.Topic(stack, 'Topic');
1010

1111
// WHEN
1212
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
13-
message: sfn.TaskInput.fromText('Send this message')
13+
message: sfn.TaskInput.fromText('Publish this message')
1414
}) });
1515

1616
// THEN
@@ -20,33 +20,72 @@ test('publish to SNS', () => {
2020
End: true,
2121
Parameters: {
2222
TopicArn: { Ref: 'TopicBFC7AF6E' },
23-
Message: 'Send this message'
23+
Message: 'Publish this message'
2424
},
2525
});
2626
});
2727

28-
test('publish JSON to SNS', () => {
28+
test('Publish JSON to SNS topic with task token', () => {
2929
// GIVEN
3030
const stack = new cdk.Stack();
3131
const topic = new sns.Topic(stack, 'Topic');
3232

3333
// WHEN
3434
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
35+
waitForTaskToken: true,
3536
message: sfn.TaskInput.fromObject({
36-
Input: 'Send this message'
37+
Input: 'Publish this message',
38+
Token: sfn.Context.taskToken
3739
})
3840
}) });
3941

4042
// THEN
4143
expect(stack.resolve(pub.toStateJson())).toEqual({
4244
Type: 'Task',
43-
Resource: 'arn:aws:states:::sns:publish',
45+
Resource: 'arn:aws:states:::sns:publish.waitForTaskToken',
4446
End: true,
4547
Parameters: {
4648
TopicArn: { Ref: 'TopicBFC7AF6E' },
4749
Message: {
48-
Input: 'Send this message'
50+
'Input': 'Publish this message',
51+
'Token.$': '$$.Task.Token'
4952
}
5053
},
5154
});
5255
});
56+
57+
test('Task throws if waitForTaskToken is supplied but task token is not included in message', () => {
58+
expect(() => {
59+
// GIVEN
60+
const stack = new cdk.Stack();
61+
const topic = new sns.Topic(stack, 'Topic');
62+
// WHEN
63+
new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
64+
waitForTaskToken: true,
65+
message: sfn.TaskInput.fromText('Publish this message')
66+
}) });
67+
// THEN
68+
}).toThrow(/Task Token is missing in message/i);
69+
});
70+
71+
test('Publish to topic with ARN from payload', () => {
72+
// GIVEN
73+
const stack = new cdk.Stack();
74+
const topic = sns.Topic.fromTopicArn(stack, 'Topic', sfn.Data.stringAt('$.topicArn'));
75+
76+
// WHEN
77+
const pub = new sfn.Task(stack, 'Publish', { task: new tasks.PublishToTopic(topic, {
78+
message: sfn.TaskInput.fromText('Publish this message')
79+
}) });
80+
81+
// THEN
82+
expect(stack.resolve(pub.toStateJson())).toEqual({
83+
Type: 'Task',
84+
Resource: 'arn:aws:states:::sns:publish',
85+
End: true,
86+
Parameters: {
87+
'TopicArn.$': '$.topicArn',
88+
'Message': 'Publish this message'
89+
},
90+
});
91+
});
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import '@aws-cdk/assert/jest';
2+
import lambda = require('@aws-cdk/aws-lambda');
3+
import sfn = require('@aws-cdk/aws-stepfunctions');
4+
import { Stack } from '@aws-cdk/cdk';
5+
import tasks = require('../lib');
6+
7+
let stack: Stack;
8+
let fn: lambda.Function;
9+
beforeEach(() => {
10+
stack = new Stack();
11+
fn = new lambda.Function(stack, 'Fn', {
12+
code: lambda.Code.inline('hello'),
13+
handler: 'index.hello',
14+
runtime: lambda.Runtime.Python27,
15+
});
16+
});
17+
18+
test('Invoke lambda with default magic ARN', () => {
19+
const task = new sfn.Task(stack, 'Task', {
20+
task: new tasks.RunLambdaTask(fn, {
21+
payload: {
22+
foo: 'bar'
23+
},
24+
invocationType: tasks.InvocationType.RequestResponse,
25+
clientContext: "eyJoZWxsbyI6IndvcmxkIn0=",
26+
qualifier: "1",
27+
})
28+
});
29+
new sfn.StateMachine(stack, 'SM', {
30+
definition: task
31+
});
32+
33+
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
34+
DefinitionString: {
35+
"Fn::Join": ["", [
36+
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
37+
{ Ref: "Fn9270CBC0" },
38+
"\",\"Payload\":{\"foo\":\"bar\"},\"InvocationType\":\"RequestResponse\",\"ClientContext\":\"eyJoZWxsbyI6IndvcmxkIn0=\","
39+
+ "\"Qualifier\":\"1\"},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke\"}}}"
40+
]]
41+
},
42+
});
43+
});
44+
45+
test('Lambda function can be used in a Task with Task Token', () => {
46+
const task = new sfn.Task(stack, 'Task', {
47+
task: new tasks.RunLambdaTask(fn, {
48+
waitForTaskToken: true,
49+
payload: {
50+
token: sfn.Context.taskToken
51+
}
52+
})
53+
});
54+
new sfn.StateMachine(stack, 'SM', {
55+
definition: task
56+
});
57+
58+
expect(stack).toHaveResource('AWS::StepFunctions::StateMachine', {
59+
DefinitionString: {
60+
"Fn::Join": ["", [
61+
"{\"StartAt\":\"Task\",\"States\":{\"Task\":{\"End\":true,\"Parameters\":{\"FunctionName\":\"",
62+
{ Ref: "Fn9270CBC0" },
63+
"\",\"Payload\":{\"token.$\":\"$$.Task.Token\"}},\"Type\":\"Task\",\"Resource\":\"arn:aws:states:::lambda:invoke.waitForTaskToken\"}}}"
64+
]]
65+
},
66+
});
67+
});
68+
69+
test('Task throws if waitForTaskToken is supplied but task token is not included in payLoad', () => {
70+
expect(() => {
71+
new sfn.Task(stack, 'Task', {
72+
task: new tasks.RunLambdaTask(fn, {
73+
waitForTaskToken: true
74+
})
75+
});
76+
}).toThrow(/Task Token is missing in payload/i);
77+
});

0 commit comments

Comments
 (0)