Skip to content

Commit

Permalink
feat(events): DLQ and retry policy support for BatchJob target (#15308)
Browse files Browse the repository at this point in the history
Add DLQ and Retry policy Configuration to BatchJob targets. Using a DLQ on a rule prevents the application to loose events after all retry attempts are exhausted.
Update README.md and code examples to support Rosetta translation and compiling.

Resolves #15238

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
DaWyz committed Jul 8, 2021
1 parent bf6f7ef commit 5ecf257
Show file tree
Hide file tree
Showing 13 changed files with 486 additions and 107 deletions.
103 changes: 70 additions & 33 deletions packages/@aws-cdk/aws-events-targets/README.md
Expand Up @@ -23,7 +23,7 @@ Currently supported are:
* Publish a message to an SNS topic
* Send a message to an SQS queue
* [Start a StepFunctions state machine](#start-a-stepfunctions-state-machine)
* Queue a Batch job
* [Queue a Batch job](#queue-a-batch-job)
* Make an AWS API call
* Put a record to a Kinesis stream
* [Log an event into a LogGroup](#log-an-event-into-a-loggroup)
Expand All @@ -47,16 +47,12 @@ triggered for every events from `aws.ec2` source. You can optionally attach a
[dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html).

```ts
import * as lambda from "@aws-cdk/aws-lambda";
import * as events from "@aws-cdk/aws-events";
import * as sqs from "@aws-cdk/aws-sqs";
import * as targets from "@aws-cdk/aws-events-targets";
import * as cdk from '@aws-cdk/core';
import * as lambda from '@aws-cdk/aws-lambda';

const fn = new lambda.Function(this, 'MyFunc', {
runtime: lambda.Runtime.NODEJS_12_X,
handler: 'index.handler',
code: lambda.Code.fromInline(`exports.handler = ${handler.toString()}`),
code: lambda.Code.fromInline(`exports.handler = handler.toString()`),
});

const rule = new events.Rule(this, 'rule', {
Expand All @@ -82,9 +78,7 @@ For example, the following code snippet creates an event rule with a CloudWatch
Every events sent from the `aws.ec2` source will be sent to the CloudWatch LogGroup.

```ts
import * as logs from "@aws-cdk/aws-logs";
import * as events from "@aws-cdk/aws-events";
import * as targets from "@aws-cdk/aws-events-targets";
import * as logs from '@aws-cdk/aws-logs';

const logGroup = new logs.LogGroup(this, 'MyLogGroup', {
logGroupName: 'MyLogGroup',
Expand All @@ -108,10 +102,8 @@ on commit to the master branch. You can optionally attach a
[dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html).

```ts
import * as codebuild from '@aws-sdk/aws-codebuild';
import * as codecommit from '@aws-sdk/aws-codecommit';
import * as sqs from '@aws-sdk/aws-sqs';
import * as targets from "@aws-cdk/aws-events-targets";
import * as codebuild from '@aws-cdk/aws-codebuild';
import * as codecommit from '@aws-cdk/aws-codecommit';

const repo = new codecommit.Repository(this, 'MyRepo', {
repositoryName: 'aws-cdk-codebuild-events',
Expand Down Expand Up @@ -139,12 +131,11 @@ Use the `CodePipeline` target to trigger a CodePipeline pipeline.
The code snippet below creates a CodePipeline pipeline that is triggered every hour

```ts
import * as codepipeline from '@aws-sdk/aws-codepipeline';
import * as sqs from '@aws-sdk/aws-sqs';
import * as codepipeline from '@aws-cdk/aws-codepipeline';

const pipeline = new codepipeline.Pipeline(this, 'Pipeline');

const rule = new events.Rule(stack, 'Rule', {
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.expression('rate(1 hour)'),
});

Expand All @@ -162,22 +153,20 @@ You can optionally attach a
to the target.

```ts
import * as iam from '@aws-sdk/aws-iam';
import * as sqs from '@aws-sdk/aws-sqs';
import * as iam from '@aws-cdk/aws-iam';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as targets from "@aws-cdk/aws-events-targets";

const rule = new events.Rule(stack, 'Rule', {
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.rate(cdk.Duration.minutes(1)),
});

const dlq = new sqs.Queue(stack, 'DeadLetterQueue');
const dlq = new sqs.Queue(this, 'DeadLetterQueue');

const role = new iam.Role(stack, 'Role', {
const role = new iam.Role(this, 'Role', {
assumedBy: new iam.ServicePrincipal('events.amazonaws.com'),
});
const stateMachine = new sfn.StateMachine(stack, 'SM', {
definition: new sfn.Wait(stack, 'Hello', { time: sfn.WaitTime.duration(cdk.Duration.seconds(10)) }),
const stateMachine = new sfn.StateMachine(this, 'SM', {
definition: new sfn.Wait(this, 'Hello', { time: sfn.WaitTime.duration(cdk.Duration.seconds(10)) }),
role,
});

Expand All @@ -187,19 +176,67 @@ rule.addTarget(new targets.SfnStateMachine(stateMachine, {
}));
```

## Queue a Batch job

Use the `BatchJob` target to queue a Batch job.

The code snippet below creates a Simple JobQueue that is triggered every hour with a
dummy object as input.
You can optionally attach a
[dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html)
to the target.

```ts
import * as batch from '@aws-cdk/aws-batch';
import { ContainerImage } from '@aws-cdk/aws-ecs';

const jobQueue = new batch.JobQueue(this, 'MyQueue', {
computeEnvironments: [
{
computeEnvironment: new batch.ComputeEnvironment(this, 'ComputeEnvironment', {
managed: false,
}),
order: 1,
},
],
});

const jobDefinition = new batch.JobDefinition(this, 'MyJob', {
container: {
image: ContainerImage.fromRegistry('test-repo'),
},
});

const queue = new sqs.Queue(this, 'Queue');

const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.rate(cdk.Duration.hours(1)),
});

rule.addTarget(new targets.BatchJob(
jobQueue.jobQueueArn,
jobQueue,
jobDefinition.jobDefinitionArn,
jobDefinition, {
deadLetterQueue: queue,
event: events.RuleTargetInput.fromObject({ SomeParam: 'SomeValue' }),
retryAttempts: 2,
maxEventAge: cdk.Duration.hours(2),
},
));
```

## Invoke a API Gateway REST API

Use the `ApiGateway` target to trigger a REST API.

The code snippet below creates a Api Gateway REST API that is invoked every hour.

```typescript
import * as iam from '@aws-sdk/aws-iam';
import * as sqs from '@aws-sdk/aws-sqs';
```ts
import * as api from '@aws-cdk/aws-apigateway';
import * as targets from "@aws-cdk/aws-events-targets";
import * as lambda from '@aws-cdk/aws-lambda';

const rule = new events.Rule(stack, 'Rule', {
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.rate(cdk.Duration.minutes(1)),
});

Expand All @@ -211,12 +248,12 @@ const fn = new lambda.Function( this, 'MyFunc', {

const restApi = new api.LambdaRestApi( this, 'MyRestAPI', { handler: fn } );

const dlq = new sqs.Queue(stack, 'DeadLetterQueue');
const dlq = new sqs.Queue(this, 'DeadLetterQueue');

rule.addTarget(
new targets.ApiGateway( restApi, {
path: '/*/test',
mehod: 'GET',
method: 'GET',
stage: 'prod',
pathParameterValues: ['path-value'],
headerParameters: {
Expand All @@ -225,7 +262,7 @@ rule.addTarget(
queryStringParameters: {
QueryParam1: 'query-param-1',
},
deadLetterQueue: queue
deadLetterQueue: dlq
} ),
)
```
9 changes: 7 additions & 2 deletions packages/@aws-cdk/aws-events-targets/lib/batch.ts
@@ -1,12 +1,12 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import { Names, IConstruct } from '@aws-cdk/core';
import { singletonEventRole } from './util';
import { addToDeadLetterQueueResourcePolicy, bindBaseTargetConfig, singletonEventRole, TargetBaseProps } from './util';

/**
* Customize the Batch Job Event Target
*/
export interface BatchJobProps {
export interface BatchJobProps extends TargetBaseProps {
/**
* The event to send to the Lambda
*
Expand Down Expand Up @@ -83,7 +83,12 @@ export class BatchJob implements events.IRuleTarget {
retryStrategy: this.props.attempts ? { attempts: this.props.attempts } : undefined,
};

if (this.props.deadLetterQueue) {
addToDeadLetterQueueResourcePolicy(rule, this.props.deadLetterQueue);
}

return {
...bindBaseTargetConfig(this.props),
arn: this.jobQueueArn,
// When scoping resource-level access for job submission, you must provide both job queue and job definition resource types.
// https://docs.aws.amazon.com/batch/latest/userguide/ExamplePolicies_BATCH.html#iam-example-restrict-job-def
Expand Down
8 changes: 4 additions & 4 deletions packages/@aws-cdk/aws-events-targets/lib/kinesis-stream.ts
Expand Up @@ -29,10 +29,10 @@ export interface KinesisStreamProps {
* Use a Kinesis Stream as a target for AWS CloudWatch event rules.
*
* @example
*
* // put to a Kinesis stream every time code is committed
* // to a CodeCommit repository
* repository.onCommit(new targets.KinesisStream(stream));
* /// fixture=withRepoAndKinesisStream
* // put to a Kinesis stream every time code is committed
* // to a CodeCommit repository
* repository.onCommit('onCommit', { target: new targets.KinesisStream(stream) });
*
*/
export class KinesisStream implements events.IRuleTarget {
Expand Down
8 changes: 4 additions & 4 deletions packages/@aws-cdk/aws-events-targets/lib/sns.ts
Expand Up @@ -18,10 +18,10 @@ export interface SnsTopicProps {
* Use an SNS topic as a target for Amazon EventBridge rules.
*
* @example
*
* // publish to an SNS topic every time code is committed
* // to a CodeCommit repository
* repository.onCommit(new targets.SnsTopic(topic));
* /// fixture=withRepoAndTopic
* // publish to an SNS topic every time code is committed
* // to a CodeCommit repository
* repository.onCommit('onCommit', { target: new targets.SnsTopic(topic) });
*
*/
export class SnsTopic implements events.IRuleTarget {
Expand Down
8 changes: 4 additions & 4 deletions packages/@aws-cdk/aws-events-targets/lib/sqs.ts
Expand Up @@ -31,10 +31,10 @@ export interface SqsQueueProps {
* Use an SQS Queue as a target for Amazon EventBridge rules.
*
* @example
*
* // publish to an SQS queue every time code is committed
* // to a CodeCommit repository
* repository.onCommit(new targets.SqsQueue(queue));
* /// fixture=withRepoAndSqsQueue
* // publish to an SQS queue every time code is committed
* // to a CodeCommit repository
* repository.onCommit('onCommit', { target: new targets.SqsQueue(queue) });
*
*/
export class SqsQueue implements events.IRuleTarget {
Expand Down
@@ -0,0 +1,23 @@
// Fixture with packages imported, but nothing else
import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Construct } from 'constructs';

import * as targets from '@aws-cdk/aws-events-targets';
import * as events from '@aws-cdk/aws-events';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as codecommit from '@aws-cdk/aws-codecommit';
import * as cdk from '@aws-cdk/core';

class Fixture extends Stack {
constructor(scope: Construct, id: string) {
super(scope, id);

const repository = new codecommit.Repository(this, 'MyRepo', {
repositoryName: 'aws-cdk-events',
});

const stream = new kinesis.Stream(stack, 'MyStream');

/// here
}
}
16 changes: 16 additions & 0 deletions packages/@aws-cdk/aws-events-targets/rosetta/default.ts-fixture
@@ -0,0 +1,16 @@
// Fixture with packages imported, but nothing else
import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Construct } from 'constructs';

import * as targets from '@aws-cdk/aws-events-targets';
import * as events from '@aws-cdk/aws-events';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';

class Fixture extends Stack {
constructor(scope: Construct, id: string) {
super(scope, id);

/// here
}
}
@@ -0,0 +1,23 @@
// Fixture with packages imported, but nothing else
import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Construct } from 'constructs';

import * as targets from '@aws-cdk/aws-events-targets';
import * as events from '@aws-cdk/aws-events';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as codecommit from '@aws-cdk/aws-codecommit';
import * as cdk from '@aws-cdk/core';

class Fixture extends Stack {
constructor(scope: Construct, id: string) {
super(scope, id);

const repository = new codecommit.Repository(this, 'MyRepo', {
repositoryName: 'aws-cdk-events',
});

const stream = new kinesis.Stream(this, 'MyStream');

/// here
}
}
@@ -0,0 +1,23 @@
// Fixture with packages imported, but nothing else
import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Construct } from 'constructs';

import * as targets from '@aws-cdk/aws-events-targets';
import * as events from '@aws-cdk/aws-events';
import * as sqs from '@aws-cdk/aws-sqs';
import * as codecommit from '@aws-cdk/aws-codecommit';
import * as cdk from '@aws-cdk/core';

class Fixture extends Stack {
constructor(scope: Construct, id: string) {
super(scope, id);

const repository = new codecommit.Repository(this, 'MyRepo', {
repositoryName: 'aws-cdk-events',
});

const queue = new sqs.Queue(this, 'MyQueue');

/// here
}
}
@@ -0,0 +1,23 @@
// Fixture with packages imported, but nothing else
import { Duration, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Construct } from 'constructs';

import * as targets from '@aws-cdk/aws-events-targets';
import * as events from '@aws-cdk/aws-events';
import * as sns from '@aws-cdk/aws-sns';
import * as codecommit from '@aws-cdk/aws-codecommit';
import * as cdk from '@aws-cdk/core';

class Fixture extends Stack {
constructor(scope: Construct, id: string) {
super(scope, id);

const repository = new codecommit.Repository(this, 'MyRepo', {
repositoryName: 'aws-cdk-events',
});

const topic = new sns.Topic(this, 'MyTopic');

/// here
}
}

0 comments on commit 5ecf257

Please sign in to comment.