From 5ecf2577350da2b9ff7115b2868192bcbd56a56e Mon Sep 17 00:00:00 2001
From: Alban Esc
Date: Thu, 8 Jul 2021 05:37:03 -0700
Subject: [PATCH] feat(events): DLQ and retry policy support for BatchJob target (#15308)

Add DLQ and retry policy configuration to BatchJob targets. Using a DLQ
on a rule prevents the application from losing events after all retry
attempts are exhausted.

Update README.md and code examples to support Rosetta translation and
compilation.

Resolves #15238

----

*By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache-2.0 license*
---
 .../@aws-cdk/aws-events-targets/README.md     | 103 +++++--
 .../@aws-cdk/aws-events-targets/lib/batch.ts  |   9 +-
 .../aws-events-targets/lib/kinesis-stream.ts  |   8 +-
 .../@aws-cdk/aws-events-targets/lib/sns.ts    |   8 +-
 .../@aws-cdk/aws-events-targets/lib/sqs.ts    |   8 +-
 .../rosetta/default.ts-fixture                |  16 +
 .../withRepoAndKinesisStream.ts-fixture       |  23 ++
 .../rosetta/withRepoAndSqsQueue.ts-fixture    |  23 ++
 .../rosetta/withRepoAndTopic.ts-fixture       |  23 ++
 .../test/batch/batch.test.ts                  | 281 ++++++++++++++----
 .../integ.job-definition-events.expected.json |  58 +++-
 .../test/batch/integ.job-definition-events.ts |  10 +-
 12 files changed, 463 insertions(+), 107 deletions(-)
 create mode 100644 packages/@aws-cdk/aws-events-targets/rosetta/default.ts-fixture
 create mode 100644 packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndKinesisStream.ts-fixture
 create mode 100644 packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndSqsQueue.ts-fixture
 create mode 100644 packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndTopic.ts-fixture

diff --git a/packages/@aws-cdk/aws-events-targets/README.md b/packages/@aws-cdk/aws-events-targets/README.md
index 6dcc3598a26e7..994c24ab8a0a5 100644
--- a/packages/@aws-cdk/aws-events-targets/README.md
+++ b/packages/@aws-cdk/aws-events-targets/README.md
@@ -23,7 +23,7 @@ Currently supported are:
 * Publish a message to an SNS topic
 * Send a message to an SQS queue
 * [Start a StepFunctions state machine](#start-a-stepfunctions-state-machine)
-* Queue a Batch job
+* [Queue a Batch job](#queue-a-batch-job)
 * Make an AWS API call
 * Put a record to a Kinesis stream
 * [Log an event into a LogGroup](#log-an-event-into-a-loggroup)
@@ -47,16 +47,12 @@ triggered for every events from `aws.ec2` source. You can optionally attach a
 [dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html).
 
 ```ts
-import * as lambda from "@aws-cdk/aws-lambda";
-import * as events from "@aws-cdk/aws-events";
-import * as sqs from "@aws-cdk/aws-sqs";
-import * as targets from "@aws-cdk/aws-events-targets";
-import * as cdk from '@aws-cdk/core';
+import * as lambda from '@aws-cdk/aws-lambda';
 
 const fn = new lambda.Function(this, 'MyFunc', {
   runtime: lambda.Runtime.NODEJS_12_X,
   handler: 'index.handler',
-  code: lambda.Code.fromInline(`exports.handler = ${handler.toString()}`),
+  code: lambda.Code.fromInline(`exports.handler = handler.toString()`),
 });
 
 const rule = new events.Rule(this, 'rule', {
@@ -82,9 +78,7 @@ For example, the following code snippet creates an event rule with a CloudWatch
 Every events sent from the `aws.ec2` source will be sent to the CloudWatch LogGroup.
```ts -import * as logs from "@aws-cdk/aws-logs"; -import * as events from "@aws-cdk/aws-events"; -import * as targets from "@aws-cdk/aws-events-targets"; +import * as logs from '@aws-cdk/aws-logs'; const logGroup = new logs.LogGroup(this, 'MyLogGroup', { logGroupName: 'MyLogGroup', @@ -108,10 +102,8 @@ on commit to the master branch. You can optionally attach a [dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html). ```ts -import * as codebuild from '@aws-sdk/aws-codebuild'; -import * as codecommit from '@aws-sdk/aws-codecommit'; -import * as sqs from '@aws-sdk/aws-sqs'; -import * as targets from "@aws-cdk/aws-events-targets"; +import * as codebuild from '@aws-cdk/aws-codebuild'; +import * as codecommit from '@aws-cdk/aws-codecommit'; const repo = new codecommit.Repository(this, 'MyRepo', { repositoryName: 'aws-cdk-codebuild-events', @@ -139,12 +131,11 @@ Use the `CodePipeline` target to trigger a CodePipeline pipeline. The code snippet below creates a CodePipeline pipeline that is triggered every hour ```ts -import * as codepipeline from '@aws-sdk/aws-codepipeline'; -import * as sqs from '@aws-sdk/aws-sqs'; +import * as codepipeline from '@aws-cdk/aws-codepipeline'; const pipeline = new codepipeline.Pipeline(this, 'Pipeline'); -const rule = new events.Rule(stack, 'Rule', { +const rule = new events.Rule(this, 'Rule', { schedule: events.Schedule.expression('rate(1 hour)'), }); @@ -162,22 +153,20 @@ You can optionally attach a to the target. ```ts -import * as iam from '@aws-sdk/aws-iam'; -import * as sqs from '@aws-sdk/aws-sqs'; +import * as iam from '@aws-cdk/aws-iam'; import * as sfn from '@aws-cdk/aws-stepfunctions'; -import * as targets from "@aws-cdk/aws-events-targets"; -const rule = new events.Rule(stack, 'Rule', { +const rule = new events.Rule(this, 'Rule', { schedule: events.Schedule.rate(cdk.Duration.minutes(1)), }); -const dlq = new sqs.Queue(stack, 'DeadLetterQueue'); +const dlq = new sqs.Queue(this, 'DeadLetterQueue'); -const role = new iam.Role(stack, 'Role', { +const role = new iam.Role(this, 'Role', { assumedBy: new iam.ServicePrincipal('events.amazonaws.com'), }); -const stateMachine = new sfn.StateMachine(stack, 'SM', { - definition: new sfn.Wait(stack, 'Hello', { time: sfn.WaitTime.duration(cdk.Duration.seconds(10)) }), +const stateMachine = new sfn.StateMachine(this, 'SM', { + definition: new sfn.Wait(this, 'Hello', { time: sfn.WaitTime.duration(cdk.Duration.seconds(10)) }), role, }); @@ -187,19 +176,67 @@ rule.addTarget(new targets.SfnStateMachine(stateMachine, { })); ``` +## Queue a Batch job + +Use the `BatchJob` target to queue a Batch job. + +The code snippet below creates a Simple JobQueue that is triggered every hour with a +dummy object as input. +You can optionally attach a +[dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html) +to the target. 
+ +```ts +import * as batch from '@aws-cdk/aws-batch'; +import { ContainerImage } from '@aws-cdk/aws-ecs'; + +const jobQueue = new batch.JobQueue(this, 'MyQueue', { + computeEnvironments: [ + { + computeEnvironment: new batch.ComputeEnvironment(this, 'ComputeEnvironment', { + managed: false, + }), + order: 1, + }, + ], +}); + +const jobDefinition = new batch.JobDefinition(this, 'MyJob', { + container: { + image: ContainerImage.fromRegistry('test-repo'), + }, +}); + +const queue = new sqs.Queue(this, 'Queue'); + +const rule = new events.Rule(this, 'Rule', { + schedule: events.Schedule.rate(cdk.Duration.hours(1)), +}); + +rule.addTarget(new targets.BatchJob( + jobQueue.jobQueueArn, + jobQueue, + jobDefinition.jobDefinitionArn, + jobDefinition, { + deadLetterQueue: queue, + event: events.RuleTargetInput.fromObject({ SomeParam: 'SomeValue' }), + retryAttempts: 2, + maxEventAge: cdk.Duration.hours(2), + }, +)); +``` + ## Invoke a API Gateway REST API Use the `ApiGateway` target to trigger a REST API. The code snippet below creates a Api Gateway REST API that is invoked every hour. -```typescript -import * as iam from '@aws-sdk/aws-iam'; -import * as sqs from '@aws-sdk/aws-sqs'; +```ts import * as api from '@aws-cdk/aws-apigateway'; -import * as targets from "@aws-cdk/aws-events-targets"; +import * as lambda from '@aws-cdk/aws-lambda'; -const rule = new events.Rule(stack, 'Rule', { +const rule = new events.Rule(this, 'Rule', { schedule: events.Schedule.rate(cdk.Duration.minutes(1)), }); @@ -211,12 +248,12 @@ const fn = new lambda.Function( this, 'MyFunc', { const restApi = new api.LambdaRestApi( this, 'MyRestAPI', { handler: fn } ); -const dlq = new sqs.Queue(stack, 'DeadLetterQueue'); +const dlq = new sqs.Queue(this, 'DeadLetterQueue'); rule.addTarget( new targets.ApiGateway( restApi, { path: '/*/test', - mehod: 'GET', + method: 'GET', stage: 'prod', pathParameterValues: ['path-value'], headerParameters: { @@ -225,7 +262,7 @@ rule.addTarget( queryStringParameters: { QueryParam1: 'query-param-1', }, - deadLetterQueue: queue + deadLetterQueue: dlq } ), ) ``` diff --git a/packages/@aws-cdk/aws-events-targets/lib/batch.ts b/packages/@aws-cdk/aws-events-targets/lib/batch.ts index 15a823549055c..f0186efe6089d 100644 --- a/packages/@aws-cdk/aws-events-targets/lib/batch.ts +++ b/packages/@aws-cdk/aws-events-targets/lib/batch.ts @@ -1,12 +1,12 @@ import * as events from '@aws-cdk/aws-events'; import * as iam from '@aws-cdk/aws-iam'; import { Names, IConstruct } from '@aws-cdk/core'; -import { singletonEventRole } from './util'; +import { addToDeadLetterQueueResourcePolicy, bindBaseTargetConfig, singletonEventRole, TargetBaseProps } from './util'; /** * Customize the Batch Job Event Target */ -export interface BatchJobProps { +export interface BatchJobProps extends TargetBaseProps { /** * The event to send to the Lambda * @@ -83,7 +83,12 @@ export class BatchJob implements events.IRuleTarget { retryStrategy: this.props.attempts ? { attempts: this.props.attempts } : undefined, }; + if (this.props.deadLetterQueue) { + addToDeadLetterQueueResourcePolicy(rule, this.props.deadLetterQueue); + } + return { + ...bindBaseTargetConfig(this.props), arn: this.jobQueueArn, // When scoping resource-level access for job submission, you must provide both job queue and job definition resource types. 
// https://docs.aws.amazon.com/batch/latest/userguide/ExamplePolicies_BATCH.html#iam-example-restrict-job-def diff --git a/packages/@aws-cdk/aws-events-targets/lib/kinesis-stream.ts b/packages/@aws-cdk/aws-events-targets/lib/kinesis-stream.ts index 60bb544223729..743b197e19d52 100644 --- a/packages/@aws-cdk/aws-events-targets/lib/kinesis-stream.ts +++ b/packages/@aws-cdk/aws-events-targets/lib/kinesis-stream.ts @@ -29,10 +29,10 @@ export interface KinesisStreamProps { * Use a Kinesis Stream as a target for AWS CloudWatch event rules. * * @example - * - * // put to a Kinesis stream every time code is committed - * // to a CodeCommit repository - * repository.onCommit(new targets.KinesisStream(stream)); + * /// fixture=withRepoAndKinesisStream + * // put to a Kinesis stream every time code is committed + * // to a CodeCommit repository + * repository.onCommit('onCommit', { target: new targets.KinesisStream(stream) }); * */ export class KinesisStream implements events.IRuleTarget { diff --git a/packages/@aws-cdk/aws-events-targets/lib/sns.ts b/packages/@aws-cdk/aws-events-targets/lib/sns.ts index 13f10f7d80552..81e5d4916718f 100644 --- a/packages/@aws-cdk/aws-events-targets/lib/sns.ts +++ b/packages/@aws-cdk/aws-events-targets/lib/sns.ts @@ -18,10 +18,10 @@ export interface SnsTopicProps { * Use an SNS topic as a target for Amazon EventBridge rules. * * @example - * - * // publish to an SNS topic every time code is committed - * // to a CodeCommit repository - * repository.onCommit(new targets.SnsTopic(topic)); + * /// fixture=withRepoAndTopic + * // publish to an SNS topic every time code is committed + * // to a CodeCommit repository + * repository.onCommit('onCommit', { target: new targets.SnsTopic(topic) }); * */ export class SnsTopic implements events.IRuleTarget { diff --git a/packages/@aws-cdk/aws-events-targets/lib/sqs.ts b/packages/@aws-cdk/aws-events-targets/lib/sqs.ts index 501414ecee348..43fb9b8ed15d0 100644 --- a/packages/@aws-cdk/aws-events-targets/lib/sqs.ts +++ b/packages/@aws-cdk/aws-events-targets/lib/sqs.ts @@ -31,10 +31,10 @@ export interface SqsQueueProps { * Use an SQS Queue as a target for Amazon EventBridge rules. 
 *
 * @example
- *
- * // publish to an SQS queue every time code is committed
- * // to a CodeCommit repository
- * repository.onCommit(new targets.SqsQueue(queue));
+ * /// fixture=withRepoAndSqsQueue
+ * // publish to an SQS queue every time code is committed
+ * // to a CodeCommit repository
+ * repository.onCommit('onCommit', { target: new targets.SqsQueue(queue) });
 *
 */
 export class SqsQueue implements events.IRuleTarget {
diff --git a/packages/@aws-cdk/aws-events-targets/rosetta/default.ts-fixture b/packages/@aws-cdk/aws-events-targets/rosetta/default.ts-fixture
new file mode 100644
index 0000000000000..f6bf67d19e31a
--- /dev/null
+++ b/packages/@aws-cdk/aws-events-targets/rosetta/default.ts-fixture
@@ -0,0 +1,16 @@
+// Fixture with packages imported, but nothing else
+import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
+import { Construct } from 'constructs';
+
+import * as targets from '@aws-cdk/aws-events-targets';
+import * as events from '@aws-cdk/aws-events';
+import * as sqs from '@aws-cdk/aws-sqs';
+import * as cdk from '@aws-cdk/core';
+
+class Fixture extends Stack {
+  constructor(scope: Construct, id: string) {
+    super(scope, id);
+
+    /// here
+  }
+}
diff --git a/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndKinesisStream.ts-fixture b/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndKinesisStream.ts-fixture
new file mode 100644
index 0000000000000..115e1ece7e254
--- /dev/null
+++ b/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndKinesisStream.ts-fixture
@@ -0,0 +1,23 @@
+// Fixture with packages imported, but nothing else
+import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
+import { Construct } from 'constructs';
+
+import * as targets from '@aws-cdk/aws-events-targets';
+import * as events from '@aws-cdk/aws-events';
+import * as kinesis from '@aws-cdk/aws-kinesis';
+import * as codecommit from '@aws-cdk/aws-codecommit';
+import * as cdk from '@aws-cdk/core';
+
+class Fixture extends Stack {
+  constructor(scope: Construct, id: string) {
+    super(scope, id);
+
+    const repository = new codecommit.Repository(this, 'MyRepo', {
+      repositoryName: 'aws-cdk-events',
+    });
+
+    const stream = new kinesis.Stream(this, 'MyStream');
+
+    /// here
+  }
+}
diff --git a/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndSqsQueue.ts-fixture b/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndSqsQueue.ts-fixture
new file mode 100644
index 0000000000000..98d029d8d8283
--- /dev/null
+++ 
b/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndSqsQueue.ts-fixture @@ -0,0 +1,23 @@ +// Fixture with packages imported, but nothing else +import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core'; +import { Construct } from 'constructs'; + +import * as targets from '@aws-cdk/aws-events-targets'; +import * as events from '@aws-cdk/aws-events'; +import * as sqs from '@aws-cdk/aws-sqs'; +import * as codecommit from '@aws-cdk/aws-codecommit'; +import * as cdk from '@aws-cdk/core'; + +class Fixture extends Stack { + constructor(scope: Construct, id: string) { + super(scope, id); + + const repository = new codecommit.Repository(this, 'MyRepo', { + repositoryName: 'aws-cdk-events', + }); + + const queue = new sqs.Queue(this, 'MyQueue'); + + /// here + } +} diff --git a/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndTopic.ts-fixture b/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndTopic.ts-fixture new file mode 100644 index 0000000000000..30c1f29cc331b --- /dev/null +++ b/packages/@aws-cdk/aws-events-targets/rosetta/withRepoAndTopic.ts-fixture @@ -0,0 +1,23 @@ +// Fixture with packages imported, but nothing else +import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core'; +import { Construct } from 'constructs'; + +import * as targets from '@aws-cdk/aws-events-targets'; +import * as events from '@aws-cdk/aws-events'; +import * as sns from '@aws-cdk/aws-sns'; +import * as codecommit from '@aws-cdk/aws-codecommit'; +import * as cdk from '@aws-cdk/core'; + +class Fixture extends Stack { + constructor(scope: Construct, id: string) { + super(scope, id); + + const repository = new codecommit.Repository(this, 'MyRepo', { + repositoryName: 'aws-cdk-events', + }); + + const topic = new sns.Topic(this, 'MyTopic'); + + /// here + } +} diff --git a/packages/@aws-cdk/aws-events-targets/test/batch/batch.test.ts b/packages/@aws-cdk/aws-events-targets/test/batch/batch.test.ts index acdf2477832f5..c379b72a523c6 100644 --- a/packages/@aws-cdk/aws-events-targets/test/batch/batch.test.ts +++ b/packages/@aws-cdk/aws-events-targets/test/batch/batch.test.ts @@ -2,74 +2,239 @@ import { expect, haveResource } from '@aws-cdk/assert-internal'; import * as batch from '@aws-cdk/aws-batch'; import { ContainerImage } from '@aws-cdk/aws-ecs'; import * as events from '@aws-cdk/aws-events'; -import { Stack } from '@aws-cdk/core'; +import * as sqs from '@aws-cdk/aws-sqs'; +import { Duration, Stack } from '@aws-cdk/core'; import * as targets from '../../lib'; -test('use aws batch job as an eventrule target', () => { - // GIVEN - const stack = new Stack(); - const jobQueue = new batch.JobQueue(stack, 'MyQueue', { - computeEnvironments: [ - { - computeEnvironment: new batch.ComputeEnvironment(stack, 'ComputeEnvironment', { - managed: false, - }), - order: 1, +describe('Batch job event target', () => { + let stack: Stack; + let jobQueue: batch.JobQueue; + let jobDefinition: batch.JobDefinition; + + + beforeEach(() => { + stack = new Stack(); + jobQueue = new batch.JobQueue(stack, 'MyQueue', { + computeEnvironments: [ + { + computeEnvironment: new batch.ComputeEnvironment(stack, 'ComputeEnvironment', { + managed: false, + }), + order: 1, + }, + ], + }); + jobDefinition = new batch.JobDefinition(stack, 'MyJob', { + container: { + image: ContainerImage.fromRegistry('test-repo'), }, - ], - }); - const jobDefinition = new batch.JobDefinition(stack, 'MyJob', { - container: { - image: ContainerImage.fromRegistry('test-repo'), - }, - }); - const rule = new events.Rule(stack, 'Rule', { - schedule: 
events.Schedule.expression('rate(1 min)'), + }); }); - // WHEN - rule.addTarget(new targets.BatchJob(jobQueue.jobQueueArn, jobQueue, jobDefinition.jobDefinitionArn, jobDefinition)); - - // THEN - expect(stack).to(haveResource('AWS::Events::Rule', { - ScheduleExpression: 'rate(1 min)', - State: 'ENABLED', - Targets: [ - { - Arn: { - Ref: 'MyQueueE6CA6235', + test('use aws batch job as an event rule target', () => { + // GIVEN + const rule = new events.Rule(stack, 'Rule', { + schedule: events.Schedule.expression('rate(1 min)'), + }); + + // WHEN + rule.addTarget(new targets.BatchJob(jobQueue.jobQueueArn, jobQueue, jobDefinition.jobDefinitionArn, jobDefinition)); + + // THEN + expect(stack).to(haveResource('AWS::Events::Rule', { + ScheduleExpression: 'rate(1 min)', + State: 'ENABLED', + Targets: [ + { + Arn: { + Ref: 'MyQueueE6CA6235', + }, + Id: 'Target0', + RoleArn: { + 'Fn::GetAtt': [ + 'MyJobEventsRoleCF43C336', + 'Arn', + ], + }, + BatchParameters: { + JobDefinition: { Ref: 'MyJob8719E923' }, + JobName: 'Rule', + }, }, - Id: 'Target0', - RoleArn: { - 'Fn::GetAtt': [ - 'MyJobEventsRoleCF43C336', - 'Arn', - ], + ], + })); + + expect(stack).to(haveResource('AWS::IAM::Policy', { + PolicyDocument: { + Statement: [ + { + Action: 'batch:SubmitJob', + Effect: 'Allow', + Resource: [ + { Ref: 'MyJob8719E923' }, + { Ref: 'MyQueueE6CA6235' }, + ], + }, + ], + Version: '2012-10-17', + }, + Roles: [ + { Ref: 'MyJobEventsRoleCF43C336' }, + ], + })); + }); + + test('use a Dead Letter Queue for the rule target', () => { + // GIVEN + const rule = new events.Rule(stack, 'Rule', { + schedule: events.Schedule.expression('rate(1 hour)'), + }); + + const queue = new sqs.Queue(stack, 'Queue'); + + // WHEN + const eventInput = { + buildspecOverride: 'buildspecs/hourly.yml', + }; + + rule.addTarget(new targets.BatchJob( + jobQueue.jobQueueArn, + jobQueue, + jobDefinition.jobDefinitionArn, + jobDefinition, { + deadLetterQueue: queue, + event: events.RuleTargetInput.fromObject(eventInput), + }, + )); + + // THEN + expect(stack).to(haveResource('AWS::Events::Rule', { + Targets: [ + { + Arn: { + Ref: 'MyQueueE6CA6235', + }, + Id: 'Target0', + DeadLetterConfig: { + Arn: { + 'Fn::GetAtt': [ + 'Queue4A7E3555', + 'Arn', + ], + }, + }, + Input: JSON.stringify(eventInput), + RoleArn: { + 'Fn::GetAtt': ['MyJobEventsRoleCF43C336', 'Arn'], + }, + BatchParameters: { + JobDefinition: { Ref: 'MyJob8719E923' }, + JobName: 'Rule', + }, }, - BatchParameters: { - JobDefinition: { Ref: 'MyJob8719E923' }, - JobName: 'Rule', + ], + })); + + expect(stack).to(haveResource('AWS::SQS::QueuePolicy', { + PolicyDocument: { + Statement: [ + { + Action: 'sqs:SendMessage', + Condition: { + ArnEquals: { + 'aws:SourceArn': { + 'Fn::GetAtt': [ + 'Rule4C995B7F', + 'Arn', + ], + }, + }, + }, + Effect: 'Allow', + Principal: { + Service: 'events.amazonaws.com', + }, + Resource: { + 'Fn::GetAtt': [ + 'Queue4A7E3555', + 'Arn', + ], + }, + Sid: 'AllowEventRuleRule', + }, + ], + Version: '2012-10-17', + }, + Queues: [ + { + Ref: 'Queue4A7E3555', }, + ], + })); + }); + + test('specifying retry policy', () => { + // GIVEN + const rule = new events.Rule(stack, 'Rule', { + schedule: events.Schedule.expression('rate(1 hour)'), + }); + + const queue = new sqs.Queue(stack, 'Queue'); + + // WHEN + const eventInput = { + buildspecOverride: 'buildspecs/hourly.yml', + }; + + rule.addTarget(new targets.BatchJob( + jobQueue.jobQueueArn, + jobQueue, + jobDefinition.jobDefinitionArn, + jobDefinition, { + deadLetterQueue: queue, + event: 
events.RuleTargetInput.fromObject(eventInput), + retryAttempts: 2, + maxEventAge: Duration.hours(2), }, - ], - })); + )); - expect(stack).to(haveResource('AWS::IAM::Policy', { - PolicyDocument: { - Statement: [ + // THEN + expect(stack).to(haveResource('AWS::Events::Rule', { + ScheduleExpression: 'rate(1 hour)', + State: 'ENABLED', + Targets: [ { - Action: 'batch:SubmitJob', - Effect: 'Allow', - Resource: [ - { Ref: 'MyJob8719E923' }, - { Ref: 'MyQueueE6CA6235' }, - ], + Arn: { + Ref: 'MyQueueE6CA6235', + }, + BatchParameters: { + JobDefinition: { + Ref: 'MyJob8719E923', + }, + JobName: 'Rule', + }, + DeadLetterConfig: { + Arn: { + 'Fn::GetAtt': [ + 'Queue4A7E3555', + 'Arn', + ], + }, + }, + Id: 'Target0', + Input: JSON.stringify(eventInput), + RetryPolicy: { + MaximumEventAgeInSeconds: 7200, + MaximumRetryAttempts: 2, + }, + RoleArn: { + 'Fn::GetAtt': [ + 'MyJobEventsRoleCF43C336', + 'Arn', + ], + }, }, ], - Version: '2012-10-17', - }, - Roles: [ - { Ref: 'MyJobEventsRoleCF43C336' }, - ], - })); -}); + })); + }); +}); \ No newline at end of file diff --git a/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.expected.json b/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.expected.json index 9319f72814c38..77a8854041e1f 100644 --- a/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.expected.json +++ b/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.expected.json @@ -34,13 +34,13 @@ "ComputeEnvironmentC570994D": { "Type": "AWS::Batch::ComputeEnvironment", "Properties": { + "Type": "UNMANAGED", "ServiceRole": { "Fn::GetAtt": [ "ComputeEnvironmentResourceServiceInstanceRoleDC6D4445", "Arn" ] }, - "Type": "UNMANAGED", "State": "ENABLED" } }, @@ -164,7 +164,19 @@ }, "JobName": "batcheventsTimer232549135" }, + "DeadLetterConfig": { + "Arn": { + "Fn::GetAtt": [ + "Queue4A7E3555", + "Arn" + ] + } + }, "Id": "Target0", + "RetryPolicy": { + "MaximumEventAgeInSeconds": 7200, + "MaximumRetryAttempts": 2 + }, "RoleArn": { "Fn::GetAtt": [ "MyJobEventsRoleCF43C336", @@ -174,6 +186,50 @@ } ] } + }, + "Queue4A7E3555": { + "Type": "AWS::SQS::Queue", + "UpdateReplacePolicy": "Delete", + "DeletionPolicy": "Delete" + }, + "QueuePolicy25439813": { + "Type": "AWS::SQS::QueuePolicy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": "sqs:SendMessage", + "Condition": { + "ArnEquals": { + "aws:SourceArn": { + "Fn::GetAtt": [ + "Timer2B6F162E9", + "Arn" + ] + } + } + }, + "Effect": "Allow", + "Principal": { + "Service": "events.amazonaws.com" + }, + "Resource": { + "Fn::GetAtt": [ + "Queue4A7E3555", + "Arn" + ] + }, + "Sid": "AllowEventRulebatcheventsTimer232549135" + } + ], + "Version": "2012-10-17" + }, + "Queues": [ + { + "Ref": "Queue4A7E3555" + } + ] + } } } } \ No newline at end of file diff --git a/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.ts b/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.ts index 01d1058eb4064..a3c2fa2f01e2e 100644 --- a/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.ts +++ b/packages/@aws-cdk/aws-events-targets/test/batch/integ.job-definition-events.ts @@ -1,6 +1,7 @@ import * as batch from '@aws-cdk/aws-batch'; import { ContainerImage } from '@aws-cdk/aws-ecs'; import * as events from '@aws-cdk/aws-events'; +import * as sqs from '@aws-cdk/aws-sqs'; import * as cdk from '@aws-cdk/core'; import * as targets from '../../lib'; @@ -32,6 +33,13 @@ timer.addTarget(new 
targets.BatchJob(queue.jobQueueArn, queue, job.jobDefinition const timer2 = new events.Rule(stack, 'Timer2', { schedule: events.Schedule.rate(cdk.Duration.minutes(2)), }); -timer2.addTarget(new targets.BatchJob(queue.jobQueueArn, queue, job.jobDefinitionArn, job)); + +const dlq = new sqs.Queue(stack, 'Queue'); + +timer2.addTarget(new targets.BatchJob(queue.jobQueueArn, queue, job.jobDefinitionArn, job, { + deadLetterQueue: dlq, + retryAttempts: 2, + maxEventAge: cdk.Duration.hours(2), +})); app.synth();