Skip to content

Commit

Permalink
feat(lambda): event-source maxBatchingWindow property (#4260)
Browse files Browse the repository at this point in the history
* feat(lambda): event-source maximumBatchingWindow property

* fix: remove duplicate import

* revert: sqs doesn't support maximumBatchingWindow

* chore: set @default

* fix: missing import

* chore: refactor maximum -> max

* chore: refactor kinesis and dynamo in common stream class

* chore: remove space

* chore: fix method order

* chore: refactor StreamingEventSourceProps -> StreamEventSourceProps

* chore: quote fix

* fix: switch stream.ts to @internal

* chore: move batchSize check back to children classes

* chore: more quote fixes

* chore: refactor enrichMappingOptions

* fix: abstract bind method

* chore: remove trailing whitespace
  • Loading branch information
nmussy authored and mergify[bot] committed Sep 30, 2019
1 parent 3917c4b commit 4040032
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 48 deletions.
32 changes: 9 additions & 23 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');
import {StreamEventSource, StreamEventSourceProps} from './stream';

export interface DynamoEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 1000.
*
* @default 100
*/
readonly batchSize?: number;

/**
* Where to begin consuming the DynamoDB stream.
*/
readonly startingPosition: lambda.StartingPosition;
export interface DynamoEventSourceProps extends StreamEventSourceProps {
}

/**
* Use an Amazon DynamoDB stream as an event source for AWS Lambda.
*/
export class DynamoEventSource implements lambda.IEventSource {
constructor(private readonly table: dynamodb.Table, private readonly props: DynamoEventSourceProps) {
export class DynamoEventSource extends StreamEventSource {
constructor(private readonly table: dynamodb.Table, props: DynamoEventSourceProps) {
super(props);

if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 1000)) {
throw new Error(`Maximum batch size must be between 1 and 1000 inclusive (given ${this.props.batchSize})`);
}
Expand All @@ -34,11 +22,9 @@ export class DynamoEventSource implements lambda.IEventSource {
throw new Error(`DynamoDB Streams must be enabled on the table ${this.table.node.path}`);
}

target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, {
batchSize: this.props.batchSize || 100,
eventSourceArn: this.table.tableStreamArn,
startingPosition: this.props.startingPosition
});
target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.table.tableStreamArn})
);

this.table.grantStreamRead(target);
dynamodb.Table.grantListStreams(target);
Expand Down
34 changes: 10 additions & 24 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,27 @@
import kinesis = require('@aws-cdk/aws-kinesis');
import lambda = require('@aws-cdk/aws-lambda');
import {StreamEventSource, StreamEventSourceProps} from './stream';

export interface KinesisEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 10000.
*
* @default 100
*/
readonly batchSize?: number;

/**
* Where to begin consuming the Kinesis stream.
*/
readonly startingPosition: lambda.StartingPosition;
export interface KinesisEventSourceProps extends StreamEventSourceProps {
}

/**
* Use an Amazon Kinesis stream as an event source for AWS Lambda.
*/
export class KinesisEventSource implements lambda.IEventSource {
constructor(readonly stream: kinesis.IStream, private readonly props: KinesisEventSourceProps) {
export class KinesisEventSource extends StreamEventSource {
constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) {
super(props);

if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
}
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`, {
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
eventSourceArn: this.stream.streamArn,
});
target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.stream.streamArn})
);

this.stream.grantRead(target);
}
}
}
52 changes: 52 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import lambda = require('@aws-cdk/aws-lambda');
import {Duration} from '@aws-cdk/core';

/**
* @internal
*/
export interface StreamEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 10000.
*
* @default 100
*/
readonly batchSize?: number;

/**
* Where to begin consuming the stream.
*/
readonly startingPosition: lambda.StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: Duration;
}

/**
* Use an stream as an event source for AWS Lambda.
*
* @internal
*/
export abstract class StreamEventSource implements lambda.IEventSource {
protected constructor(protected readonly props: StreamEventSourceProps) {
}

public abstract bind(_target: lambda.IFunction): void;

protected enrichMappingOptions(options: lambda.EventSourceMappingOptions): lambda.EventSourceMappingOptions {
return {
...options,
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
maxBatchingWindow: this.props.maxBatchingWindow,
};
}
}
59 changes: 59 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,63 @@ export = {

test.done();
},

'specific maxBatchingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
maxBatchingWindow: cdk.Duration.minutes(2),
startingPosition: lambda.StartingPosition.LATEST
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"MaximumBatchingWindowInSeconds": 120,
"StartingPosition": "LATEST"
}));

test.done();
},

'throws if maxBatchingWindow > 300 seconds'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// THEN
test.throws(() =>
fn.addEventSource(new sources.DynamoEventSource(table, {
maxBatchingWindow: cdk.Duration.seconds(301),
startingPosition: lambda.StartingPosition.LATEST
})), /maxBatchingWindow cannot be over 300 seconds/);

test.done();
},

};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,34 @@ export = {

test.done();
},

'specific maxBatchingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
maxBatchingWindow: cdk.Duration.minutes(2),
startingPosition: lambda.StartingPosition.LATEST
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"MaximumBatchingWindowInSeconds": 120,
"StartingPosition": "LATEST"
}));

test.done();
},
};
15 changes: 14 additions & 1 deletion packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ export interface EventSourceMappingOptions {
*
* @default - Required for Amazon Kinesis and Amazon DynamoDB Streams sources.
*/
readonly startingPosition?: StartingPosition
readonly startingPosition?: StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: cdk.Duration;
}

export interface EventSourceMappingProps extends EventSourceMappingOptions {
Expand All @@ -63,12 +71,17 @@ export class EventSourceMapping extends Resource {
constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) {
super(scope, id);

if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
}

new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
enabled: props.enabled,
eventSourceArn: props.eventSourceArn,
functionName: props.target.functionName,
startingPosition: props.startingPosition,
maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(),
});
}
}
Expand Down

0 comments on commit 4040032

Please sign in to comment.