Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added additional examples and fixed a couple small issues in the queue.

  • Loading branch information...
commit f8dd28821f02a68b32337c00d29a696e3cbf163a 1 parent 1e45205
@danpres14 danpres14 authored
Showing with 107 additions and 5 deletions.
  1. +105 −3 examples/example.js
  2. +2 −2 lib/redisMsgQueue.js
View
108 examples/example.js
@@ -13,11 +13,13 @@ var _ = require('underscore'),
redisMsgQueue = require('../index');
// Setup the example app, queues, and groups
-var app = 'example', unqueue = null
+var app = 'example', unqueue = null, queue = null
, queues = [ { name: 'first' }, { name: 'second' } ]
, groups = [ { name: 'alpha' }, { name: 'bravo' }, { name: 'charlie' } ]
+// ----------------------------------------------------------------------------------------------------
// Setup the ungrouped message queue
+// ----------------------------------------------------------------------------------------------------
console.log('Setup the ungrouped message queue.');
redisMsgQueue.createRedisMsgQueue(app, 'unqueue', function (error, myQueue) {
if (error) {
@@ -27,13 +29,24 @@ redisMsgQueue.createRedisMsgQueue(app, 'unqueue', function (error, myQueue) {
unqueue = { msgQueue: myQueue };
// Register the message queue workers
- for (var i = 0; i < 4; i++) {
+ for (var i = 0; i < 2; i++) {
unqueue.msgQueue.register(function (myTask, callback) {
console.log('One of five ungrouped workers - processing task ' + util.inspect(myTask) + '.');
// Assume that some work is being done at this time
return setTimeout(function (myTask, callback) {
console.log('One of five ungrouped workers - processing task ' + util.inspect(myTask) + '.');
+ if (myTask.number) {
+ // Add a new tasks to the ungrouped message queue from within the worker
+ var task = { name:'un-worker-task ' + myTask.number };
+ unqueue.msgQueue.enqueue(undefined, task, function (error, myTask) {
+ if (error) {
+ console.warn('The example was unable to add the worker task ' + util.inspect(task) +
+ ' to the ungrouped message queue - ' + util.inspect(myTask) +
+ ', ' + util.inspect(error) + '.');
+ }
+ });
+ }
return callback();
}, 2500, myTask, callback);
@@ -60,19 +73,108 @@ redisMsgQueue.createRedisMsgQueue(app, 'unqueue', function (error, myQueue) {
}
});
+// ----------------------------------------------------------------------------------------------------
+// Setup the abc123 grouped message queue
+// ----------------------------------------------------------------------------------------------------
+console.log('Setup the abc123 grouped message queue.');
+redisMsgQueue.createRedisMsgQueue(app, 'abc123', function (error, myQueue) {
+ if (error) {
+ console.warn('The example code was unable to create the abc123 grouped message queue - ' + util.inspect(error) + '.');
+ }
+ // Assign the returned queue to the example grouped message queue
+ queue = myQueue;
+
+ // Register the abc123 message queue worker
+ queue.register(function (myTask, callback) {
+ console.log('The abc123 grouped worker - processing task ' + util.inspect(myTask) + '.');
+ if (myTask.number && myTask.number < 5) {
+ // Add a new tasks to the ungrouped message queue from within the worker
+ var task = { name:'abc123-worker-task ' + (myTask.number + 1), number: (myTask.number + 1) };
+ queue.enqueue('xyz789', task, function (error, myTask) {
+ if (error) {
+ return console.warn('The example was unable to add the worker task ' + util.inspect(task) +
+ ' to the abc123 grouped message queue - ' + util.inspect(myTask) +
+ ', ' + util.inspect(error) + '.');
+ }
+ return console.log('The abc123 grouped worker added task ' + util.inspect(myTask) + ' to the abc123 grouped message queue.');
+ });
+ }
+
+ // Assume that some work is being done at this time
+ return setTimeout(function (myTask, callback) {
+ console.log('The abc123 grouped worker - completed processing task ' + util.inspect(myTask) + '.');
+ if (myTask.number && myTask.number < 5) {
+ // Add a new tasks to the ungrouped message queue from within the worker
+ var task = { name:'abc123-worker-task ' + (myTask.number + 101), number: (myTask.number + 101) };
+ queue.enqueue('xyz789', task, function (error, myTask) {
+ if (error) {
+ return console.warn('The example was unable to add the worker task ' + util.inspect(task) +
+ ' to the abc123 grouped message queue - ' + util.inspect(myTask) +
+ ', ' + util.inspect(error) + '.');
+ }
+ return console.log('The abc123 grouped worker added task ' + util.inspect(myTask) + ' to the abc123 grouped message queue.');
+ });
+ } else if (myTask.number && myTask.number >= 100 && myTask.number < 105) {
+ // Add a new tasks to the ungrouped message queue from within the worker
+ var task = { name:'abc123-worker-task ' + (myTask.number + 1001), number: (myTask.number + 1001) };
+ queue.enqueue('xyz789', task, function (error, myTask) {
+ if (error) {
+ return console.warn('The example was unable to add the worker task ' + util.inspect(task) +
+ ' to the abc123 grouped message queue - ' + util.inspect(myTask) +
+ ', ' + util.inspect(error) + '.');
+ }
+ return console.log('The abc123 grouped worker added task ' + util.inspect(myTask) + ' to the abc123 grouped message queue.');
+ });
+ }
+ return callback();
+ }, 250, myTask, callback);
+ }, function (error, worker) {
+ if (error) {
+ return console.warn('The example code was unable to register the abc123 grouped message queue worker ' +
+ ' - ' + util.inspect(error) + '.');
+ }
+ return console.log('Registered abc123 grouped message queue worker.');
+ });
+
+ // Add a single task to the example abc123 grouped message queue
+ console.log('Adding a single task to the abc123 grouped message queue.');
+ var task = { name:'abc123-task 1', number: 1 };
+ return myQueue.enqueue('xyz789', task, function (error, myTask) {
+ if (error) {
+ return console.warn('The example was unable to add the task ' + util.inspect(task) +
+ ' to the abc123 grouped message queue - ' + util.inspect(myTask) +
+ ', ' + util.inspect(error) + '.');
+ }
+ return console.log('Added task ' + util.inspect(myTask) + ' to the abc123 grouped message queue.');
+ });
+});
+
+// ----------------------------------------------------------------------------------------------------
// Setup the grouped message queues
+// ----------------------------------------------------------------------------------------------------
console.log('Setup each grouped message queue.');
queues.forEach(function (queue) {
queue.msgQueue = redisMsgQueue.createRedisMsgQueue(app, queue.name);
// Register the message queue workers
- for (var i = 0; i < 3; i++) {
+ for (var i = 0; i < 2; i++) {
queue.msgQueue.register(function (myTask, callback) {
console.log('One of five grouped workers for ' + queue.name + ' - processing task ' + util.inspect(myTask) + '.');
// Assume that some work is being done at this time
return setTimeout(function (myTask, callback) {
console.log('One of five grouped workers for ' + queue.name + ' - processing task ' + util.inspect(myTask) + '.');
+ if (myTask.number) {
+ // Add a new tasks to the ungrouped message queue from within the worker
+ var task = { name:'worker-task ' + myTask.number };
+ queue.msgQueue.enqueue(undefined, task, function (error, myTask) {
+ if (error) {
+ console.warn('The example was unable to add the worker task ' + util.inspect(task) +
+ ' to the ungrouped message queue - ' + util.inspect(myTask) +
+ ', ' + util.inspect(error) + '.');
+ }
+ });
+ }
return callback();
}, 500, myTask, callback);
View
4 lib/redisMsgQueue.js
@@ -318,7 +318,7 @@ RedisMsgQueue.prototype.enqueue = function (group, task, callback) {
' was unable to add the task for the message group ' + group + '.');
}
// Return success via callback
- return callback();
+ return callback(null, task);
});
};
@@ -533,7 +533,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
locked = Date.parse(replies[1]);
} else {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName + ' was unable to read the' +
- ' locked datetime in the app:queue:group hash ' + randAppQueueGroup + ' - reply ' + util.inspect(reply) +
+ ' locked datetime in the app:queue:group hash ' + randAppQueueGroup + ' - reply ' + util.inspect(replies[1]) +
', error ' + util.inspect(error) + '.');
return workerRedis.unwatch(function (error, reply) {
if (error || reply !== 'OK') {
Please sign in to comment.
Something went wrong with that request. Please try again.