// event-source-mapping.ts
import * as cdk from '@aws-cdk/core';
import { IEventSourceDlq } from './dlq';
import { IFunction } from './function-base';
import { CfnEventSourceMapping } from './lambda.generated';
/**
* Options for configuring an event source mapping, excluding the target
* function.
*/
export interface EventSourceMappingOptions {
/**
* The Amazon Resource Name (ARN) of the event source. Any record added to
* this stream can invoke the Lambda function.
*/
readonly eventSourceArn: string;
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 10000.
*
* @default - 100 records for Amazon Kinesis and Amazon DynamoDB.
* For Amazon SQS, both the default and the maximum are 10 messages.
*/
readonly batchSize?: number;
/**
* If the function returns an error, split the batch in two and retry.
*
* @default false
*/
readonly bisectBatchOnError?: boolean;
/**
* An Amazon SQS queue or Amazon SNS topic destination for discarded records.
*
* @default - discarded records are ignored
*/
readonly onFailure?: IEventSourceDlq;
/**
* Set to false to disable the event source upon creation.
*
* @default true
*/
readonly enabled?: boolean;
/**
* The position in the DynamoDB or Kinesis stream where AWS Lambda should
* start reading.
*
* @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType
*
* @default - Required for Amazon Kinesis and Amazon DynamoDB Streams sources.
*/
readonly startingPosition?: StartingPosition;
/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5).
*
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: cdk.Duration;
/**
* The maximum age of a record that Lambda sends to a function for processing.
*
* Valid Range:
* * Minimum value of 60 seconds
* * Maximum value of 7 days
*
* @default Duration.days(7)
*/
readonly maxRecordAge?: cdk.Duration;
/**
* The maximum number of times to retry when the function returns an error.
*
* Valid Range:
* * Minimum value of 0
* * Maximum value of 10000
*
* @default 10000
*/
readonly retryAttempts?: number;
/**
* The number of batches to process from each shard concurrently.
*
* Valid Range:
* * Minimum value of 1
* * Maximum value of 10
*
* @default 1
*/
readonly parallelizationFactor?: number;
}
/**
* Properties for declaring a new event source mapping: every option from
* `EventSourceMappingOptions` plus the target Lambda function.
*/
export interface EventSourceMappingProps extends EventSourceMappingOptions {
/**
* The target AWS Lambda function.
*/
readonly target: IFunction;
}
/**
* Represents an event source mapping for a Lambda function.
* @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html
*/
export interface IEventSourceMapping extends cdk.IResource {
/**
* The identifier for this EventSourceMapping.
* @attribute
*/
readonly eventSourceMappingId: string;
}
/**
 * Defines a Lambda EventSourceMapping resource.
 *
 * Usually, you won't need to define the mapping yourself. This will usually be done by
 * event sources. For example, to add an SQS event source to a function:
 *
 *     import { SqsEventSource } from '@aws-cdk/aws-lambda-event-sources';
 *     lambda.addEventSource(new SqsEventSource(sqs));
 *
 * The `SqsEventSource` class will automatically create the mapping, and will also
 * modify the Lambda's execution role so it can consume messages from the queue.
 */
export class EventSourceMapping extends cdk.Resource implements IEventSourceMapping {
  /**
   * Import an existing event source mapping into this stack from its identifier.
   *
   * @param scope construct scope the imported mapping is created in
   * @param id construct id of the imported mapping within `scope`
   * @param eventSourceMappingId identifier of the existing event source mapping
   * @returns a resource that exposes the given `eventSourceMappingId`
   */
  public static fromEventSourceMappingId(scope: cdk.Construct, id: string, eventSourceMappingId: string): IEventSourceMapping {
    class Import extends cdk.Resource implements IEventSourceMapping {
      public readonly eventSourceMappingId = eventSourceMappingId;
    }
    return new Import(scope, id);
  }

  /**
   * The identifier for this event source mapping, taken from the `ref` of the
   * underlying `CfnEventSourceMapping` resource.
   */
  public readonly eventSourceMappingId: string;

  /**
   * Validates the supplied properties and creates the underlying
   * `CfnEventSourceMapping` resource.
   *
   * @param scope construct scope this mapping is created in
   * @param id construct id of this mapping within `scope`
   * @param props mapping configuration, including the target function
   * @throws Error if `maxBatchingWindow` exceeds 300 seconds
   * @throws Error if `maxRecordAge` is below 60 seconds or above 7 days
   * @throws Error if `retryAttempts` is outside 0..10000
   * @throws Error if `parallelizationFactor` is outside 1..10
   * @throws Error if `batchSize` is outside 1..10000
   */
  constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) {
    super(scope, id);

    if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
      throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
    }

    // The upper bound is checked in fractional days so that a non-integral
    // number of days does not make the conversion itself fail.
    if (props.maxRecordAge && (props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({ integral: false }) > 7)) {
      throw new Error('maxRecordAge must be between 60 seconds and 7 days inclusive');
    }

    // Plain numeric props are range-checked through cdk.withResolved, so the
    // handling of token values is left to the CDK helper.
    if (props.retryAttempts !== undefined) {
      cdk.withResolved(props.retryAttempts, (attempts) => {
        if (attempts < 0 || attempts > 10000) {
          throw new Error(`retryAttempts must be between 0 and 10000 inclusive, got ${attempts}`);
        }
      });
    }

    if (props.parallelizationFactor !== undefined) {
      cdk.withResolved(props.parallelizationFactor, (factor) => {
        if (factor < 1 || factor > 10) {
          throw new Error(`parallelizationFactor must be between 1 and 10 inclusive, got ${factor}`);
        }
      });
    }

    // Checked last so that the error raised for inputs that were already
    // rejected before this check existed stays the same.
    if (props.batchSize !== undefined) {
      cdk.withResolved(props.batchSize, (size) => {
        if (size < 1 || size > 10000) {
          throw new Error(`batchSize must be between 1 and 10000 inclusive, got ${size}`);
        }
      });
    }

    // Only emit a destination config when a failure destination was supplied;
    // binding happens before the Cfn resource is created.
    const destinationConfig = props.onFailure
      ? { onFailure: props.onFailure.bind(this, props.target) }
      : undefined;

    const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
      batchSize: props.batchSize,
      bisectBatchOnFunctionError: props.bisectBatchOnError,
      destinationConfig,
      enabled: props.enabled,
      eventSourceArn: props.eventSourceArn,
      functionName: props.target.functionName,
      startingPosition: props.startingPosition,
      maximumBatchingWindowInSeconds: props.maxBatchingWindow?.toSeconds(),
      maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(),
      maximumRetryAttempts: props.retryAttempts,
      parallelizationFactor: props.parallelizationFactor,
    });

    this.eventSourceMappingId = cfnEventSourceMapping.ref;
  }
}
/**
* The position in the DynamoDB or Kinesis stream where AWS Lambda should start
* reading.
*
* Each member's value is the string form of its own name.
*/
export enum StartingPosition {
/**
* Start reading at the last untrimmed record in the shard in the system,
* which is the oldest data record in the shard.
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* Start reading just after the most recent record in the shard, so that you
* always read the most recent data in the shard.
*/
LATEST = 'LATEST',
}