Skip to content

Commit

Permalink
Fixed a couple typos and commented additional logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
danpres14 committed Nov 5, 2012
1 parent 602ff3f commit feed495
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
40 changes: 25 additions & 15 deletions examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ redisMsgQueue.createRedisMsgQueue(app, 'unqueue', function (error, myQueue) {
console.warn('The example code was unable to create the ungrouped message queue - ' + util.inspect(error) + '.');
}
// Assign the returned queue to the example ungrouped message queue
unqueue = myQueue;
unqueue = { msgQueue: myQueue };

// Register the message queue workers
for (var i = 0; i < 2; i++) {
unqueue.register(function (myTask, callback) {
console.log('Worker ' + i + ' - processing task ' + util.inspect(myTask) + '.');
callback();
for (var i = 0; i < 4; i++) {
unqueue.msgQueue.register(function (myTask, callback) {
console.log('One of five ungrouped workers - processing task ' + util.inspect(myTask) + '.');

// Assume that some work is being done at this time
return setTimeout(function (myTask, callback) {
console.log('One of five ungrouped workers - processing task ' + util.inspect(myTask) + '.');
return callback();
}, 2500, myTask, callback);

}, function (error, worker) {
if (error) {
console.warn('The example code was unable to register the ungrouped message queue worker ' + i +
Expand All @@ -42,9 +48,9 @@ redisMsgQueue.createRedisMsgQueue(app, 'unqueue', function (error, myQueue) {

// Add a few tasks to the example ungrouped message queue
console.log('Adding a few tasks to the ungrouped message queue.');
for (var i = 0; i < 1; i++) {
var task = { name:'un-task ' + i, number: i };
unqueue.enqueue(undefined, task, function (error, myTask) {
for (var j = 0; j < 500; j++) {
var task = { name:'un-task ' + j, number: j };
unqueue.msgQueue.enqueue(undefined, task, function (error, myTask) {
if (error) {
console.warn('The example was unable to add the task ' + util.inspect(task) +
' to the ungrouped message queue - ' + util.inspect(myTask) +
Expand All @@ -55,16 +61,21 @@ redisMsgQueue.createRedisMsgQueue(app, 'unqueue', function (error, myQueue) {
});

// Setup the grouped message queues
/*
console.log('Setup each grouped message queue.');
queues.forEach(function (queue) {
queue.msgQueue = redisMsgQueue.createRedisMsgQueue(app, queue.name);

// Register the message queue workers
for (var i = 0; i < 1; i++) {
for (var i = 0; i < 3; i++) {
queue.msgQueue.register(function (myTask, callback) {
console.log('Worker ' + i + ' - processing task ' + util.inspect(myTask) + '.');
callback();
console.log('One of five ungrouped workers for ' + queue.name + ' - processing task ' + util.inspect(myTask) + '.');

// Assume that some work is being done at this time
return setTimeout(function (myTask, callback) {
console.log('One of five ungrouped workers for ' + queue.name + ' - processing task ' + util.inspect(myTask) + '.');
return callback();
}, 500, myTask, callback);

}, function (error, worker) {
if (error) {
console.warn('The example code was unable to register the grouped message queue worker ' + i +
Expand All @@ -77,8 +88,8 @@ queues.forEach(function (queue) {
// Add a few tasks to the each grouped message queue
console.log('Adding a few tasks to the grouped message queue.');
groups.forEach(function (group) {
for (var i = 0; i < 5; i++) {
var task = { name:'task ' + i, number: i };
for (var j = 0; j < 250; j++) {
var task = { name:'task ' + j, number: j, queue: queue.name, group: group };
queue.msgQueue.enqueue(group, task, function (error, myTask) {
if (error) {
console.warn('The example was unable to add the task ' + util.inspect(task) +
Expand All @@ -89,4 +100,3 @@ queues.forEach(function (queue) {
}
});
});
*/
7 changes: 6 additions & 1 deletion lib/redisMsgQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
});
}
if (_.isDate(locked)) {
//console.log('Worker ' + workerName + ' - test point - locked');
return workerRedis.unwatch(function (error, reply) {
if (error || reply !== 'OK') {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
Expand All @@ -555,6 +556,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
});
}
if (tasks === 0) {
//console.log('Worker ' + workerName + ' - test point - clean up tasks === 0');
// Cleanup the data store if there are no more tasks in the app:queue:group
var multi = workerRedis.multi();
// Delete the app:queue:group hash
Expand Down Expand Up @@ -612,6 +614,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
});
}

//console.log('Worker ' + workerName + ' - test point - just before read and pop task');
// Read and pop the task (in json string format) from the right end of the queue
var stepCount = 2; // First two steps - rpop and hincrby
var multi = workerRedis.multi();
Expand All @@ -635,6 +638,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,

// Check for non-execution due to watched key changing
if (_.isNull(replies)) {
//console.log('Worker ' + workerName + ' - test point - multi returned null multi-bulk due to watch');
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 1);
}
// Make sure each step was successful
Expand All @@ -648,7 +652,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to read a task from the msg group ' + randAppQueueGroup +
' - the list returned an invalid task, reply ' + util.inspect(replies[0]) + '.');
return msgQueue._timeoutProcess(workerName, workerRedis, workerRedis, msgWorker, msgQueue, 1);
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 1);
}
var reply = replies[1];
if (_.isString(reply) && !_.isEmpty(reply) && _.isFinite(parseInt(reply))) {
Expand All @@ -671,6 +675,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
}
}

//console.log('Worker ' + workerName + ' - test point - just before execute worker');
// Execute the worker with the task
var task = _.isObject(replies[0]) ? replies[0] : JSON.parse(replies[0]);
return msgWorker(task, function(error, myTask) {
Expand Down

0 comments on commit feed495

Please sign in to comment.