Skip to content

Commit

Permalink
feat(sqs): add support for high throughput fifo (#15202)
Browse files Browse the repository at this point in the history
Closes #15063

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
paul-doherty committed Jun 24, 2021
1 parent 32d63a3 commit d0c9602
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 3 deletions.
3 changes: 2 additions & 1 deletion packages/@aws-cdk/aws-sqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ features in order to help guarantee exactly-once processing. For more informatio
the SQS manual. Note that FIFO queues are not available in all AWS regions.

A queue can be made a FIFO queue by either setting `fifo: true`, giving it a name which ends
in `".fifo"`, or enabling content-based deduplication (which requires FIFO queues).
in `".fifo"`, or by enabling a FIFO specific feature such as: content-based deduplication,
deduplication scope or fifo throughput limit.
62 changes: 62 additions & 0 deletions packages/@aws-cdk/aws-sqs/lib/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,26 @@ export interface QueueProps {
*/
readonly contentBasedDeduplication?: boolean;

/**
* For high throughput for FIFO queues, specifies whether message deduplication
* occurs at the message group or queue level.
*
* (Only applies to FIFO queues.)
*
* @default DeduplicationScope.QUEUE
*/
readonly deduplicationScope?: DeduplicationScope;

/**
* For high throughput for FIFO queues, specifies whether the FIFO queue
* throughput quota applies to the entire queue or per message group.
*
* (Only applies to FIFO queues.)
*
* @default FifoThroughputLimit.PER_QUEUE
*/
readonly fifoThroughputLimit?: FifoThroughputLimit;

/**
* Policy to apply when the user pool is removed from the stack
*
Expand Down Expand Up @@ -188,6 +208,34 @@ export enum QueueEncryption {
KMS = 'KMS',
}

/**
* What kind of deduplication scope to apply
*/
export enum DeduplicationScope {
/**
* Deduplication occurs at the message group level
*/
MESSAGE_GROUP = 'messageGroup',
/**
* Deduplication occurs at the message queue level
*/
QUEUE = 'queue',
}

/**
* Whether the FIFO queue throughput quota applies to the entire queue or per message group
*/
export enum FifoThroughputLimit {
/**
* Throughput quota applies per queue
*/
PER_QUEUE = 'perQueue',
/**
* Throughput quota applies per message group id
*/
PER_MESSAGE_GROUP_ID = 'perMessageGroupId',
}

/**
* A new Amazon SQS queue
*/
Expand Down Expand Up @@ -342,6 +390,8 @@ export class Queue extends QueueBase {
const queueName = props.queueName;
if (typeof fifoQueue === 'undefined' && queueName && !Token.isUnresolved(queueName) && queueName.endsWith('.fifo')) { fifoQueue = true; }
if (typeof fifoQueue === 'undefined' && props.contentBasedDeduplication) { fifoQueue = true; }
if (typeof fifoQueue === 'undefined' && props.deduplicationScope) { fifoQueue = true; }
if (typeof fifoQueue === 'undefined' && props.fifoThroughputLimit) { fifoQueue = true; }

// If we have a name, see that it agrees with the FIFO setting
if (typeof queueName === 'string') {
Expand All @@ -357,8 +407,18 @@ export class Queue extends QueueBase {
throw new Error('Content-based deduplication can only be defined for FIFO queues');
}

if (props.deduplicationScope && !fifoQueue) {
throw new Error('Deduplication scope can only be defined for FIFO queues');
}

if (props.fifoThroughputLimit && !fifoQueue) {
throw new Error('FIFO throughput limit can only be defined for FIFO queues');
}

return {
contentBasedDeduplication: props.contentBasedDeduplication,
deduplicationScope: props.deduplicationScope,
fifoThroughputLimit: props.fifoThroughputLimit,
fifoQueue,
};
}
Expand All @@ -367,6 +427,8 @@ export class Queue extends QueueBase {
interface FifoProps {
readonly fifoQueue?: boolean;
readonly contentBasedDeduplication?: boolean;
readonly deduplicationScope?: DeduplicationScope;
readonly fifoThroughputLimit?: FifoThroughputLimit;
}

interface EncryptionProps {
Expand Down
28 changes: 27 additions & 1 deletion packages/@aws-cdk/aws-sqs/test/integ.sqs.expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"HighThroughputFifoQueue40A0EEE4": {
"Type": "AWS::SQS::Queue",
"Properties": {
"DeduplicationScope": "messageGroup",
"FifoQueue": true,
"FifoThroughputLimit": "perMessageGroupId"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"Role1ABCC5F0": {
"Type": "AWS::IAM::Role",
"Properties": {
Expand Down Expand Up @@ -165,6 +175,22 @@
"Arn"
]
}
},
{
"Action": [
"sqs:ReceiveMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueUrl",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"HighThroughputFifoQueue40A0EEE4",
"Arn"
]
}
}
],
"Version": "2012-10-17"
Expand All @@ -185,4 +211,4 @@
}
}
}
}
}
8 changes: 7 additions & 1 deletion packages/@aws-cdk/aws-sqs/test/integ.sqs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AccountRootPrincipal, Role } from '@aws-cdk/aws-iam';
import { Key } from '@aws-cdk/aws-kms';
import { App, CfnOutput, RemovalPolicy, Stack } from '@aws-cdk/core';
import { Queue, QueueEncryption } from '../lib';
import { DeduplicationScope, FifoThroughputLimit, Queue, QueueEncryption } from '../lib';

const app = new App();

Expand All @@ -16,6 +16,11 @@ const fifo = new Queue(stack, 'FifoQueue', {
fifo: true,
encryptionMasterKey: new Key(stack, 'EncryptionKey', { removalPolicy: RemovalPolicy.DESTROY }),
});
const highThroughputFifo = new Queue(stack, 'HighThroughputFifoQueue', {
fifo: true,
fifoThroughputLimit: FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
deduplicationScope: DeduplicationScope.MESSAGE_GROUP,
});

const role = new Role(stack, 'Role', {
assumedBy: new AccountRootPrincipal(),
Expand All @@ -24,6 +29,7 @@ const role = new Role(stack, 'Role', {
dlq.grantConsumeMessages(role);
queue.grantConsumeMessages(role);
fifo.grantConsumeMessages(role);
highThroughputFifo.grantConsumeMessages(role);

new CfnOutput(stack, 'QueueUrl', { value: queue.queueUrl });

Expand Down
49 changes: 49 additions & 0 deletions packages/@aws-cdk/aws-sqs/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,55 @@ export = {
test.done();
},

'test a fifo queue is observed when high throughput properties are specified'(test: Test) {
const stack = new Stack();
const queue = new sqs.Queue(stack, 'Queue', {
fifo: true,
fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});

test.deepEqual(queue.fifo, true);
expect(stack).toMatch({
'Resources': {
'Queue4A7E3555': {
'Type': 'AWS::SQS::Queue',
'Properties': {
'DeduplicationScope': 'messageGroup',
'FifoQueue': true,
'FifoThroughputLimit': 'perMessageGroupId',
},
'UpdateReplacePolicy': 'Delete',
'DeletionPolicy': 'Delete',
},
},
});

test.done();
},

'test a queue throws when fifoThroughputLimit specified on non fifo queue'(test: Test) {
const stack = new Stack();
test.throws(() => {
new sqs.Queue(stack, 'Queue', {
fifo: false,
fifoThroughputLimit: sqs.FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
});
});
test.done();
},

'test a queue throws when deduplicationScope specified on non fifo queue'(test: Test) {
const stack = new Stack();
test.throws(() => {
new sqs.Queue(stack, 'Queue', {
fifo: false,
deduplicationScope: sqs.DeduplicationScope.MESSAGE_GROUP,
});
});
test.done();
},

'test metrics'(test: Test) {
// GIVEN
const stack = new Stack();
Expand Down

0 comments on commit d0c9602

Please sign in to comment.