Permalink
Browse files

Removed the extra set of the key_app:queue:group as this should never…

… have been needed and did not turn out to affect the behavior.
  • Loading branch information...
1 parent 139b8dd commit 602ff3faf67a54e7f148be625c9cb10f9d1c3e36 @danpres14 danpres14 committed Nov 5, 2012
Showing with 9 additions and 40 deletions.
  1. +9 −40 lib/redisMsgQueue.js
View
@@ -277,8 +277,6 @@ RedisMsgQueue.prototype.enqueue = function (group, task, callback) {
multi.lpush('list_' + msgGroupName, taskJsonString);
// Increment the number of tasks on the app:queue:group hash
multi.hincrby('hash_' + msgGroupName, 'tasks', 1);
- // Increment the number of tasks on the app:queue:group key
- multi.incrby('key_' + msgGroupName, 1);
// Add the app:queue:group to the app:queue set - this will only
// add the app:queue:group if it does not already exist in the set
multi.sadd('set_' + msgQueueName, msgGroupName);
@@ -291,7 +289,7 @@ RedisMsgQueue.prototype.enqueue = function (group, task, callback) {
' was unable to add the task for the message group ' + group + '.');
}
// Make sure each step was successful
- if (!_.isArray(replies) || replies.length !== 4) {
+ if (!_.isArray(replies) || replies.length !== 3) {
console.warn('RedisMsgQueue.enqueue() The redis msg queue ' + msgQueueName +
' was unable to add the task for the message group ' + msgGroupName +
' - the status of one or more steps was not reported, replies ' + util.inspect(replies) + '.');
@@ -312,17 +310,10 @@ RedisMsgQueue.prototype.enqueue = function (group, task, callback) {
return callback('The redis msg queue ' + queueName +
' was unable to add the task for the message group ' + group + '.');
}
- if (!_.isFinite(replies[2]) || replies[2] <= 0) {
+ if (!_.isFinite(replies[2]) || (replies[2] !== 0 && replies[2] !== 1)) {
console.warn('RedisMsgQueue.enqueue() The redis msg queue ' + msgQueueName +
' was unable to add the task for the message group ' + msgGroupName +
- ' - the key reported a zero or negative value (number of tasks) after adding this task to the number, reply ' + util.inspect(replies[2]) + '.');
- return callback('The redis msg queue ' + queueName +
- ' was unable to add the task for the message group ' + group + '.');
- }
- if (!_.isFinite(replies[3]) || (replies[3] !== 0 && replies[3] !== 1)) {
- console.warn('RedisMsgQueue.enqueue() The redis msg queue ' + msgQueueName +
- ' was unable to add the task for the message group ' + msgGroupName +
- ' - the set reported adding more or less than zero or one message group to the message groups, reply ' + util.inspect(replies[3]) + '.');
+ ' - the set reported adding more or less than zero or one message group to the message groups, reply ' + util.inspect(replies[2]) + '.');
return callback('The redis msg queue ' + queueName +
' was unable to add the task for the message group ' + group + '.');
}
@@ -568,8 +559,6 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
var multi = workerRedis.multi();
// Delete the app:queue:group hash
multi.del('hash_' + randAppQueueGroup);
- // Delete the app:queue:group key
- multi.del('key_' + randAppQueueGroup);
// Delete the app:queue:group list
multi.del('list_' + randAppQueueGroup);
// Remove the app:queue:group from the set
@@ -586,7 +575,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, 1);
}
// Make sure each step was successful
- if (!_.isArray(replies) || replies.length !== 4) {
+ if (!_.isArray(replies) || replies.length !== 3) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to cleanup the data store for the msg group ' + randAppQueueGroup +
' - the status of one or more steps was not reported, replies ' + util.inspect(replies) + '.');
@@ -608,7 +597,7 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
if (!_.isFinite(reply) || (reply !== 0 && reply !== 1)) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to cleanup the data store for the msg group ' + randAppQueueGroup +
- ' - the delete app:queue:group hash indicated more or less than one delete, reply ' + util.inspect(replies[1]) + '.');
+ ' - the delete app:queue:group list indicated more or less than one delete, reply ' + util.inspect(replies[1]) + '.');
}
reply = replies[2];
if (_.isString(reply) && !_.isEmpty(reply) && _.isFinite(parseInt(reply))) {
@@ -617,30 +606,19 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
if (!_.isFinite(reply) || (reply !== 0 && reply !== 1)) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to cleanup the data store for the msg group ' + randAppQueueGroup +
- ' - the delete app:queue:group list indicated more or less than one delete, reply ' + util.inspect(replies[2]) + '.');
- }
- reply = replies[3];
- if (_.isString(reply) && !_.isEmpty(reply) && _.isFinite(parseInt(reply))) {
- reply = parseInt(reply);
- }
- if (!_.isFinite(reply) || (reply !== 0 && reply !== 1)) {
- console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
- ' was unable to cleanup the data store for the msg group ' + randAppQueueGroup +
- ' - the remove app:queue:group from the set indicated more or less than one remove, reply ' + util.inspect(replies[3]) + '.');
+ ' - the remove app:queue:group from the set indicated more or less than one remove, reply ' + util.inspect(replies[2]) + '.');
}
return msgQueue._timeoutProcess(workerName, workerRedis, msgWorker, msgQueue, msgQueue.pollInterval);
});
}
// Read and pop the task (in json string format) from the right end of the queue
- var stepCount = 3; // First two steps - rpop and hincrby
+ var stepCount = 2; // First two steps - rpop and hincrby
var multi = workerRedis.multi();
// Read and pop the task from the right end of the app:queue:group list
multi.rpop('list_' + randAppQueueGroup);
// Decrement the number of tasks on the app:queue:group hash
multi.hincrby('hash_' + randAppQueueGroup, 'tasks', -1);
- // Decrement the number of tasks on the app:queue:group key
- multi.incrby('key_' + randAppQueueGroup, -1);
// Set the locked datetime only if the group was specified with the task
if (randAppQueueGroup !== _setMsgGroupName(msgQueue.appName, msgQueue.queueName))
{
@@ -681,24 +659,15 @@ RedisMsgQueue.prototype._process = function (workerName, workerRedis, msgWorker,
' was unable to read a task from the msg group ' + randAppQueueGroup +
' - the list returned an invalid number of tasks, reply ' + util.inspect(replies[1]) + '.');
}
- reply = replies[2];
- if (_.isString(reply) && !_.isEmpty(reply) && _.isFinite(parseInt(reply))) {
- reply = parseInt(reply);
- }
- if (!_.isFinite(reply) || reply < 0) {
- console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
- ' was unable to read a task from the msg group ' + randAppQueueGroup +
- ' - the list returned an invalid number of tasks, reply ' + util.inspect(replies[2]) + '.');
- }
if (randAppQueueGroup !== _setMsgGroupName(msgQueue.appName, msgQueue.queueName)) {
- reply = replies[3];
+ reply = replies[2];
if (_.isString(reply) && !_.isEmpty(reply) && _.isFinite(parseInt(reply))) {
reply = parseInt(reply);
}
if (!_.isFinite(reply) || (reply !== 0 && reply !== 1)) {
console.warn('RedisMsgQueue._process() The redis msg queue ' + msgQueue.msgQueueName +
' was unable to read a task from the msg group ' + randAppQueueGroup +
- ' - the hash key \'locked\' indicated that it was not set, reply ' + util.inspect(replies[3]) + '.');
+ ' - the hash key \'locked\' indicated that it was not set, reply ' + util.inspect(replies[2]) + '.');
}
}

0 comments on commit 602ff3f

Please sign in to comment.