Skip to content

Commit

Permalink
feat(lambda-event-sources): tumbling window (#13412)
Browse files Browse the repository at this point in the history
fixes #13411

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
msimpsonnz committed Mar 26, 2021
1 parent 88f2c5a commit e9f2773
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 1 deletion.
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ and add it to your Lambda function. The following parameters will impact Amazon
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __tumblingWindow__: The duration in seconds of a processing window when using streams.
* __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true.

```ts
Expand Down Expand Up @@ -192,6 +193,7 @@ behavior:
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __tumblingWindow__: The duration in seconds of a processing window when using streams.
* __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true.

```ts
Expand Down
9 changes: 9 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ export interface StreamEventSourceProps {
*/
readonly maxBatchingWindow?: Duration;

/**
* The size of the tumbling windows to group records sent to DynamoDB or Kinesis
* Valid Range: 0 - 15 minutes
*
* @default - None
*/
readonly tumblingWindow?: Duration;

/**
* If the stream event source mapping should be enabled.
*
Expand Down Expand Up @@ -106,6 +114,7 @@ export abstract class StreamEventSource implements lambda.IEventSource {
retryAttempts: this.props.retryAttempts,
parallelizationFactor: this.props.parallelizationFactor,
onFailure: this.props.onFailure,
tumblingWindow: this.props.tumblingWindow,
enabled: this.props.enabled,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
"StreamArn"
]
},
"TumblingWindowInSeconds": 60,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class DynamoEventSourceTest extends cdk.Stack {
fn.addEventSource(new DynamoEventSource(queue, {
batchSize: 5,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
tumblingWindow: cdk.Duration.seconds(60),
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"Arn"
]
},
"TumblingWindowInSeconds": 60,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class KinesisEventSourceTest extends cdk.Stack {

fn.addEventSource(new KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
tumblingWindow: cdk.Duration.seconds(60),
}));
}
}
Expand Down
27 changes: 27 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,33 @@ export = {
test.done();
},

'specific tumblingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
stream: dynamodb.StreamViewType.NEW_IMAGE,
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
batchSize: 50,
startingPosition: lambda.StartingPosition.LATEST,
tumblingWindow: cdk.Duration.seconds(60),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
TumblingWindowInSeconds: 60,
}));

test.done();
},

'specific batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down
32 changes: 32 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ export = {
test.done();
},

'specific tumblingWindowInSeconds'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
batchSize: 50,
startingPosition: lambda.StartingPosition.LATEST,
tumblingWindow: cdk.Duration.seconds(60),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'S509448A1',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'BatchSize': 50,
'StartingPosition': 'LATEST',
'TumblingWindowInSeconds': 60,
}));

test.done();
},

'specific batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down
16 changes: 16 additions & 0 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ export interface EventSourceMappingOptions {
*/
readonly kafkaTopic?: string;

/**
* The size of the tumbling windows to group records sent to DynamoDB or Kinesis
*
* @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
*
* Valid Range: 0 - 15 minutes
*
* @default - None
*/
readonly tumblingWindow?: cdk.Duration;

/**
* A list of host and port pairs that are the addresses of the Kafka brokers in a self managed "bootstrap" Kafka cluster
* that a Kafka client connects to initially to bootstrap itself.
Expand Down Expand Up @@ -269,6 +280,10 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
}
});

if (props.tumblingWindow && !cdk.Token.isUnresolved(props.tumblingWindow) && props.tumblingWindow.toSeconds() > 900) {
throw new Error(`tumblingWindow cannot be over 900 seconds, got ${props.tumblingWindow.toSeconds()}`);
}


let destinationConfig;

Expand Down Expand Up @@ -296,6 +311,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
maximumRetryAttempts: props.retryAttempts,
parallelizationFactor: props.parallelizationFactor,
topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
selfManagedEventSource,
});
Expand Down
33 changes: 32 additions & 1 deletion packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,35 @@ describe('event source mapping', () => {
SelfManagedEventSource: { Endpoints: { KafkaBootstrapServers: kafkaBootstrapServers } },
});
});
});

test('throws if tumblingWindow > 900 seconds', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});

expect(() => new EventSourceMapping(stack, 'test', {
target: fn,
eventSourceArn: '',
tumblingWindow: cdk.Duration.seconds(901),
})).toThrow(/tumblingWindow cannot be over 900 seconds/);
});

test('accepts if tumblingWindow is a token', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});
const lazyDuration = cdk.Duration.seconds(cdk.Lazy.number({ produce: () => 60 }));

new EventSourceMapping(stack, 'test', {
target: fn,
eventSourceArn: '',
tumblingWindow: lazyDuration,
});
});
});

0 comments on commit e9f2773

Please sign in to comment.