Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deletes old jobs, addresses search term race conditions, and adds "indexing" #218

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
node_modules
*.sock
testing
lib/http/public/stylesheets/main.css
*.rdb
test/incomplete
18 changes: 18 additions & 0 deletions History.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@

2013-08-10
==================
* Allows isOutstanding check for jobs
* Exposes ability to get job by id
* Stores outstanding jobs by 'key'
* Removes default text indexing
* Added "outstanding" jobs search
* Added multi-state search

2013-07-13
==================
* Added expiration options

2013-07-12
==================
* Added indexing options


0.6.2 / 2013-04-03
==================

Expand Down
179 changes: 172 additions & 7 deletions lib/kue.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ var EventEmitter = require('events').EventEmitter
, Worker = require('./queue/worker')
, events = require('./queue/events')
, Job = require('./queue/job')
, redis = require('./redis');
, redis = require('./redis')
, reds = require('reds')
, async = require('async');


/**
* Expose `Queue`.
Expand Down Expand Up @@ -55,6 +58,19 @@ Object.defineProperty(exports, 'app', {

exports.redis = redis;


/**
* Search instance.
*/

var search;
function getSearch() {
if (search) return search;
reds.createClient = require('./redis').createClient;
return search = reds.createSearch('q:search');
};


/**
* Create a new `Queue`.
*
Expand Down Expand Up @@ -92,8 +108,8 @@ Queue.prototype.__proto__ = EventEmitter.prototype;
*/

Queue.prototype.create =
Queue.prototype.createJob = function(type, data){
return new Job(type, data);
Queue.prototype.createJob = function(type, data, options){
return new Job(type, data, options);
};

/**
Expand Down Expand Up @@ -196,6 +212,39 @@ Queue.prototype.process = function(type, n, fn){
}
};


/**
* Initializes garbage collection for jobs of `type` after `n` milliseconds.
* Currently, there is a 1000 millisecond minimum (this should not be primary form of removal)
*
* @param {String} type
* @param {Number} n
* @api public
*/

Queue.prototype.setExpiration = function(type, n){
var self = this,
client = this.client,
n = Math.max(n, 1000);


setInterval(function() {

var cutoff = Math.floor((new Date().getTime() - n) / 1000);

self.client.zrangebyscore('q:expiring:' + type, 0, cutoff, function(err, results) {

if (!err && results.length)
for (var i = 0; i < results.length; ++i)
Job.remove(results[i]);

});
}, n);

return this;
};


/**
* Get the job types present and callback `fn(err, types)`.
*
Expand All @@ -209,6 +258,84 @@ Queue.prototype.types = function(fn){
return this;
};


/**
* Enables job retrieval by id
*
* @param {Number} id
* @param {Function} fn
* @api public
*/
Queue.prototype.get =
Queue.prototype.getById = function(id, fn){
Job.get(id, fn);
};


/**
* Determines whether a job is outstanding. If input is number,
* taken to be a Job ID. If a string, taken to be a type + key. If an object
* the key should be either id or key
*
* @param {Number} args or {Object}
* @param {Function} fn
* @api public
*/

Queue.prototype.isOutstanding = function(args, fn){
var isKey = false,
val = null;

if ("object" == typeof args) {
if ("undefined" != typeof args.id) {
val = args.id;
} else if ("undefined" != typeof args.key && "undefined" != typeof args.type) {
isKey = true;
} else {
return fn(new Error("No key or id specified for outstanding check."))
}
} else if ("number" == typeof args) {
val = args;
} else {
return fn(new Error("No key or id specified for outstanding check."))
}

if (isKey)
this.client.sismember("q:outstanding:" + args.type, args.key, function(err, isMember) {
if (err)
return fn(err, null);
else
return fn(null, 1 == isMember);
});
else {
this.get(val, function(err, job) {
if (err)
return fn(err, false);
else {
var state = job.state();
return fn(err, ('active' == state || 'inactive' == state));
}
});
}
};


/**
* Find jobs by a series of identifiers and callback `fn(err)`.
*
* @param {String} lookup
* @param {Function} fn
* @api public
*/

Queue.prototype.find = function(query, fn){
getSearch().query(query).end(function(err, ids){
fn(err, ids);
}, 'and');
};



/**
* Return job ids with the given `state`, and callback `fn(err, ids)`.
*
Expand All @@ -218,11 +345,23 @@ Queue.prototype.types = function(fn){
* @api public
*/

Queue.prototype.state = function(state, fn){
this.client.zrange('q:jobs:' + state, 0, -1, fn);
Queue.prototype.state =
Queue.prototype.states = function(states, fn){
if ('string' == typeof states)
this.client.zrange('q:jobs:' + states, 0, -1, fn);
else {
var self = this;
function getByState(state, cb) { self.client.zrange('q:jobs:' + state, 0, -1, cb); }
async.map(states, getByState, function(err, ids) {
self = null;
fn(err, Array.prototype.concat.apply([], ids));
});
}
return this;
};



/**
* Get queue work time in milliseconds and invoke `fn(err, ms)`.
*
Expand All @@ -248,8 +387,18 @@ Queue.prototype.workTime = function(fn){
* @api public
*/

Queue.prototype.card = function(state, fn){
this.client.zcard('q:jobs:' + state, fn);
Queue.prototype.card =
Queue.prototype.cards = function(states, fn){
if ('string' == typeof states)
this.client.zcard('q:jobs:' + states, fn);
else {
var self = this;
function countByState(state, cb) { self.client.zcard('q:jobs:' + state, cb); }
async.map(states, countByState, function(err, counts) {
self = null;
fn(err, counts.reduce(function(memo, num) { return memo + num; }, 0));
});
}
return this;
};

Expand Down Expand Up @@ -285,6 +434,14 @@ Queue.prototype.active = function(fn){
return this.state('active', fn);
};

/**
* Oustanding jobs (inactive or active).
*/

Queue.prototype.outstanding = function(fn){
return this.state(['active','inactive'], fn);
};

/**
* Completed jobs count.
*/
Expand Down Expand Up @@ -317,6 +474,14 @@ Queue.prototype.activeCount = function(fn){
return this.card('active', fn);
};

/**
* Outstanding jobs (active or inactive).
*/

Queue.prototype.outstandingCount = function(fn){
return this.card(['active','inactive'], fn);
};

/**
* Delayed jobs.
*/
Expand Down
30 changes: 19 additions & 11 deletions lib/queue/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,21 @@ exports.onMessage = function(channel, msg){
var msg = JSON.parse(msg);

// map to Job when in-process
var job = exports.jobs[msg.id];
if (job) {
job.emit.apply(job, msg.args);
// TODO: abstract this out
if ('progress' != msg.event) delete exports.jobs[job.id];
if ('indexed' == msg.events) {
this.emit(msg.id, 'indexed');
} else {
var job = exports.jobs[msg.id];
if (job) {
job.emit.apply(job, msg.args);
// TODO: abstract this out
if ('progress' != msg.event) delete exports.jobs[job.id];
}

// emit args on Queues
msg.args[0] = 'job ' + msg.args[0];
msg.args.push(msg.id);
exports.queue.emit.apply(exports.queue, msg.args);
}

// emit args on Queues
msg.args[0] = 'job ' + msg.args[0];
msg.args.push(msg.id);
exports.queue.emit.apply(exports.queue, msg.args);
};

/**
Expand All @@ -92,5 +96,9 @@ exports.emit = function(id, event) {
, event: event
, args: [].slice.call(arguments, 1)
});
client.publish(exports.key, msg);

if ('indexed' == event)
client.publish('q:indexing', msg);
else
client.publish(exports.key, msg);
};