/
stream.ts
137 lines (123 loc) · 3.82 KB
/
stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import * as lambda from '@aws-cdk/aws-lambda';
import { Duration } from '@aws-cdk/core';
/**
* The set of properties for event sources that follow the streaming model,
* such as, Dynamo, Kinesis and Kafka.
*/
export interface BaseStreamEventSourceProps{
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range:
* * Minimum value of 1
* * Maximum value of:
* * 1000 for {@link DynamoEventSource}
* * 10000 for {@link KinesisEventSource}, {@link ManagedKafkaEventSource} and {@link SelfManagedKafkaEventSource}
*
* @default 100
*/
readonly batchSize?: number;
/**
* An Amazon SQS queue or Amazon SNS topic destination for discarded records.
*
* @default discarded records are ignored
*/
readonly onFailure?: lambda.IEventSourceDlq;
/**
* Where to begin consuming the stream.
*/
readonly startingPosition: lambda.StartingPosition;
/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: Duration;
/**
* If the stream event source mapping should be enabled.
*
* @default true
*/
readonly enabled?: boolean;
}
/**
* The set of properties for event sources that follow the streaming model,
* such as, Dynamo, Kinesis.
*/
export interface StreamEventSourceProps extends BaseStreamEventSourceProps {
/**
* If the function returns an error, split the batch in two and retry.
*
* @default false
*/
readonly bisectBatchOnError?: boolean;
/**
* The maximum age of a record that Lambda sends to a function for processing.
* Valid Range:
* * Minimum value of 60 seconds
* * Maximum value of 7 days
*
* @default - the retention period configured on the stream
*/
readonly maxRecordAge?: Duration;
/**
* Maximum number of retry attempts
* Valid Range:
* * Minimum value of 0
* * Maximum value of 10000
*
* @default - retry until the record expires
*/
readonly retryAttempts?: number;
/**
* The number of batches to process from each shard concurrently.
* Valid Range:
* * Minimum value of 1
* * Maximum value of 10
*
* @default 1
*/
readonly parallelizationFactor?: number;
/**
* Allow functions to return partially successful responses for a batch of records.
*
* @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting
*
* @default false
*/
readonly reportBatchItemFailures?: boolean;
/**
* The size of the tumbling windows to group records sent to DynamoDB or Kinesis
* Valid Range: 0 - 15 minutes
*
* @default - None
*/
readonly tumblingWindow?: Duration;
}
/**
* Use an stream as an event source for AWS Lambda.
*/
export abstract class StreamEventSource implements lambda.IEventSource {
protected constructor(protected readonly props: StreamEventSourceProps) {
}
public abstract bind(_target: lambda.IFunction): void;
protected enrichMappingOptions(options: lambda.EventSourceMappingOptions): lambda.EventSourceMappingOptions {
return {
...options,
batchSize: this.props.batchSize || 100,
bisectBatchOnError: this.props.bisectBatchOnError,
startingPosition: this.props.startingPosition,
reportBatchItemFailures: this.props.reportBatchItemFailures,
maxBatchingWindow: this.props.maxBatchingWindow,
maxRecordAge: this.props.maxRecordAge,
retryAttempts: this.props.retryAttempts,
parallelizationFactor: this.props.parallelizationFactor,
onFailure: this.props.onFailure,
tumblingWindow: this.props.tumblingWindow,
enabled: this.props.enabled,
};
}
}