Permalink
Browse files

startProcessing() and stopProcessing() controls + tests

  • Loading branch information...
1 parent 48c9450 commit e612f4131a53786c89e3b090eca57a49c4ecdfb2 @TheDeveloper TheDeveloper committed Oct 14, 2012
Showing with 79 additions and 18 deletions.
  1. +40 −8 lib/queue.js
  2. +39 −10 test/queue.js
View
@@ -9,14 +9,19 @@ var prefix = config.keys.prefix;
var Queue = function(name, opts){
this.name = name;
+ this.opts = opts;
this.redisClients = {
- queueClient: false,
- workerClient: false
+ queueClient: opts.redis.createClient(),
+ workerClient: opts.redis.createClient()
};
- this.client = this.redisClients.queueClient;
+ // The queue client is used for blocking redis commands
+ this.queueClient = this.redisClients.queueClient;
+ this.client = this.redisClients.workerClient;
this.workerClient = this.redisClients.workerClient;
+
+ this.processing = true;
};
module.exports = Queue;
@@ -49,25 +54,44 @@ Queue.prototype.addJob = function(job, cb){
});
};
+Queue.prototype.startProcessing = function(fn) {
+ this.processing = true;
+
+ this.process(fn);
+};
+
/*
Spawn a worker for each job
*/
Queue.prototype.process = function(fn){
var self = this;
+ // Check if we're still meant to be processing the queue before entering blocked state
+ if(!self.processing)
+ return;
+
this.fetchJob(function(err, job){
var worker = new Worker(self, job);
worker.start(fn);
+
self.process(fn);
});
};
+Queue.prototype.stopProcessing = function() {
+ // If blocked, client will still invoke its fetchJob callback with a job
+ // once it is released from blpop, but will not go back into blocking state
+ // again until startProcessing is called.
+
+ this.processing = false;
+};
+
Queue.prototype.fetchJob = function(cb){
var self = this;
var key = helpers.key(this.name+':queued');
this.blocked = true;
- this.client.blpop(key, 0, function(err, entry){
+ this.queueClient.blpop(key, 0, function(err, entry){
self.blocked = false;
if(err)
return cb(err);
@@ -190,6 +214,7 @@ Queue.prototype.unlock = function(name, cb) {
/*
* Try to cleanly close clients after all commands are sent
+ * When finished closing, the queue object should no longer be used
*/
Queue.prototype.close = function(cb) {
if(!cb)
@@ -204,18 +229,25 @@ Queue.prototype.close = function(cb) {
};
// When we are blocked, we're waiting for redis to send a response, so must terminate the client
- if(this.client){
+ if(this.queueClient){
if(this.blocked){
- this.client.end();
+ this.queueClient.end();
done();
}
else
- this.client.quit(done);
+ this.queueClient.quit(done);
}
+ else
+ done();
for(var i in this.redisClients){
- if(i == 'queueClient' || !this.redisClients[i])
+ if(i == 'queueClient')
+ continue;
+
+ if(!this.redisClients[i]){
+ done();
continue;
+ }
var client = this.redisClients[i];
client.quit(done);
View
@@ -13,12 +13,8 @@ before(function(done){
});
describe('Setting up a queue', function(){
- it('Sets up queue object', function(done){
- var q = Convoy.createQueue('rollin');
- done();
- });
- it('Can override redis client', function(done){
+ it('can override redis client', function(done){
Convoy.redis.createClient = function(){
var client = redis.createClient();
client.select(config.redis.database);
@@ -30,12 +26,44 @@ describe('Setting up a queue', function(){
q.client.testProperty.should.equal('cheese');
done();
});
+
+ it('can close the queue gracefully when not processing', function(done){
+ var q = Convoy.createQueue('postOffice');
+ q.close(done);
+ });
+
+ it('can stop processing the queue', function(done){
+ var q = Convoy.createQueue('duckies');
+ var received = 0;
+ q.startProcessing(function(){
+ if(++received > 1)
+ throw new Error('We should only have received one job');
+
+ done();
+ });
+
+ // Since queue is in blocked state, it will only stop processing once
+ // it has received its next job. If you want to stop it receiving any
+ // further jobs, call q.close();
+ q.stopProcessing();
+
+ // Stops processing after the first job is queued
+ q.addJob(new Convoy.Job(1));
+ q.addJob(new Convoy.Job("Job IDs can be strings too"));
+ });
+
+ it('but does not lose the unprocessed job', function(done){
+ var q = Convoy.createQueue('duckies');
+ q.startProcessing(function(job, complete){
+ complete(null, done);
+ });
+ });
});
describe('Enqueing jobs', function(done){
var q, job;
before(function(done){
- q = Convoy.createQueue('q');
+ q = Convoy.createQueue('jamesBond');
job = new Convoy.Job(1);
q.addJob(job, done);
});
@@ -61,14 +89,15 @@ describe('Enqueing jobs', function(done){
describe('Processing jobs', function(){
var q, job, processed;
before(function(done){
- q = Convoy.createQueue('q');
+ q = Convoy.createQueue('the22ndLetter');
var returned = false;
var cb = function(j, p){
job = j, processed = p;
done();
- q.close();
};
- q.process(cb);
+
+ q.startProcessing(cb);
+ q.addJob(new Convoy.Job(1));
});
it('invokes callback with job', function(done){
@@ -142,7 +171,7 @@ describe('When a job gets jammed', function(){
worker = new Convoy.Worker(q, job);
worker.processing(done);
});
- }
+ };
// Simulate a b0rked worker
before(function(done){

0 comments on commit e612f41

Please sign in to comment.