Skip to content

Commit

Permalink
Fixed a problem with the tasks checking with the latest isFinite chec…
Browse files Browse the repository at this point in the history
…k where a string '0' value was matching the isFinite check.
  • Loading branch information
danpres14 committed Dec 27, 2012
1 parent 0442fea commit 02fe2b3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 23 deletions.
6 changes: 3 additions & 3 deletions examples/example.js
Expand Up @@ -88,7 +88,7 @@ msgQueue.createRedisMsgQueue(app, 'abc123', function (error, myQueue) {
queue.register(function (myTask, callback) {
console.log('The abc123 grouped worker - processing task ' + util.inspect(myTask) + '.');
if (myTask.number && myTask.number < 5) {
// Add a new tasks to the ungrouped message queue from within the worker
// Add a new tasks to the abc123 grouped message queue from within the worker
var task = { name:'abc123-worker-task ' + (myTask.number + 1), number: (myTask.number + 1) };
queue.enqueue('xyz789', task, function (error, myTask) {
if (error) {
Expand All @@ -104,7 +104,7 @@ msgQueue.createRedisMsgQueue(app, 'abc123', function (error, myQueue) {
return setTimeout(function (myTask, callback) {
console.log('The abc123 grouped worker - completed processing task ' + util.inspect(myTask) + '.');
if (myTask.number && myTask.number < 5) {
// Add a new tasks to the ungrouped message queue from within the worker
// Add a new tasks to the abc123 grouped message queue from within the worker
var task = { name:'abc123-worker-task ' + (myTask.number + 101), number: (myTask.number + 101) };
queue.enqueue('xyz789', task, function (error, myTask) {
if (error) {
Expand All @@ -115,7 +115,7 @@ msgQueue.createRedisMsgQueue(app, 'abc123', function (error, myQueue) {
return console.log('The abc123 grouped worker added task ' + util.inspect(myTask) + ' to the abc123 grouped message queue.');
});
} else if (myTask.number && myTask.number >= 100 && myTask.number < 105) {
// Add a new tasks to the ungrouped message queue from within the worker
// Add a new tasks to the abc123 grouped message queue from within the worker
var task = { name:'abc123-worker-task ' + (myTask.number + 1001), number: (myTask.number + 1001) };
queue.enqueue('xyz789', task, function (error, myTask) {
if (error) {
Expand Down
40 changes: 22 additions & 18 deletions lib/msgQueue.js
Expand Up @@ -230,6 +230,7 @@ var RedisMsgQueue = function (appName, queueName, redisOptions) {

// Set the poll interval, retry delay, and retry limit for this instance of the queue
this.pollInterval = 15000;
this.lockTimeout = 90000;
this.retryDelay = 1.25;
this.retryLimit = 1;

Expand Down Expand Up @@ -289,6 +290,7 @@ RedisMsgQueue.prototype.enqueue = function (group, task, callback) {
return callback('The redis msg queue ' + queueName +
' was unable to add the task for the message group ' + group + '.');
}

// Make sure each step was successful
if (!_.isArray(replies) || replies.length !== 3) {
console.warn('RedisMsgQueue.enqueue() The redis msg queue ' + msgQueueName +
Expand Down Expand Up @@ -392,6 +394,8 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
' - reply ' + util.inspect(reply) + ', error ' + util.inspect(error) + '.');
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, msgQueue.pollInterval);
}

// Remove the app:queue set if the set is empty
if (!_.isString(reply) || _.isEmpty(reply)) {
return workerRedis.watch('set_' + msgQueue.msgQueueName, function(error, reply) {
if (error || reply !== 'OK') {
Expand Down Expand Up @@ -440,13 +444,14 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
}
// Check for non-execution due to watched key changing
if (_.isNull(replies)) {
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 0);
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 1);
}
if (!_.isArray(replies) || replies.length !== 1) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName + 'was unable to remove the' +
' app:queue set ' + msgQueue.msgQueueName + ' - reply ' + util.inspect(replies) + '.');
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, msgQueue.pollInterval);
}
// Make sure a single set instance was removed from the data store
var reply = replies[0];
if (_.isString(reply) && !_.isEmpty(reply) && _.isFinite(parseInt(reply))) {
reply = parseInt(reply);
Expand Down Expand Up @@ -506,16 +511,15 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
});
}

// Run the process loop again if the number of tasks is zero or if the locked datetime is set
// Run the process loop again if the number of tasks is zero or if the locked datetime is older than the lock timeout
var tasks = null;
if (_.isNull(replies[0])) {
} else if (_.isFinite(replies[0])) {
tasks = replies[0];
} else if (_.isString(replies[0]) && !_.isEmpty(replies[0]) && _.isFinite(parseInt(replies[0]))) {
tasks = parseInt(replies[0]);
// Do nothing - null values are valid
} else if (_.isString(replies[0]) && !_.isEmpty(replies[0]) && _.isFinite(Number(replies[0]))) {
tasks = Number(replies[0]);
} else {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName + ' was unable to read the' +
' number of tasks in the app:queue:group hash ' + randAppQueueGroup + ' - reply ' + util.inspect(replies[0]) +
' number of tasks in the app:queue:group hash ' + randAppQueueGroup + ' - reply[0] ' + util.inspect(replies) +
', error ' + util.inspect(error) + '.');
return workerRedis.unwatch(function (error, reply) {
if (error || reply !== 'OK') {
Expand All @@ -528,13 +532,12 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
}
var locked = null;
if (_.isNull(replies[1])) {
} else if (_.isDate(replies[1])) {
locked = replies[1];
} else if (_.isString(replies[1]) && !_.isEmpty(replies[1]) && _.isDate(Date.parse(replies[1]))) {
locked = Date.parse(replies[1]);
// Do nothing - null values are valid
} else if (_.isString(replies[1]) && !_.isEmpty(replies[1]) && _.isFinite(Date.parse(replies[1]))) {
locked = new Date(Date.parse(replies[1]));
} else {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName + ' was unable to read the' +
' locked datetime in the app:queue:group hash ' + randAppQueueGroup + ' - reply ' + util.inspect(replies[1]) +
' locked datetime in the app:queue:group hash ' + randAppQueueGroup + ' - reply[1] ' + util.inspect(replies) +
', error ' + util.inspect(error) + '.');
return workerRedis.unwatch(function (error, reply) {
if (error || reply !== 'OK') {
Expand All @@ -545,7 +548,8 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, msgQueue.pollInterval);
});
}
if (_.isDate(locked)) {
if (_.isDate(locked) && (msgQueue.lockTimeout === 0 ||
locked.getTime() >= ((new Date()).getTime() - msgQueue.lockTimeout))) {
//console.log('Worker ' + workerName + ' - test point - locked');
return workerRedis.unwatch(function (error, reply) {
if (error || reply !== 'OK') {
Expand All @@ -556,8 +560,8 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 1);
});
}
if (tasks === 0) {
//console.log('Worker ' + workerName + ' - test point - clean up tasks === 0');
if (_.isNull(tasks) || (_.isFinite(tasks) && tasks === 0)) {
console.log('Worker ' + workerName + ' - test point - clean up tasks === 0');
// Cleanup the data store if there are no more tasks in the app:queue:group
var multi = workerRedis.multi();
// Delete the app:queue:group hash
Expand Down Expand Up @@ -652,7 +656,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
if (!_.isObject(replies[0]) && (!_.isString(replies[0]) || _.isEmpty(replies[0]))) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to read a task from the msg group ' + randAppQueueGroup +
' - the list returned an invalid task, reply ' + util.inspect(replies[0]) + '.');
' - the list returned an invalid task, reply[0] ' + util.inspect(replies) + '.');
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 1);
}
var reply = replies[1];
Expand All @@ -662,7 +666,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
if (!_.isFinite(reply) || reply < 0) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to read a task from the msg group ' + randAppQueueGroup +
' - the list returned an invalid number of tasks, reply ' + util.inspect(replies[1]) + '.');
' - the list returned an invalid number of tasks, reply[1] ' + util.inspect(replies) + '.');
}
if (randAppQueueGroup !== _setMsgGroupName(msgQueue.appName, msgQueue.queueName)) {
reply = replies[2];
Expand All @@ -672,7 +676,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
if (!_.isFinite(reply) || (reply !== 0 && reply !== 1)) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to read a task from the msg group ' + randAppQueueGroup +
' - the hash key \'locked\' indicated that it was not set, reply ' + util.inspect(replies[2]) + '.');
' - the hash key \'locked\' indicated that it was not set, reply[2] ' + util.inspect(replies) + '.');
}
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
@@ -1,5 +1,5 @@
{
"version": "0.0.1",
"version": "0.0.2",
"name": "msg-queue",
"description": "A lightweight message queue using either mongodb or redis as the data store.",
"author": "Dan Prescott <danpres14@gmail.com> - Short Line Design Inc.",
Expand All @@ -10,7 +10,7 @@
},
"dependencies": {
"underscore": "1.4.x",
"mongodb": "1.1.x",
"mongodb": "1.2.x",
"redis": "0.8.x"
},
"devDependencies": {
Expand Down

0 comments on commit 02fe2b3

Please sign in to comment.