Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(stepfunctions-tasks): cannot specify part of execution data or task context as input to the RunLambda service integration #7428

Merged
merged 20 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
714c023
fix(stepfunctions-tasks): cannot specify part of execution data or ta…
shivlaks Apr 18, 2020
a74353d
fix test expectations
shivlaks Apr 18, 2020
7d91000
update README
shivlaks Apr 20, 2020
1086bdb
Merge branch 'master' into shivlaks/stepfunctions-run-lambda-fix
shivlaks Apr 20, 2020
1a6cb7f
missed a merge conflict
shivlaks Apr 20, 2020
4cae476
fix comma-dangle linter violations
shivlaks Apr 21, 2020
05ec5d4
add a test for Lambda function to take in task input as payload
shivlaks Apr 21, 2020
e3fc24f
Update packages/@aws-cdk/aws-stepfunctions/README.md
shivlaks Apr 21, 2020
389d580
Update packages/@aws-cdk/aws-stepfunctions/README.md
shivlaks Apr 21, 2020
f157eb6
Update packages/@aws-cdk/aws-stepfunctions/README.md
shivlaks Apr 21, 2020
edd4122
Update packages/@aws-cdk/aws-stepfunctions/README.md
shivlaks Apr 21, 2020
443e5d2
Update packages/@aws-cdk/aws-stepfunctions/README.md
shivlaks Apr 21, 2020
f053868
Update packages/@aws-cdk/aws-stepfunctions/README.md
shivlaks Apr 21, 2020
841074d
Apply suggestions from code review
shivlaks Apr 22, 2020
3f6429f
add comment to integ test including expected verification steps
shivlaks Apr 22, 2020
91892e8
add stack verification commands
shivlaks Apr 22, 2020
1029f30
missed a space
shivlaks Apr 22, 2020
a99b6c5
set the default for payload to a Lambda to use the state input
shivlaks Apr 23, 2020
bc2e83d
Merge branch 'master' into shivlaks/stepfunctions-run-lambda-fix
mergify[bot] Apr 23, 2020
f930df8
Merge branch 'master' into shivlaks/stepfunctions-run-lambda-fix
mergify[bot] Apr 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface RunLambdaTaskProps {
*
* @default - No payload
*/
readonly payload?: { [key: string]: any };
readonly payload?: sfn.TaskInput;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth introducing a new field for this? (Let's say, input?: sfn.TaskInput) and @deprecateing the old one?

Are you okay taking the breakage?


Next question: since we're touching this anyway, does it make sense to change the @default while we're at it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with the breaking change, given the migration is straight forward.

Why do you suggest changing the default? It seems to me that the current default makes sense, no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any problems with no payload being the default as long as there is a code example that shows how to set "Payload.$": "$" as @rix0rrr suggests.

$ (the entire input) could be a sensible default value, mostly out of convenience for users of the "old-style" InvokeFunction which always sent the effective input as the payload. I noted some caveats in #7371 (comment).

It may be worth adding test coverage for setting payload to $ since that's such a common use case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also okay with taking the breakage here for the same reason as @nija-at

good point: I do think the @default should be the entire task input while we're at it as I think that's a more common usage than sending the empty object into a Lambda. I've left it as is for now, but happy to make that change

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should optimize for the most common use case. What do we expect MOST people to do? @wong-a I would hope your team has some info on the most common ways people use each task type.

Furthermore, naively I would say that changing payload to $ by default is a non-breaking change. If people leave it empty, today they would get no parameters, so not expect them and not read them, so adding more parameters should be fine.

There's also the case of what the default outputPath should be. Feels like for maximum convenience it should be $.Payload, and similar for all other task types. But changing that is not backwards compatible so I guess that ship has sailed, huh...?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @wong-a, since we got you here anyway... what is the official term for the JSON blob that travels along with the execution, which is updated using inputPath, outputPath, etc?

I was struggling to find a good, clear and concise name for it while writing the library, so ultimately settled on something unsatisfying like Data. I wonder what you call it internally?

I guess we could do ExecutionState or something, but any term I can come up with feels so generic as to be worthless... :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we expect MOST people to do? @wong-a I would hope your team has some info on the most common ways people use each task type.

The old-style Lambda is the most used Task type, which uses the entire effective input and I think most customers would be happy with it as the default. If someone wants to pass an empty object as the payload they can easily do so by passing an empty object as the value of Payload.

what is the official term for the JSON blob that travels along with the execution, which is updated using inputPath, outputPath, etc?

There's no special name for it. It's generally referred to within the ASL spec and Step Functions documentation as "state input" or output.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for that valuabe input @wong-a

I flipped this to default to $ as I agree with @rix0rrr that changing the payload from empty object to the state input is not likely to break the lambda being invoked as it wasn't expecting to read any input.

I will call it out in the breaking change entry.


/**
* The service integration pattern indicates different ways to invoke Lambda function.
Expand Down Expand Up @@ -92,7 +92,7 @@ export class RunLambdaTask implements sfn.IStepFunctionsTask {
metricDimensions: { LambdaFunctionArn: this.lambdaFunction.functionArn },
parameters: {
FunctionName: this.lambdaFunction.functionName,
Payload: this.props.payload,
Payload: this.props.payload?.value,
InvocationType: this.props.invocationType,
ClientContext: this.props.clientContext,
Qualifier: this.props.qualifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const callbackHandler = new Function(stack, 'CallbackHandler', {
const taskTokenHandler = new sfn.Task(stack, 'Invoke Handler with task token', {
task: new tasks.RunLambdaTask(callbackHandler, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: {
payload: sfn.TaskInput.fromObject({
token: sfn.Context.taskToken,
},
}),
}),
inputPath: '$.guid',
resultPath: '$.status',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
{
"Resources": {
"submitJobLambdaServiceRole4D897ABD": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"submitJobLambdaEFB00F3C": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async () => {\n return {\n statusCode: '200',\n body: 'hello, world!'\n };\n };"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"submitJobLambdaServiceRole4D897ABD",
"Arn"
]
},
"Runtime": "nodejs10.x"
},
"DependsOn": [
"submitJobLambdaServiceRole4D897ABD"
]
},
"checkJobStateLambdaServiceRoleB8B57B65": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"checkJobStateLambda4618B7B7": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function(event, context) {\n return {\n status: event.statusCode === '200' ? 'SUCCEEDED' : 'FAILED';\n };\n };"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"checkJobStateLambdaServiceRoleB8B57B65",
"Arn"
]
},
"Runtime": "nodejs10.x"
},
"DependsOn": [
"checkJobStateLambdaServiceRoleB8B57B65"
]
},
"StateMachineRoleB840431D": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": {
"Fn::Join": [
"",
[
"states.",
{
"Ref": "AWS::Region"
},
".amazonaws.com"
]
]
}
}
}
],
"Version": "2012-10-17"
}
}
},
"StateMachineRoleDefaultPolicyDF1E6607": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "lambda:InvokeFunction",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"submitJobLambdaEFB00F3C",
"Arn"
]
}
},
{
"Action": "lambda:InvokeFunction",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"checkJobStateLambda4618B7B7",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "StateMachineRoleDefaultPolicyDF1E6607",
"Roles": [
{
"Ref": "StateMachineRoleB840431D"
}
]
}
},
"StateMachine2E01A3A5": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"DefinitionString": {
"Fn::Join": [
"",
[
"{\"StartAt\":\"Invoke Handler\",\"States\":{\"Invoke Handler\":{\"Next\":\"Check the job state\",\"Parameters\":{\"FunctionName\":\"",
{
"Ref": "submitJobLambdaEFB00F3C"
},
"\"},\"OutputPath\":\"$.Payload\",\"Type\":\"Task\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::lambda:invoke\"},\"Check the job state\":{\"Next\":\"Job Complete?\",\"Parameters\":{\"FunctionName\":\"",
{
"Ref": "checkJobStateLambda4618B7B7"
},
"\",\"Payload.$\":\"$.Payload\"},\"OutputPath\":\"$.Payload\",\"Type\":\"Task\",\"Resource\":\"arn:",
{
"Ref": "AWS::Partition"
},
":states:::lambda:invoke\"},\"Job Complete?\":{\"Type\":\"Choice\",\"Choices\":[{\"Variable\":\"$.status\",\"StringEquals\":\"FAILED\",\"Next\":\"Job Failed\"},{\"Variable\":\"$.status\",\"StringEquals\":\"SUCCEEDED\",\"Next\":\"Final step\"}]},\"Job Failed\":{\"Type\":\"Fail\",\"Error\":\"Received a status that was not 200\",\"Cause\":\"Job Failed\"},\"Final step\":{\"Type\":\"Pass\",\"End\":true}},\"TimeoutSeconds\":30}"
]
]
},
"RoleArn": {
"Fn::GetAtt": [
"StateMachineRoleB840431D",
"Arn"
]
}
},
"DependsOn": [
"StateMachineRoleDefaultPolicyDF1E6607",
"StateMachineRoleB840431D"
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { Code, Function, Runtime } from '@aws-cdk/aws-lambda';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as cdk from '@aws-cdk/core';
import * as tasks from '../../lib';

const app = new cdk.App();
shivlaks marked this conversation as resolved.
Show resolved Hide resolved
const stack = new cdk.Stack(app, 'aws-stepfunctions-tasks-run-lambda-integ');

const submitJobLambda = new Function(stack, 'submitJobLambda', {
code: Code.fromInline(`exports.handler = async () => {
return {
statusCode: '200',
body: 'hello, world!'
};
};`),
runtime: Runtime.NODEJS_10_X,
handler: 'index.handler',
});

const submitJob = new sfn.Task(stack, 'Invoke Handler', {
task: new tasks.RunLambdaTask(submitJobLambda),
outputPath: '$.Payload',
});

const checkJobStateLambda = new Function(stack, 'checkJobStateLambda', {
code: Code.fromInline(`exports.handler = async function(event, context) {
return {
status: event.statusCode === '200' ? 'SUCCEEDED' : 'FAILED';
};
};`),
runtime: Runtime.NODEJS_10_X,
handler: 'index.handler',
});

const checkJobState = new sfn.Task(stack, 'Check the job state', {
task: new tasks.RunLambdaTask(checkJobStateLambda, {
payload: sfn.TaskInput.fromDataAt('$.Payload'),
}),
outputPath: '$.Payload',
});

const isComplete = new sfn.Choice(stack, 'Job Complete?');
const jobFailed = new sfn.Fail(stack, 'Job Failed', {
cause: 'Job Failed',
error: 'Received a status that was not 200',
});
const finalStatus = new sfn.Pass(stack, 'Final step');

const chain = sfn.Chain.start(submitJob)
.next(checkJobState)
.next(
isComplete
.when(sfn.Condition.stringEquals('$.status', 'FAILED'), jobFailed)
.when(sfn.Condition.stringEquals('$.status', 'SUCCEEDED'), finalStatus),
);

new sfn.StateMachine(stack, 'StateMachine', {
definition: chain,
timeout: cdk.Duration.seconds(30),
});

app.synth();
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ beforeEach(() => {
test('Invoke lambda with default magic ARN', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
payload: {
payload: sfn.TaskInput.fromObject({
foo: 'bar',
},
}),
invocationType: tasks.InvocationType.REQUEST_RESPONSE,
clientContext: 'eyJoZWxsbyI6IndvcmxkIn0=',
qualifier: '1',
Expand Down Expand Up @@ -63,9 +63,9 @@ test('Lambda function can be used in a Task with Task Token', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
integrationPattern: sfn.ServiceIntegrationPattern.WAIT_FOR_TASK_TOKEN,
payload: {
payload: sfn.TaskInput.fromObject({
token: sfn.Context.taskToken,
},
}),
}),
});
new sfn.StateMachine(stack, 'SM', {
Expand Down Expand Up @@ -98,6 +98,40 @@ test('Lambda function can be used in a Task with Task Token', () => {
});
});

test('Lambda function can be provided with the task input as the payload', () => {
const task = new sfn.Task(stack, 'Task', {
task: new tasks.RunLambdaTask(fn, {
payload: sfn.TaskInput.fromDataAt('$'),
}),
});
new sfn.StateMachine(stack, 'SM', {
definition: task,
});

expect(stack.resolve(task.toStateJson())).toEqual({
Type: 'Task',
Resource: {
'Fn::Join': [
'',
[
'arn:',
{
Ref: 'AWS::Partition',
},
':states:::lambda:invoke',
],
],
},
End: true,
Parameters: {
'FunctionName': {
Ref: 'Fn9270CBC0',
},
'Payload.$': '$',
},
});
});

test('Task throws if WAIT_FOR_TASK_TOKEN is supplied but task token is not included in payLoad', () => {
expect(() => {
new sfn.Task(stack, 'Task', {
Expand Down