Skip to content

Commit

Permalink
rewrite of pub/sub
Browse files Browse the repository at this point in the history
  • Loading branch information
tj committed Jul 18, 2011
1 parent ff9fa44 commit 20a8d0b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 48 deletions.
4 changes: 2 additions & 2 deletions examples/callback.js → examples/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ function create() {
}).on('failed', function(){
console.log("Job failed");
}).save();
setTimeout(create, Math.random() * 300 | 0);
setTimeout(create, Math.random() * 2000 | 0);
}

create();

// process video conversion jobs, 3 at a time.

jobs.process('video conversion', 20, function(job, done){
jobs.process('video conversion', 3, function(job, done){
var frames = job.data.frames;

function next(i) {
Expand Down
39 changes: 0 additions & 39 deletions lib/queue/callback.js

This file was deleted.

86 changes: 86 additions & 0 deletions lib/queue/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@

/*!
* kue - events
* Copyright (c) 2011 LearnBoost <tj@learnboost.com>
* MIT Licensed
*/

/**
* Module dependencies.
*/

var pool = require('./pool');

/**
* Job map.
*/

exports.jobs = {};

/**
* Pub/sub key.
*/

exports.key = 'q:events';

/**
* Add `job` to the jobs map, used
* to grab the in-process object
* so we can emit relative events.
*
* @param {Job} job
* @api private
*/

exports.add = function(job){
if (job.id) exports.jobs[job.id] = job;
if (!exports.subscribed) {
exports.subscribe();
exports.subscribed = true;
}
};

/**
* Subscribe to "q:events".
*
* @api private
*/

exports.subscribe = function(){
var client = pool.pubSubClient();
client.subscribe(exports.key);
client.on('message', this.onMessage);
};

/**
* Message handler.
*
* @api private
*/

exports.onMessage = function(channel, message){
var message = JSON.parse(message)
, job = exports.jobs[message.id];

if (job) {
job.emit(message.status);
delete exports.jobs[job.id];
}
};

/**
* Publish `status` change for job `id`.
*
* @param {Number} id
* @param {String} status
* @api private
*/

exports.changeStatus = function(id, status) {
var client = pool.alloc()
, message = JSON.stringify({
id: id
, status: status
});
client.publish(exports.key, message);
}
8 changes: 4 additions & 4 deletions lib/queue/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var redis = require('redis')
, pool = require('./pool')
, noop = function(){}
, EventEmitter = require('events').EventEmitter
, jobCallback = require('./callback');
, events = require('./events');

/**
* Expose `Job`.
Expand Down Expand Up @@ -470,11 +470,11 @@ Job.prototype.save = function(fn){
self.set('type', self.type);
self.set('created_at', Date.now());
self.update(fn);


// add the job for event mapping
if ((self.listeners('complete').length + self.listeners('failed').length) > 0) {
jobCallback.addJob(self);
events.add(self);
}

});
};

Expand Down
6 changes: 3 additions & 3 deletions lib/queue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
var EventEmitter = require('events').EventEmitter
, redis = require('redis')
, Job = require('./job')
, jobCallback = require('./callback');;
, events = require('./events');

/**
* Expose `Worker`.
Expand Down Expand Up @@ -89,7 +89,7 @@ Worker.prototype.error = function(err, job){

Worker.prototype.failed = function(job, err, fn){
var self = this;
jobCallback.changeStatus(job.id, 'failed');
events.changeStatus(job.id, 'failed');
job.failed().error(err);
self.error(err, job);
job.attempt(function(error, remaining, attempts, max){
Expand Down Expand Up @@ -122,7 +122,7 @@ Worker.prototype.process = function(job, fn){
job.complete();
job.set('duration', job.duration = new Date - start);
self.emit('job complete', job);
jobCallback.changeStatus(job.id, 'complete');
events.changeStatus(job.id, 'complete');
self.start(fn);
});
return this;
Expand Down

0 comments on commit 20a8d0b

Please sign in to comment.