Skip to content

Commit

Permalink
Rewrite using Rascal as our underlying driver
Browse files Browse the repository at this point in the history
# Breaking Changes
 * Underlying driver has changed from forked-version of postwait's `amqp` to rascal/amqplib
 * Queue configuration has changed, see rascal's configuration scheme
 * QueueService
   * constructor no longer takes `queues` param, this is setup in the rascal config
   * `queues` property has been removed
   * `connect` is now an async function (no more callback)
   * many internal member functions have been removed
 * QueueWorker
   * constructor option `queueName` is now `subscriptionName`
   * constructor requires option `service` (instance of QueueService)
   * many internal members have been removed
   * `init` is now an async function (no more callback)
   * `subscribe` is now an async function (no more callback)
   * `onReady` has been removed
   * `onSubscribed` no longer has arguments
   * `onUnsubscribed` no longer has arguments
   * `onMessage` signature has changed to `(message, content, ackOrNack)`
   * `onMessageHandled` has been removed
   * `handleMessage` signature has changed to `(message, content, ackOrNack)`
   * `onServiceError` has been removed
 * BatchQueueWorker
   * option `batchSize` now translates to a prefetch of (batchSize * 2), so Async.Cargo can optimally deliver the desired batch size to the app.
   * `handleMessageBatch` has changed signature to (messages, defaultAckOrNack)
     * Messages are wrapped, and can be individually acknowledged via `messages[i].ackOrNack(...)`. Likewise, `defaultAckOrNAck(...)` will handle the remaining messages in the batch.
   * `onMessage` signature has changed to `(message, content, ackOrNack)`
   * `onMessageHandled` has been removed
   * `prepareForShutdown` override has been removed

Other notable changes:

 * QueueService
   * Vastly simplified logic, 40% reduction in code thanks to Rascal taking on error handling and recovery
   * `publish` returns a promise, callback is optional.
   * Added `QueueService.generateConfigFromQueueNames(queueNames, config)` helper to generate Rascal vhost config given array of queue names.
 * QueueWorker
   * Vastly simplified logic, 35% reduction in code thanks to Rascal taking on the complexities
 * BatchQueueWorker
   * Simplified logic, 30% reduction in code thanks to Rascal
   * constructor now takes option `skipInit:true` to not start subscribing upon construction
 * Updated example app
 * Rewrote tests from scratch
 * Updated docs
  • Loading branch information
kfitzgerald committed Mar 22, 2019
1 parent 5989940 commit 2b42704
Show file tree
Hide file tree
Showing 18 changed files with 1,532 additions and 3,458 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.json
Expand Up @@ -16,7 +16,9 @@
"describe": true,
"it": true,
"before": true,
"beforeEach": true,
"after": true,
"afterEach": true,
"Promise": true
},
"overrides": [
Expand Down
171 changes: 57 additions & 114 deletions BatchQueueWorker.js
Expand Up @@ -10,181 +10,124 @@ class BatchQueueWorker extends QueueWorker {

constructor(app, options) {

if (!options) {
throw new Error('BatchQueueWorker: options are required');
}

const batchSize = options.batchSize || 5;
const skipInit = options.skipInit || false;

// Set our base queue worker options based on our batch size
options.queueSubscriptionOptions = { ack: true, prefetchCount: batchSize };
// Set the base QueueWorker options based on the given batch size
options.queueSubscriptionOptions = options.queueSubscriptionOptions || {};
options.queueSubscriptionOptions.prefetch = batchSize * 2;
options.queueSubscriptionOptions.retry = options.queueSubscriptionOptions.retry || { delay: 1000 };

// Don't initialize until we're all setup
options.skipInit = true;

// Start the engines
// Initialize underlying QueueWorker
super(app, options);

/**
* Message batch size - How many messages we'll be sent at a time
* @type {number}
*/
this.batchSize = options.batchSize || 5;
this._isProcessing = null; // Not applicable to this worker
this.batchSize = batchSize;

// Get the aggregator setup
this._setupCargo();

// Start accepting messages
this.init();
if (!skipInit) {
this.init();
}
}


/* istanbul ignore next: must be implemented or this does nothing */
//noinspection JSMethodCanBeStatic
/**
* This is the batch handler you need to override!
*
* @param {[*]} messages – Array of message objects, NOT payloads, that would be messages[0].message
* @param {function(requeueMessages:[], rejectMessages:[])} callback - Fire when done processing the batch
* @param {function(err:*=null, recovery:*=null)} defaultAckOrNack - Fire when done processing the batch
*/
handleMessageBatch(messages, callback) {
handleMessageBatch(messages, defaultAckOrNack) {

// YOU MUST IMPLEMENT THIS TO BE USEFUL

// DO NOT MANIPULATE `messages` !! IF YOU DO, YOU'll PROBABLY BREAK IT HARD
// e.g. DO NOT messages.pop/push/slice/splice, etc. THIS IS BAD.
// individually ack or nack a message in the batch
// messages[i].ackOrNack(); // individually ack or nack a message in the batch

// callback(requeueMessages, rejectMessages)
callback(messages, []);
// ack or nack the unhandled messages in the batch
// defaultAckOrNack(); // ack all
// defaultAckOrNack(err); // err all w/ default strategy
// defaultAckOrNack(err, recovery); // err all w/ specific strategy
defaultAckOrNack(true, this.nack.drop); // err all w/ drop strategy
}

/**
* Initialize the cargo data structure
* @private
*/
_setupCargo() {
this._cargo = Async.cargo(this._processCargoBatch.bind(this));
this._cargo = Async.cargo(this._processCargoBatch.bind(this), this.batchSize);
}

/**
* Internal handler for shipping a batch of messages to the application
* @param messages
* @param {[CargoMessage]} messages
* @param callback
* @private
*/
_processCargoBatch(messages, callback) {

// Hold onto the last message in case we get to accept the entire batch
const lastMessage = messages[messages.length-1];

// Pass the batch to the handler
this.handleMessageBatch(messages, (requeue, reject) => {

// Normalize responses if they're empty
requeue = Array.isArray(requeue) ? requeue : [];
reject = Array.isArray(reject) ? reject : [];

// If there's anything to throw away
if (requeue.length + reject.length > 0) {

// Iterate each and accept, or reject/requeue
messages.forEach((message) => {
if (requeue.includes(message)) {
message.messageObject.reject(true);
} else if (reject.includes(message)) {
message.messageObject.reject(false);
} else {
message.messageObject.acknowledge(false);
}
});

} else {
// Ack the entire batch
lastMessage.messageObject.acknowledge(true);
}
this.handleMessageBatch(messages, (err, recovery) => {

// Ack/Nack any unhandled message
messages.forEach((message) => {
if (!message._handled) {
message.ackOrNack(err, recovery);
}
});

callback();
});
}

/**
* Override the default message handler system
*
* @see https://github.com/postwait/node-amqp#queuesubscribeoptions-listener
* Override the default message handler system. Routes messages into async cargo.
*
* @param message - Message body
* @param [headers] - Message headers
* @param [deliveryInfo] - Raw message info
* @param [messageObject] - Message object wrapper (e.g. messageObject.acknowedge(false) )
* @param message - Message object
* @param content – Message body
* @param ackOrNack – Callback to ack or nack the message
*/
onMessage(message, headers, deliveryInfo, messageObject) {
/* istanbul ignore else: I really tried to unit test this but i don't think i can (timing) */
if (!this._isShuttingDown) {

// Queue this into the current batch
this._cargo.push({
message,
headers,
deliveryInfo,
messageObject
});

} else {
// If we're in the process of shutting down, reject+requeue this message so it's handled later
setImmediate(() => messageObject.reject(true));
}
}
onMessage(message, content, ackOrNack) {

/* istanbul ignore next: not implemented in this class */
/**
* Callback provided to the message handler to complete working with the message
* @param reject
* @param requeue
*/
onMessageHandled(reject, requeue) { /* eslint-disable-line no-unused-vars */
throw new Error('This method does not apply to this worker. Do not use it.');
}
/**
* Wrapped Cargo Message
* @typedef {{message: *, content: *, ackOrNack: function(err:*,recovery:*), _handled: boolean}} CargoMessage
*/
const payload = {
message,
content,
// wrapper around given ackOrNack, if message individually handled, flag it
ackOrNack: (...params) => {
payload._handled = true;
ackOrNack.apply(null, params);
},
_handled: false
};

/* istanbul ignore next: not implemented in this class */
/**
* Hook point for handling messages
*
* @see https://github.com/postwait/node-amqp#queuesubscribeoptions-listener
*
* @param message - Message body
* @param {function(reject:boolean, requeue:boolean)} callback - Fire when done processing the message
* @param [headers] - Message headers
* @param [deliveryInfo] - Raw message info
* @param [messageObject] - Message object wrapper (e.g. messageObject.acknowedge(false) )
*/
handleMessage(message, callback, headers, deliveryInfo, messageObject) { /* eslint-disable-line no-unused-vars */
throw new Error('This method does not apply to this worker. Do not use it.');
// Queue this into the current batch
this._cargo.push(payload);
}

//noinspection JSUnusedGlobalSymbols,JSUnusedLocalSymbols
/**
* Starts the internal shutdown process (hook point)
* Do not use this method on this QueueWorker
*/
prepareForShutdown(canAsync) { /* eslint-disable-line no-unused-vars */

this.log(` !! Shutting down the ${this.queueName} queue`);

// Flag that we're shutting down
this._isShuttingDown = true;

// Unsub and shutdown
const done = () => {
this.unsubscribe(() => {
this.shutdown();
});
};

// If the cargo is still working, then drain it and end, otherwise just end
/* istanbul ignore next: it's really hard to test this case of cargo still got junk in it at shutdown */
if (this._cargo.length() > 0) {
this._cargo.drain = () => {
done();
};
} else {
done();
}
handleMessage(message, content, ackOrNack) { /* eslint-disable-line no-unused-vars */
throw new Error('BatchQueueWorker: This method does not apply to this worker. Do not use it.');
}
}

Expand Down

0 comments on commit 2b42704

Please sign in to comment.