/
sqs.ts
63 lines (54 loc) · 1.73 KB
/
sqs.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import { IPipe, ISource, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { Duration } from 'aws-cdk-lib';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IQueue } from 'aws-cdk-lib/aws-sqs';
/**
* Parameters for the SQS source.
*/
export interface SqsSourceParameters {
/**
* The maximum number of records to include in each batch.
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcesqsqueueparameters.html#cfn-pipes-pipe-pipesourcesqsqueueparameters-batchsize
* @default 10
*/
readonly batchSize?: number;
/**
* The maximum length of a time to wait for events.
*
* @see http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcesqsqueueparameters.html#cfn-pipes-pipe-pipesourcesqsqueueparameters-maximumbatchingwindowinseconds
* @default 1
*/
readonly maximumBatchingWindow?: Duration;
}
/**
* A source that reads from an SQS queue.
*/
export class SqsSource implements ISource {
private readonly queue: IQueue;
readonly sourceArn;
private sourceParameters;
constructor(queue: IQueue, parameters?: SqsSourceParameters) {
this.queue = queue;
this.sourceArn = queue.queueArn;
if (parameters) {
this.sourceParameters = parameters;
}
}
bind(_pipe: IPipe): SourceConfig {
if (!this.sourceParameters) {
return {};
}
return {
sourceParameters: {
sqsQueueParameters: {
batchSize: this.sourceParameters?.batchSize,
maximumBatchingWindowInSeconds: this.sourceParameters?.maximumBatchingWindow?.toSeconds(),
},
},
};
}
grantRead(grantee: IRole): void {
this.queue.grantConsumeMessages(grantee);
}
}