Skip to content

Commit

Permalink
feat(events): validate MessageGroupId is specified only for FIFO queu…
Browse files Browse the repository at this point in the history
…es (#3811)

* chore(events-targets): Validate MessageGroupId is specified only for FIFO queues.

Reason: Specifying MessageGroupId for non-FIFO queues fails
CloudFormation deployment.

Include a new readonly attribute `fifo` to sqs.IQueue.

* Use spread operator instead of Object.assign()

* Updated as per PR feedback #3811
  • Loading branch information
nija-at authored and mergify[bot] committed Sep 2, 2019
1 parent 5ef34a1 commit cc88f1a
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 28 deletions.
11 changes: 5 additions & 6 deletions packages/@aws-cdk/aws-events-targets/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export interface SqsQueueProps {
export class SqsQueue implements events.IRuleTarget {

constructor(public readonly queue: sqs.IQueue, private readonly props: SqsQueueProps = {}) {
if (props.messageGroupId !== undefined && !queue.fifo) {
throw new Error('messageGroupId cannot be specified for non-FIFO queues');
}
}

/**
Expand All @@ -58,17 +61,13 @@ export class SqsQueue implements events.IRuleTarget {
})
);

const result: events.RuleTargetConfig = {
return {
id: '',
arn: this.queue.queueArn,
input: this.props.message,
targetResource: this.queue,
sqsParameters: this.props.messageGroupId ? { messageGroupId: this.props.messageGroupId } : undefined,
};
if (!!this.props.messageGroupId) {
Object.assign(result, { sqsParameters: { messageGroupId: this.props.messageGroupId } });
}
return result;

}

}
50 changes: 45 additions & 5 deletions packages/@aws-cdk/aws-events-targets/test/sqs/sqs.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { expect, haveResource } from '@aws-cdk/assert';
import { expect as cdkExpect, haveResource } from '@aws-cdk/assert';
import events = require('@aws-cdk/aws-events');
import sqs = require('@aws-cdk/aws-sqs');
import { Duration, Stack } from '@aws-cdk/core';
import targets = require('../../lib');

test('sns topic as an event rule target', () => {
test('sqs queue as an event rule target', () => {
// GIVEN
const stack = new Stack();
const queue = new sqs.Queue(stack, 'MyQueue');
Expand All @@ -16,7 +16,7 @@ test('sns topic as an event rule target', () => {
rule.addTarget(new targets.SqsQueue(queue));

// THEN
expect(stack).to(haveResource('AWS::SQS::QueuePolicy', {
cdkExpect(stack).to(haveResource('AWS::SQS::QueuePolicy', {
PolicyDocument: {
Statement: [
{
Expand Down Expand Up @@ -50,7 +50,7 @@ test('sns topic as an event rule target', () => {
Queues: [{ Ref: "MyQueueE6CA6235" }]
}));

expect(stack).to(haveResource('AWS::Events::Rule', {
cdkExpect(stack).to(haveResource('AWS::Events::Rule', {
ScheduleExpression: "rate(1 hour)",
State: "ENABLED",
Targets: [
Expand Down Expand Up @@ -81,7 +81,7 @@ test('multiple uses of a queue as a target results in multi policy statement bec
}

// THEN
expect(stack).to(haveResource('AWS::SQS::QueuePolicy', {
cdkExpect(stack).to(haveResource('AWS::SQS::QueuePolicy', {
PolicyDocument: {
Statement: [
{
Expand Down Expand Up @@ -140,3 +140,43 @@ test('multiple uses of a queue as a target results in multi policy statement bec
Queues: [{ Ref: "MyQueueE6CA6235" }]
}));
});

test('fail if messageGroupId is specified on non-fifo queues', () => {
const stack = new Stack();
const queue = new sqs.Queue(stack, 'MyQueue');

expect(() => new targets.SqsQueue(queue, { messageGroupId: 'MyMessageGroupId' }))
.toThrow(/messageGroupId cannot be specified/);
});

test('fifo queues are synthesized correctly', () => {
const stack = new Stack();
const queue = new sqs.Queue(stack, 'MyQueue', { fifo: true });
const rule = new events.Rule(stack, 'MyRule', {
schedule: events.Schedule.rate(Duration.hours(1)),
});

// WHEN
rule.addTarget(new targets.SqsQueue(queue, {
messageGroupId: 'MyMessageGroupId',
}));

cdkExpect(stack).to(haveResource('AWS::Events::Rule', {
ScheduleExpression: "rate(1 hour)",
State: "ENABLED",
Targets: [
{
Arn: {
"Fn::GetAtt": [
"MyQueueE6CA6235",
"Arn"
]
},
Id: "Target0",
SqsParameters: {
MessageGroupId: "MyMessageGroupId",
}
}
]
}));
});
10 changes: 10 additions & 0 deletions packages/@aws-cdk/aws-sqs/lib/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ export interface IQueue extends IResource {
*/
readonly encryptionMasterKey?: kms.IKey;

/**
* Whether this queue is an Amazon SQS FIFO queue. If false, this is a standard queue.
*/
readonly fifo: boolean;

/**
* Adds a statement to the IAM resource policy associated with this queue.
*
Expand Down Expand Up @@ -112,6 +117,11 @@ export abstract class QueueBase extends Resource implements IQueue {
*/
public abstract readonly encryptionMasterKey?: kms.IKey;

/**
* Whether this queue is an Amazon SQS FIFO queue. If false, this is a standard queue.
*/
public abstract readonly fifo: boolean;

/**
* Controls automatic creation of policy objects.
*
Expand Down
11 changes: 10 additions & 1 deletion packages/@aws-cdk/aws-sqs/lib/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ export class Queue extends QueueBase {
public readonly encryptionMasterKey = attrs.keyArn
? kms.Key.fromKeyArn(this, 'Key', attrs.keyArn)
: undefined;
public readonly fifo = queueName.endsWith('.fifo') ? true : false;

protected readonly autoCreatePolicy = false;
}
Expand Down Expand Up @@ -227,6 +228,11 @@ export class Queue extends QueueBase {
*/
public readonly encryptionMasterKey?: kms.IKey;

/**
* Whether this queue is an Amazon SQS FIFO queue. If false, this is a standard queue.
*/
public readonly fifo: boolean;

protected readonly autoCreatePolicy = true;

constructor(scope: Construct, id: string, props: QueueProps = {}) {
Expand All @@ -245,9 +251,12 @@ export class Queue extends QueueBase {

const { encryptionMasterKey, encryptionProps } = _determineEncryptionProps.call(this);

const fifoProps = this.determineFifoProps(props);
this.fifo = fifoProps.fifoQueue || false;

const queue = new CfnQueue(this, 'Resource', {
queueName: this.physicalName,
...this.determineFifoProps(props),
...fifoProps,
...encryptionProps,
redrivePolicy,
delaySeconds: props.deliveryDelay && props.deliveryDelay.toSeconds(),
Expand Down
90 changes: 74 additions & 16 deletions packages/@aws-cdk/aws-sqs/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import { Queue } from '../lib';
export = {
'default properties'(test: Test) {
const stack = new Stack();
new sqs.Queue(stack, 'Queue');
const q = new sqs.Queue(stack, 'Queue');

test.deepEqual(q.fifo, false);

expect(stack).toMatch({
"Resources": {
Expand Down Expand Up @@ -95,21 +97,32 @@ export = {
test.done();
},

'exporting and importing works'(test: Test) {
// GIVEN
const stack = new Stack();
'export and import': {
'importing works correctly'(test: Test) {
// GIVEN
const stack = new Stack();

// WHEN
const imports = sqs.Queue.fromQueueArn(stack, 'Imported', 'arn:aws:sqs:us-east-1:123456789012:queue1');
// WHEN
const imports = sqs.Queue.fromQueueArn(stack, 'Imported', 'arn:aws:sqs:us-east-1:123456789012:queue1');

// THEN
// THEN

// "import" returns an IQueue bound to `Fn::ImportValue`s.
test.deepEqual(stack.resolve(imports.queueArn), 'arn:aws:sqs:us-east-1:123456789012:queue1');
test.deepEqual(stack.resolve(imports.queueUrl), { 'Fn::Join':
[ '', [ 'https://sqs.', { Ref: 'AWS::Region' }, '.', { Ref: 'AWS::URLSuffix' }, '/', { Ref: 'AWS::AccountId' }, '/queue1' ] ] });
test.deepEqual(stack.resolve(imports.queueName), 'queue1');
test.done();
// "import" returns an IQueue bound to `Fn::ImportValue`s.
test.deepEqual(stack.resolve(imports.queueArn), 'arn:aws:sqs:us-east-1:123456789012:queue1');
test.deepEqual(stack.resolve(imports.queueUrl), { 'Fn::Join':
[ '', [ 'https://sqs.', { Ref: 'AWS::Region' }, '.', { Ref: 'AWS::URLSuffix' }, '/', { Ref: 'AWS::AccountId' }, '/queue1' ] ] });
test.deepEqual(stack.resolve(imports.queueName), 'queue1');
test.done();
},

'importing fifo and standard queues are detected correctly'(test: Test) {
const stack = new Stack();
const stdQueue = sqs.Queue.fromQueueArn(stack, 'StdQueue', 'arn:aws:sqs:us-east-1:123456789012:queue1');
const fifoQueue = sqs.Queue.fromQueueArn(stack, 'FifoQueue', 'arn:aws:sqs:us-east-1:123456789012:queue2.fifo');
test.deepEqual(stdQueue.fifo, false);
test.deepEqual(fifoQueue.fifo, true);
test.done();
},
},

'grants': {
Expand Down Expand Up @@ -276,21 +289,66 @@ export = {
},
},

'test ".fifo" suffixed queues register as fifo'(test: Test) {
const stack = new Stack();
const queue = new Queue(stack, 'Queue', {
queueName: 'MyQueue.fifo'
});

test.deepEqual(queue.fifo, true);

expect(stack).toMatch({
"Resources": {
"Queue4A7E3555": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": "MyQueue.fifo",
"FifoQueue": true
}
}
}
});

test.done();
},

'test a fifo queue is observed when the "fifo" property is specified'(test: Test) {
const stack = new Stack();
const queue = new Queue(stack, 'Queue', {
fifo: true
});

test.deepEqual(queue.fifo, true);

expect(stack).toMatch({
"Resources": {
"Queue4A7E3555": {
"Type": "AWS::SQS::Queue",
"Properties": {
"FifoQueue": true
}
}
}
});

test.done();
},

'test metrics'(test: Test) {
// GIVEN
const stack = new Stack();
const topic = new Queue(stack, 'Queue');
const queue = new Queue(stack, 'Queue');

// THEN
test.deepEqual(stack.resolve(topic.metricNumberOfMessagesSent()), {
test.deepEqual(stack.resolve(queue.metricNumberOfMessagesSent()), {
dimensions: {QueueName: { 'Fn::GetAtt': [ 'Queue4A7E3555', 'QueueName' ] }},
namespace: 'AWS/SQS',
metricName: 'NumberOfMessagesSent',
period: Duration.minutes(5),
statistic: 'Sum'
});

test.deepEqual(stack.resolve(topic.metricSentMessageSize()), {
test.deepEqual(stack.resolve(queue.metricSentMessageSize()), {
dimensions: {QueueName: { 'Fn::GetAtt': [ 'Queue4A7E3555', 'QueueName' ] }},
namespace: 'AWS/SQS',
metricName: 'SentMessageSize',
Expand Down

0 comments on commit cc88f1a

Please sign in to comment.