Skip to content

Commit

Permalink
isolate domain-based workers into new 'domain-based-workers' branch
Browse files Browse the repository at this point in the history
  • Loading branch information
behrad committed May 24, 2014
1 parent 345af7b commit 657fd45
Showing 1 changed file with 33 additions and 44 deletions.
77 changes: 33 additions & 44 deletions lib/queue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ function Worker(queue, type) {
this.client = Worker.client || (Worker.client = redis.createClient());
this.running = true;
this.job = null;
/*this.domain = domain.create( this.type );
this.domain.on('error', function(err){
console.log( "Worker Uncaught Error: ", err.toString(), err.stack );
process.nextTick( function() {
// done && done(err);
});
});*/
}

/**
Expand Down Expand Up @@ -152,45 +145,41 @@ Worker.prototype.process = function (job, fn) {
var self = this
, start = new Date();
job.active( function(){
// this.domain.run(function(){
// process.nextTick( function(){
fn(
job,
function (err) {
if (err) {
return self.failed(job, err, fn);
}
job.set('duration', job.duration = new Date - start);
job.complete( function(){
job.attempt( function(){
self.emit('job complete', job);
events.emit(job.id, 'complete');
});
}.bind(this));
self.job = null;
self.start(fn);
},{
/**
* @author behrad
* @pause: let the processor to tell worker not to continue processing new jobs
*/
pause: function( fn, timeout ){
timeout = timeout || 5000;
self.queue.shutdown( fn, Number(timeout), self.type);
},
/**
* @author behrad
* @pause: let the processor to trigger restart for they job processing
*/
resume: function () {
if (self.resume()) {
self.start(fn);
}
fn(
job,
function (err) {
if (err) {
return self.failed(job, err, fn);
}
job.set('duration', job.duration = new Date - start);
job.complete( function(){
job.attempt( function(){
self.emit('job complete', job);
events.emit(job.id, 'complete');
});
}.bind(this));
self.job = null;
self.start(fn);
},{
/**
* @author behrad
* @pause: let the processor to tell worker not to continue processing new jobs
*/
pause: function( fn, timeout ){
timeout = timeout || 5000;
self.queue.shutdown( fn, Number(timeout), self.type);
},
/**
* @author behrad
* @pause: let the processor to trigger restart for they job processing
*/
resume: function () {
if (self.resume()) {
self.start(fn);
}
}
);
// }.bind( this ));
// }.bind( this ));
}
);
}.bind(this));
return this;
};
Expand Down

0 comments on commit 657fd45

Please sign in to comment.