Permalink
Browse files

fix setting global redis/search options, callback based state changes

  • Loading branch information...
1 parent 0ad405a commit cecd631a434f128fca1024afc93fba92cab3943e @behrad behrad committed Jan 27, 2014
Showing with 51 additions and 50 deletions.
  1. +9 −3 lib/kue.js
  2. +12 −12 lib/queue/job.js
  3. +26 −23 lib/queue/worker.js
  4. +2 −10 lib/redis.js
  5. +2 −2 package.json
View
@@ -24,7 +24,7 @@ exports = module.exports = Queue;
* Library version.
*/
-exports.version = '0.7.0';
+exports.version = '0.7.4';
/**
* Expose `Job`.
@@ -83,10 +83,16 @@ exports.workers = [];
function Queue( options ) {
options = options || {};
- this.client = redis.createClient( options.redis );
+ if( options.redis ){
+ redis.createClient = function() {
+ // redis.debug_mode = true;
+ return require('redis').createClient( options.redis );
+ };
+ }
+ this.client = redis.createClient();
this.promoter = null;
this.workers = exports.workers;
- this.disableSearch = options.disableSearch;
+ Job.disableSearch = options.disableSearch;
}
/**
View
@@ -22,6 +22,8 @@ var EventEmitter = require('events').EventEmitter
exports = module.exports = Job;
+exports.disableSearch = false;
+
/**
* Search instance.
*/
@@ -430,12 +432,11 @@ Job.prototype.remove = function (fn) {
client.del('q:job:' + this.id + ':log');
client.del('q:job:' + this.id);
// multi.exec(function (err, replies) {
-// console.log( "removeJob", this.id, err, replies );
// events.remove(this);
fn && fn(err);
- // TODO bad smell: change singelton access!
- if( !require('../kue').singleton.disableSearch ){
+ if( !exports.disableSearch ){
getSearch().remove(this.id, function(){
+ console.log( "remove index...");
}.bind( this ));
}
// }.bind(this));
@@ -487,9 +488,10 @@ Job.prototype.state = function (state, fn) {
// multi.exec(function (err, replies) {
// console.log( "setState(%d) End ", this.id, state, this._state/*, replies*/ );
this.set('updated_at', Date.now());
- this.set('state', state, fn);
- // increase available jobs, used by Worker#getJob()
- if ('inactive' == state) client.lpush('q:' + this.type + ':jobs', 1);
+ this.set('state', state, function(){
+ // increase available jobs, used by Worker#getJob()
+ ('inactive' == state) ? client.lpush('q:' + this.type + ':jobs', 1, fn) : fn();
+ }.bind(this));
// }.bind(this));
}.bind(this));
return this;
@@ -525,9 +527,7 @@ Job.prototype.error = function (err) {
*/
Job.prototype.complete = function (clbk) {
- return this.set('progress', 100).state('complete', function () {
- clbk && clbk();
- }.bind(this));
+ return this.set('progress', 100).state('complete', clbk);
};
/**
@@ -629,11 +629,11 @@ Job.prototype.update = function (fn) {
this.state(this._state, fn);
}.bind(this));
- // TODO bad smell: change singelton access!
-// if( require('../kue').singleton.disableSearch ) {
+ if( !exports.disableSearch ) {
getSearch().index(json, this.id, function(){
+ console.log( "add index...");
}.bind(this));
-// }
+ }
};
/**
View
@@ -114,27 +114,28 @@ Worker.prototype.error = function (err, job, asWorkerError) {
Worker.prototype.failed = function (job, err, fn) {
var self = this;
self.error(err, job, false);
- job.error(err).failed();
- job.attempt(function (error, remaining, attempts, max) {
- if (error) return self.error(error, job);
- /*remaining
- ? job.inactive()
- : job.failed();*/
- if (remaining) {
- self.emit('job failed attempt', job);
- events.emit(job.id, 'failed attempt', attempts);
- if (job.delay()) { //TODO WHEN should we honor the original delay in retries?
- job.inactive(); //job.delayed();
+ job.error(err).failed( function(){
+ job.attempt(function (error, remaining, attempts, max) {
+ if (error) return self.error(error, job);
+ /*remaining
+ ? job.inactive()
+ : job.failed();*/
+ if (remaining) {
+ self.emit('job failed attempt', job);
+ events.emit(job.id, 'failed attempt', attempts);
+ if (job.delay()) { //TODO WHEN should we honor the original delay in retries?
+ job.inactive(); //job.delayed();
+ } else {
+ job.inactive();
+ }
} else {
- job.inactive();
+ self.emit('job failed', job);
+ events.emit(job.id, 'failed');
+ // job.failed();
}
- } else {
- self.emit('job failed', job);
- events.emit(job.id, 'failed');
-// job.failed();
- }
- self.start(fn);
- });
+ self.start(fn);
+ }.bind(this));
+ }.bind(this));
};
/**
@@ -153,7 +154,7 @@ Worker.prototype.process = function (job, fn) {
var self = this
, start = new Date();
this.job = job;
- job.active();
+ job.active( function(){
// this.domain.run(function(){
// process.nextTick( function(){
@@ -163,10 +164,11 @@ Worker.prototype.process = function (job, fn) {
if (err) {
return self.failed(job, err, fn);
}
- job.complete();
job.set('duration', job.duration = new Date - start);
- self.emit('job complete', job);
- events.emit(job.id, 'complete');
+ job.complete( function(){
+ self.emit('job complete', job);
+ events.emit(job.id, 'complete');
+ }.bind(this));
self.job = null;
self.start(fn);
},{
@@ -191,6 +193,7 @@ Worker.prototype.process = function (job, fn) {
);
// }.bind( this ));
// }.bind( this ));
+ }.bind(this));
return this;
};
View
@@ -18,17 +18,9 @@ var redis = require('redis');
* @api private
*/
-exports.createClient = function ( options ) {
+exports.createClient = function () {
var client;
- if( options ) {
- client = redis.createClient(
- options.port || 6379,
- options.host,
- options.options || {}
- );
- } else {
- client = redis.createClient();
- }
+ client = redis.createClient();
return client;
};
View
@@ -15,12 +15,12 @@
"url": "https://github.com/LearnBoost/kue.git"
},
"dependencies": {
- "redis": "0.7.2",
+ "redis": "~0.10.0",
"express": "~3.1.1",
"jade": "0.26.3",
"stylus": "0.27.2",
"nib": "0.5.0",
- "reds": "0.1.4"
+ "reds": "~0.2.4"
},
"main": "index",
"devDependencies": {

0 comments on commit cecd631

Please sign in to comment.