Permalink
Browse files

Added the destroy queue function so that the queue can be gracefully …

…shutdown when the application calls the destroy function - presumably after handling a shutdown signal.
  • Loading branch information...
1 parent 02fe2b3 commit a027ed35c488d3fe25c8d91bf77cc5b19f5427b7 @danpres14 danpres14 committed Dec 28, 2012
Showing with 74 additions and 13 deletions.
  1. +23 −0 examples/example.js
  2. +49 −12 lib/msgQueue.js
  3. +2 −1 package.json
View
@@ -8,6 +8,7 @@
*/
var _ = require('underscore'),
+ async = require('async'),
util = require('util'),
msgQueue = require('../index');
@@ -202,3 +203,25 @@ queues.forEach(function (queue) {
}
});
});
+
+// ----------------------------------------------------------------------------------------------------
+// Shutdown some of the message queues
+// ----------------------------------------------------------------------------------------------------
+process.on('SIGINT', function () {
+ // Shutdown the message queues
+ async.parallel([
+ function (callback) {
+ return msgQueue.destroyRedisMsgQueue(app, 'unqueue', callback);
+ },
+ function (callback) {
+ return msgQueue.destroyRedisMsgQueue(app, 'abc123', callback);
+ }
+ ], function (error) {
+ // Exit the process
+ process.exit();
+ });
+});
+
+process.on('exit', function () {
+ console.log('The example is now exiting.');
+});
View
@@ -18,6 +18,7 @@
var _ = require('underscore')
, mongodb = require('mongodb')
, redis = require('redis')
+ , async = require('async')
, util = require('util')
, worker = require('./worker')
@@ -55,6 +56,7 @@ var _addMsgWorker = function (queueName, msgWorker, callback) {
var workerName = '' + (++_msgWorkerCount) + '-' + queueName;
// Add the worker (with wrapper) to the map
_msgWorkerMap[workerName] = {
+ msgQueueName: queueName,
workerName: workerName,
msgWorker: msgWorker,
stop: false
@@ -222,14 +224,57 @@ var createRedisMsgQueue = function (appName, queueName, redisOptions, callback)
module.exports.createRedisMsgQueue = createRedisMsgQueue;
module.exports.getRedisMsgQueue = createRedisMsgQueue;
+var destroyRedisMsgQueue = function (appName, queueName, callback) {
+ var msgQueueName = _setMsgQueueName(appName, queueName);
+ if (_.isFunction(callback)) {
+ // Find an existing msg queue (if one exists)
+ return _getMsgQueue(msgQueueName, function (error, msgQueue) {
+ if (!error) {
+ // Shutdown the existing msg queue
+ return async.forEach(_.toArray(_msgWorkerMap), function (_msgWorker, callback) {
+ if (_msgWorker && _msgWorker.msgQueueName === queueName) {
+ _setMsgWorkerStop(_msgWorker.workerName);
+ return async.until(function() {
+ return (_getMsgWorker(_msgWorker.workerName) === undefined);
+ }, function (callback) {
+ return setTimeout(callback, 250);
+ }, function (error) {
+ return callback();
+ });
+ }
+ return callback();
+ }, function () {
+ return callback();
+ });
+ }
+ // The msg queue does not exist
+ return callback();
+ });
+ }
+ // Find an existing msg queue (if one exists)
+ var msgQueue = _getMsgQueue(msgQueueName);
+ if (msgQueue) {
+ // Shutdown the existing msg queue
+ _msgWorkerMap.forEach(function (msgWorker) {
+ if (msgWorker && msgWorker.msgQueueName === msgQueueName) {
+ _setMsgWorkerStop(msgWorker.workerName);
+ }
+ });
+ return undefined;
+ }
+ // The msg queue does not exist
+ return undefined;
+};
+module.exports.destroyRedisMsgQueue = destroyRedisMsgQueue;
+
var RedisMsgQueue = function (appName, queueName, redisOptions) {
// Set the app and queue names for this instance of the queue
this.appName = appName;
this.queueName = queueName;
this.msgQueueName = _setMsgQueueName(appName, queueName);
// Set the poll interval, retry delay, and retry limit for this instance of the queue
- this.pollInterval = 15000;
+ this.pollInterval = 1000;
this.lockTimeout = 90000;
this.retryDelay = 1.25;
this.retryLimit = 1;
@@ -341,15 +386,6 @@ RedisMsgQueue.prototype.register = function (worker, callback) {
// return execution along with the callback!
callback();
- // Setup a function to stop each worker
- // when the process is about to exit
- var msgQueue = this;
- process.on('exit', function (error, callback) {
- // Tell the worker to exit
-// TODO - Fix This! I need to add something to gracefully shutdown after all workers really close
- return _setMsgWorkerStop(workerName);
- });
-
// Setup a new redis client and prepare the connection - an individual client
// is required for each worker so that the watch mechanism will function properly
var workerRedis = redis.createClient(this.port, this.host, this.options);
@@ -379,10 +415,11 @@ RedisMsgQueue.prototype._timeoutProcess = function (workerName, workerRedis, msg
RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker, msgQueue) {
// Exit the processing loop if the worker has been told to stop
if (_getMsgWorkerStop(workerName)) {
- if (!msgQueue._removeMsgWorker(workerName)) {
+ if (!_removeMsgWorker(workerName)) {
console.warn('RedisMsgQueue._process() The redis msg worker ' + msgWorkerName +
' was unable to remove itself from the worker map.');
}
+ //console.log('The worker %s is done.', workerName);
return 'This worker is done.';
}
@@ -561,7 +598,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
});
}
if (_.isNull(tasks) || (_.isFinite(tasks) && tasks === 0)) {
- console.log('Worker ' + workerName + ' - test point - clean up tasks === 0');
+ //console.log('Worker ' + workerName + ' - test point - clean up tasks === 0');
// Cleanup the data store if there are no more tasks in the app:queue:group
var multi = workerRedis.multi();
// Delete the app:queue:group hash
View
@@ -1,5 +1,5 @@
{
- "version": "0.0.2",
+ "version": "0.0.3",
"name": "msg-queue",
"description": "A lightweight message queue using either mongodb or redis as the data store.",
"author": "Dan Prescott <danpres14@gmail.com> - Short Line Design Inc.",
@@ -10,6 +10,7 @@
},
"dependencies": {
"underscore": "1.4.x",
+ "async": "0.1.x",
"mongodb": "1.2.x",
"redis": "0.8.x"
},

0 comments on commit a027ed3

Please sign in to comment.