Skip to content

Commit

Permalink
Rework sqs-plugins auto-create (and fifo queue handling) 🔧
Browse files Browse the repository at this point in the history
  • Loading branch information
AdrieanKhisbe committed Dec 24, 2019
1 parent 9077d62 commit 0b80ee4
Showing 1 changed file with 32 additions and 47 deletions.
79 changes: 32 additions & 47 deletions packages/serverless-offline-sqs/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
get,
has,
isEmpty,
isNil,
isUndefined,
lowerFirst,
map,
Expand Down Expand Up @@ -65,37 +66,32 @@ class ServerlessOfflineSQS {
return new SQS(this.getConfig());
}

getQueueName(queueEvent) {
if (typeof queueEvent === 'string') return extractQueueNameFromARN(queueEvent);
if (typeof queueEvent.arn === 'string') return extractQueueNameFromARN(queueEvent.arn);
if (typeof queueEvent.queueName === 'string') return queueEvent.queueName;

const queueName = this.getProperty(queueEvent, 'QueueName', 'string');
if (queueName !== null) return queueName;

throw new Error(
`QueueName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-sqs#functions`
);
}

getProperty(queueEvent, propertyName, propertyType) {
getProperty(queueEvent, propertyName, propertyType = 'string') {
if (queueEvent && queueEvent.arn['Fn::GetAtt']) {
const [ResourceName] = queueEvent.arn['Fn::GetAtt'];

if (
this.service &&
this.service.resources &&
this.service.resources.Resources &&
this.service.resources.Resources[ResourceName] &&
this.service.resources.Resources[ResourceName].Properties &&
typeof this.service.resources.Resources[ResourceName].Properties[propertyName] ===
propertyType
)
return this.service.resources.Resources[ResourceName].Properties[propertyName];
const propertyValue = get(
['resources', 'Resources', ResourceName, 'Properties'],
this.service
);
if (propertyValue && typeof propertyValue === propertyType) return propertyValue;
}
return null;
}

getQueueName(queueEvent) {
if (typeof queueEvent === 'string') return extractQueueNameFromARN(queueEvent);
if (typeof queueEvent.arn === 'string') return extractQueueNameFromARN(queueEvent.arn);
if (typeof queueEvent.queueName === 'string') return queueEvent.queueName;

const queueName = this.getProperty(queueEvent, 'QueueName');
if (!queueName)
throw new Error(
`QueueName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-sqs#functions`
);
return queueName;
}

eventHandler(queueEvent, functionName, messages, cb) {
if (!messages) return cb();

Expand Down Expand Up @@ -174,36 +170,25 @@ class ServerlessOfflineSQS {

async createQueueReadable(functionName, queueEvent) {
const client = this.getClient();
const queueName = this.getQueueName(queueEvent);
const QueueName = this.getQueueName(queueEvent);

this.serverless.cli.log(`${queueName}`);
this.serverless.cli.log(`${QueueName}`);

if (this.getConfig().autoCreate) {
const attributes = {};

const fifoQueue = this.getProperty(queueEvent, 'FifoQueue', 'boolean');
if (fifoQueue !== null) attributes.FifoQueue = fifoQueue.toString();

const contentBasedDeduplication = this.getProperty(
queueEvent,
'ContentBasedDeduplication',
'boolean'
);
if (contentBasedDeduplication !== null)
attributes.ContentBasedDeduplication = contentBasedDeduplication.toString();
const attributes = {
FifoQueue: this.getProperty(queueEvent, 'FifoQueue', 'boolean'),
ContentBasedDeduplication: this.getProperty(
queueEvent,
'ContentBasedDeduplication',
'boolean'
)
};

const params = {QueueName: queueName, Attributes: attributes};
const params = {QueueName, Attributes: omitBy(isNil, attributes)};
await fromCallback(cb => client.createQueue(params, cb));
}

const {QueueUrl} = await fromCallback(cb =>
client.getQueueUrl(
{
QueueName: queueName
},
cb
)
);
const {QueueUrl} = await fromCallback(cb => client.getQueueUrl({QueueName}, cb));

const next = async () => {
const {Messages} = await fromCallback(cb =>
Expand Down

0 comments on commit 0b80ee4

Please sign in to comment.