From ac1e1bd9b1b0fc665bcf5e60d9a049f4ec215b32 Mon Sep 17 00:00:00 2001 From: Geoff Wagstaff Date: Fri, 16 Aug 2013 15:17:45 +0100 Subject: [PATCH] Configurable worker concurrency control --- README.md | 36 +++++++++++--- lib/job.js | 6 +-- lib/queue.js | 135 +++++++++++++++++++++++++++++++++++--------------- lib/worker.js | 31 ++++++------ test/queue.js | 14 +++++- 5 files changed, 153 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index bda40c9..74bdf03 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Convoy is a Node.JS module for working with a Redis-backed job queue. -It is designed to be distributed and atomic, orchestrating the queuing, delegation and processing of jobs with unique IDs. +It is designed to be distributed and atomic, orchestrating the queuing, delegation and processing of jobs with unique IDs. This means that you can have multiple job publishers and multiple consumers for the same queues, even across many servers, and convoy will ensure that unique jobs only get queued once at a time, and delegated to a single worker until queued again. @@ -11,18 +11,32 @@ This means that you can have multiple job publishers and multiple consumers for ### Usage -```` +#### Options + +```javascript +var opts = { + concurrency: 10, // Spawn up to a maximum of 10 concurrent workers + jobTimeout: 2000 // If a worker does not finish within this time (in ms), its job will be considered failed +}; +``` + +```javascript var Convoy = require('redis-convoy'); +// Set up options +var opts = { + concurrency: 10, + jobTimeout: 2000 +}; + // Create a queue object -var q = Convoy.createQueue('monsterTrucks'); +var q = Convoy.createQueue('monsterTrucks', opts); // Set up our job. Each job must have an ID var jobID = 1; var job = new Convoy.Job(jobID); -// Queue the job once only. If another instance of convoy tries to add a job of the same ID at the same time before any workers process it, it won't get duplicated in the queue - +// Queue the job, only if a job with the same ID already exists in the queue q.addJob(job); // Set up a worker @@ -31,14 +45,22 @@ q.process(function(job, done){ done(); // or done('an error') if error during processing of the job }); -```` +// Clear out jammed jobs +q.jamGuard(5, function(err, jammedJobs){ + console.log(jammedJobs); +}); +``` ### Running tests Make sure you have a local redis running on localhost:6379 (or change these settings in config/default.js), then run: make test +### TODO +Potential features to come: + +* Job payload: Store additional data with a job. ProTip: in the meantime try ```javascript var jobID = JSON.stringify(obj);``` #### Inspiration -Convoy was inspired by TJ Holowaychuk's [kue](https://github.com/LearnBoost/kue) module. I was using Kue, but was caught up with some problems when workers did not fully ack the job, causing it to get stuck in the active/inactive lists. Additionally, kue did not seem to offer convenient support for ensuring unique jobs only get queued once, which is the main focus of convoy. \ No newline at end of file +Convoy was inspired by TJ Holowaychuk's [kue](https://github.com/LearnBoost/kue) module. I was using Kue, but was caught up with some problems when workers did not fully ack the job, causing it to get stuck in the active/inactive lists. Additionally, kue did not seem to offer convenient support for ensuring unique jobs only get queued once, which is the main focus of convoy. diff --git a/lib/job.js b/lib/job.js index 37e5ab0..b60db78 100644 --- a/lib/job.js +++ b/lib/job.js @@ -1,9 +1,7 @@ var Job = function(id){ - if(!id){ - throw new Error('Jobs must have an ID'); - } + if(!id) throw new Error('Jobs must have an ID'); this.id = id; }; -module.exports = Job; \ No newline at end of file +module.exports = Job; diff --git a/lib/queue.js b/lib/queue.js index f5cd900..d5e4c58 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -9,7 +9,14 @@ var prefix = config.keys.prefix; var Queue = function(name, opts){ this.name = name; - this.opts = opts; + this.opts = { + concurrentWorkers: 1, + jobTimeout: 5000 + }; + + for(var optionName in opts){ + this.opts[optionName] = opts[optionName]; + } this.redisClients = { queueClient: opts.redis.createClient(), @@ -22,63 +29,112 @@ var Queue = function(name, opts){ this.workerClient = this.redisClients.workerClient; this.processing = true; + this.workersRunning = 0; }; module.exports = Queue; Queue.prototype.addJob = function(job, cb){ var self = this; - if(!cb) - cb = function(){}; + if(!cb) cb = function(){}; // Set job in committed list this.client.sadd(helpers.key(this.name+':committed'), job.id, function(err, added){ - if(err) - return cb(err); + if(err) return cb(err); if(!added){ // Job is already in the system so we won't try to queue it. It could be queued or processing // It's tempting to just zscore the processing list, but that's not atomic and there is a chance - // that the job is with a worker, but hasn't yet been set in the processing set when we try to queue it again. + // that the job is with a worker, but hasn't yet been added to the processing set before we might try queuing it again debug('job already committed'); return cb('committed'); } // Queue job self.client.rpush(helpers.key(self.name+':queued'), job.id, function(err, added){ - if(err) - return cb(err); + if(err) return cb(err); - cb(null, added); + return cb(err, added); }); }); }; -Queue.prototype.startProcessing = function(fn) { - this.processing = true; +Queue.prototype.startProcessing = function(cb) { + var self = this; + self.processing = true; - this.process(fn); + self.processJob(cb); }; -/* - Spawn a worker for each job -*/ - -Queue.prototype.process = function(fn){ +/** + * Checks if we should fetch a job from the queue based on concurrency settings + * Fetches the job, spawns a worker and passes the job to the worker + * @param {Function} usrCb User function to invoke with job + */ +Queue.prototype.processJob = function(usrCb) { var self = this; - // Check if we're still meant to be processing the queue before entering blocked state - if(!self.processing) - return; + if(!self.processing) return; + if(self.workersRunning >= self.opts.concurrency) return; + + self.fetchJob(function(err, job){ + if(err){ + debug('Failed to fetch job'); + setTimeout(function(){ + self.processJob(usrCb); + }, 1000); + } + + self.spawnWorker(job, function(err, worker){ + // Worked failed to start up + if(err){ + debug('Worker failed to start up'); + self.endWorker(worker); + // Try processing another job + return self.processJob(usrCb); + } + + var usrDone = function(err, workerCb){ + worker.processed(err, workerCb); + self.endWorker(worker); + self.processJob(usrCb); + }; + + var jobTimeout; + var jobTTL = self.opts.jobTimeout; + + if(jobTTL > 0) { + // Set up a timeout in case the user function never calls back to us + jobTimeout = setTimeout(function(){ + debug('Warning: Queue processing function did not call done() within %dms. Job is now considered failed.', jobTTL); + return usrDone('timeout'); + }, jobTTL); + } - this.fetchJob(function(err, job){ - var worker = new Worker(self, job); - worker.start(fn); + // Pass job to user function + run + return usrCb(job, function(err, workerCb){ + if(jobTimeout) clearTimeout(jobTimeout); + return usrDone(err, workerCb); + }); + }); + }); +}; - self.process(fn); +Queue.prototype.spawnWorker = function(job, cb) { + var self = this; + var workerTTL = 1000; + self.workersRunning += 1; + var worker = new Worker(self, job); + return worker.start(function(err){ + return cb(err, worker); }); }; +Queue.prototype.endWorker = function() { + var self = this; + self.workersRunning -= 1; +}; + Queue.prototype.stopProcessing = function() { // If blocked, client will still invoke its fetchJob callback with a job // once it is released from blpop, but will not go back into blocking state @@ -89,12 +145,12 @@ Queue.prototype.stopProcessing = function() { Queue.prototype.fetchJob = function(cb){ var self = this; + var key = helpers.key(this.name+':queued'); this.blocked = true; this.queueClient.blpop(key, 0, function(err, entry){ self.blocked = false; - if(err) - return cb(err); + if(err) return cb(err); var jobID = entry[1]; if(jobID === null){ @@ -102,7 +158,7 @@ Queue.prototype.fetchJob = function(cb){ return cb(); } var job = new Job(jobID); - return cb(null, job); + return cb(err, job); }); }; @@ -141,8 +197,7 @@ Queue.prototype.clearJammedJobs = function(idleTime, cb) { to prevent more than one guard from completing the task */ this.setLock('clearJammedJobs', 5, function(err, lockSet, lockKey){ - if(err) - return cb(err); + if(err) return cb(err); if(!lockSet){ debug('another convoy is clearing jammed jobs'); @@ -188,8 +243,7 @@ Queue.prototype.fetchJammedJobs = function(idleTime, cb) { var range = Math.round(now - idleTime); self.client.zrangebyscore(helpers.key(self.name+':processing'), 0, range, function(err, members){ - if(err) - debug(err); + if(err) debug(err); return cb(err, members); }); @@ -219,15 +273,15 @@ Queue.prototype.unlock = function(key, cb) { * When finished closing, the queue object should no longer be used */ Queue.prototype.close = function(cb) { - if(!cb) + if(!cb){ cb = function(){ debug('All redis clients closed'); }; + } var pendingClients = Object.keys(this.redisClients).length; var done = function(){ - if(!--pendingClients) - cb(); + if(!--pendingClients) cb(); }; // When we are blocked, we're waiting for redis to send a response, so must terminate the client @@ -236,15 +290,16 @@ Queue.prototype.close = function(cb) { this.queueClient.end(); done(); } - else + else{ this.queueClient.quit(done); + } } - else + else { done(); + } for(var i in this.redisClients){ - if(i == 'queueClient') - continue; + if(i == 'queueClient') continue; if(!this.redisClients[i]){ done(); @@ -261,8 +316,8 @@ Queue.prototype.close = function(cb) { */ Queue.prototype.end = function() { for(var i in this.redisClients){ - if(!this.redisClients[i]) - continue; + if(!this.redisClients[i]) continue; + var client = this.redisClients[i]; client.quit(); } diff --git a/lib/worker.js b/lib/worker.js index b0e978c..4af7d67 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -4,22 +4,20 @@ var helpers = require('./helpers'); var Worker = function(queue, job){ this.queue = queue; this.client = queue.workerClient; - if(job) - this.job = job; + if(job) this.job = job; }; module.exports = Worker; -Worker.prototype.start = function(fn) { +Worker.prototype.start = function(cb) { var self = this; self.processing(function(err){ if(err){ - return self.processed(err); + return self.fail(err, cb); } - // Invoke user's job-processing function - return fn(self.job, self.processed.bind(self)); + return cb(err); }); }; @@ -42,17 +40,17 @@ Worker.prototype.notProcessing = function(cb) { Invoked at end of user processing */ Worker.prototype.processed = function(err, cb) { - if(err) - return this.fail(err, cb); - // If job completes without error, remove it from processing - this.notProcessing(function(err){ + if(!cb) cb = function(){}; + + if(err) return this.fail(err, cb); + + return this.notProcessing(function(err){ if(err){ return debug(err); } - if(cb) - cb(); + return cb(); // TODO: perhaps have an option to maintain a list of completed jobs }); @@ -60,13 +58,12 @@ Worker.prototype.processed = function(err, cb) { Worker.prototype.fail = function(jobError, cb) { var self = this; - if(!cb) - cb = function(){}; + if(!cb) cb = function(){}; + // Job has failed, remove it from processing and add it to failed this.notProcessing(function(err){ self.client.zincrby(helpers.key(self.queue.name+':failed'), 1, self.job.id, function(err, res){ - if(err) - return debug(err); + if(err) return debug(err); self.logError(jobError, cb); }); @@ -81,4 +78,4 @@ Worker.prototype.logError = function(message, cb) { .lpush(key, message) .expire(key, 86400) .exec(cb); -}; \ No newline at end of file +}; diff --git a/test/queue.js b/test/queue.js index 83a0a7d..240830f 100644 --- a/test/queue.js +++ b/test/queue.js @@ -84,6 +84,17 @@ describe('Enqueing jobs', function(done){ done(); }); }); + + it('only queues the same job once', function(done){ + q.addJob(job, function(err){ + err.should.equal('committed'); + client.llen(helpers.key(q.name+':queued'), function(err, listLength){ + should.not.exist(err); + (+listLength).should.equal(1); + done(); + }); + }); + }); }); describe('Processing jobs', function(){ @@ -92,7 +103,8 @@ describe('Processing jobs', function(){ q = Convoy.createQueue('the22ndLetter'); var returned = false; var cb = function(j, p){ - job = j, processed = p; + job = j; + processed = p; done(); };