Skip to content

Commit

Permalink
[api test] Begin refactor for single instance durability
Browse files Browse the repository at this point in the history
  • Loading branch information
indexzero committed Mar 26, 2011
1 parent aebe232 commit 93c144a
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 23 deletions.
1 change: 1 addition & 0 deletions lib/neuron.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ neuron.version = [0, 3, 0];
neuron.JobManager = require('neuron/job-manager').JobManager;
neuron.Job = require('neuron/job').Job;
neuron.Worker = require('neuron/worker').Worker;
neuron.JobCache = require('neuron/job-cache').JobCache;
neuron.stringify = require('neuron/job-serializer').stringify;
neuron.parse = require('neuron/job-serializer').parse;
neuron.storeWork = true;
Expand Down
189 changes: 189 additions & 0 deletions lib/neuron/job-cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* job-cache.js: Persists a specified job to Redis for single instance durability.
*
* (C) 2010 Charlie Robbins
* MIT LICENSE
*
*/

var util = require('util'),
events = require('events'),
async = require('async'),
redis = require('redis'),
neuron = require('neuron');

var JobCache = exports.JobCache = function (options) {
options = typeof options === 'object' ? options : {};

this.namespace = options.namespace || 'neuron';
this.host = options.host || 'localhost';
this.port = options.port || 6379;

this.connect();
};

JobCache.prototype.connect = function (host, port) {
host = host || this.host;
port = port || this.port;

this.host = host;
this.port = port;

this.redis = redis.createClient(port, host);
};

JobCache.prototype.close = function () {
this.redis.quit();
};

JobCache.prototype.load = function (callback) {
var self = this, jobs = {}, workers = {};

function getJob (name, next) {
self.redis.get(self.key('job', name), function (err, job) {
if (err) {
return next(err);
}

jobs[name] = neuron.parse(job);
self.redis.smembers(self.key('workers', name), function (err, ids) {
function getWorker (name) {
return function (id, next) {
self.redis.get(self.key('workers', name, id), function (err, worker) {
if (err) {
return next(err);
}

if (!workers[name]) {
workers[name] = [];
}

var result = neuron.parse(worker);
result.id = id;
workers[name].push(result);
next();
});
};
}

async.forEachSeries(ids, getWorker(name), function (err) {
return err ? next(err) : next();
});
});
})
}

this.redis.smembers(this.key('jobs'), function (err, names) {
async.forEach(names, getJob, function (err) {
if (callback) {
return err ? callback(err) : callback(null, jobs, workers);
}
});
});
};

JobCache.prototype.addJob = function (name, props, callback) {
var self = this;
this.redis.sadd(this.key('jobs'), name, function (err) {
if (err) {
return callback(err);
}

self.redis.set(self.key('job', name), neuron.stringify(props), function (err) {
if (callback) {
callback(err);
}
});
});
};

JobCache.prototype.getJob = function (name, callback) {
this.redis.get(this.key('job', name), function (err, job) {
if (callback) {
var result = job ? neuron.parse(job) : job;
return err ? callback(err) : callback(null, result);
}
});
};

JobCache.prototype.removeJob = function (name, callback) {
var self = this;
this.redis.srem(this.key('jobs'), name, function (err) {
if (err) {
return callback(err);
}

self.redis.del(self.key('job', name), function (err) {
self.removeWorkers(name, function (err) {
if (callback) {
callback(err);
}
});
});
});
};

JobCache.prototype.addWorker = function (name, workerId, args, callback) {
var self = this;
this.redis.sadd(this.key('workers', name), workerId, function (err) {
if (err) {
return callback(err);
}

self.redis.set(self.key('workers', name, workerId), neuron.stringify(args), function (err) {
if (callback) {
callback(err);
}
});
});
};

JobCache.prototype.getWorker = function (name, workerId, callback) {
this.redis.get(this.key('workers', name, workerId), function (err, worker) {
if (callback) {
var result = worker ? neuron.parse(worker) : worker;
return err ? callback(err) : callback(null, result);
}
});
};

JobCache.prototype.removeWorker = function (name, workerId, callback) {
var self = this;
this.redis.srem(this.key('workers', name), workerId, function (err) {
if (err) {
return callback(err);
}

self.redis.del(self.key('workers', name, workerId), function (err) {
if (callback) {
callback(err);
}
});
});
};

JobCache.prototype.removeWorkers = function (name, callback) {
var self = this;
this.redis.smembers(this.key('workers', name), function (err, ids) {
if (err) {
return callback(err);
}

function remove (id, next) {
self.removeWorker(name, id, next);
}

async.forEach(ids, remove, function (err) {
if (callback) {
return err ? callback(err) : callback(null);
}
});
});
};

JobCache.prototype.key = function () {
var args = Array.prototype.slice.call(arguments);

args.unshift(this.namespace);
return args.join(':');
};
87 changes: 81 additions & 6 deletions lib/neuron/job-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ var JobManager = exports.JobManager = function (options) {

var self = this;
this.concurrency = options.concurrency || 50;
this.emitErrs = options.emitErrs || false;
this.jobs = {};

if (options.cache) {
this.cache = new neuron.JobCache(options.cache);
}

if (options.jobs) {
Object.keys(options.jobs).forEach(function (name) {
self.addJob(name, options.jobs[name]);
Expand All @@ -39,13 +44,22 @@ util.inherits(JobManager, events.EventEmitter);
// #### @props {Object} Properties to use for this job.
// Sets the job for this instance to manage.
//
JobManager.prototype.addJob = function (name, props) {
JobManager.prototype.addJob = function (name, props, cached) {
if (this.jobs[name]) throw new Error('Job with name `' + name + '` already exists.');
else if (!props) throw new Error('Cannot addJob with no attributes.');

var self = this;
props.concurrency = props.concurrency || this.concurrency;
this.jobs[name] = new neuron.Job(name, props);

// If a cache is used by this instance; add this job to it.
if (this.cache && !cached) {
this.cache.addJob(name, props, function (err) {
if (err && self.emitErrs) {
self.emit('error', err);
}
});
}

this.jobs[name].on('start', function (worker) {
self.emit('start', self.jobs[name], worker);
Expand All @@ -54,6 +68,15 @@ JobManager.prototype.addJob = function (name, props) {
// Re-emit the finish event for each Job managed by this instance.
this.jobs[name].on('finish', function (worker) {
self.emit('finish', self.jobs[name], worker);

// If a cache is used by this instance, remove this worker from it
if (self.cache) {
self.cache.removeWorker(name, worker.id, function (err) {
if (err && self.emitErrs) {
self.emit('error', err);
}
});
}
});

// Re-emit the empty event for each Job managed by this instance.
Expand All @@ -73,7 +96,26 @@ JobManager.prototype.enqueue = function (name) {
if (Object.keys(this.jobs).length === 0) throw new Error('Cannot call start() with no job to perform.');
else if (!this.jobs[name]) throw new Error('Cannot find job with name `' + name + '`.');

return this.jobs[name].enqueue(Array.prototype.slice.call(arguments, 1));
var self = this,
args = [this.jobs[name].getId()].concat(Array.prototype.slice.call(arguments, 1)),
worker = this.jobs[name].enqueue(args);

// If a cache is used by this instance, add this worker to it
if (this.cache) {
this.cache.addWorker(name, worker.id, args, function (err) {
if (err && self.emitErrs) {
self.emit('error', err);
}
});
}

return worker.id;
};

JobManager.prototype.removeJob = function (name) {
//
// TODO: Remove the job from here and the cache
//
};

//
Expand All @@ -83,11 +125,22 @@ JobManager.prototype.enqueue = function (name) {
// Attempts to remove the worker with the specified `workerId` from the job
// managed by this instance with the specified `name`.
//
JobManager.prototype.remove = function (name, workerId) {
if (Object.keys(this.jobs).length === 0) throw new Error('Cannot call remove() with no job to perform.');
JobManager.prototype.removeWorker = function (name, workerId) {
if (Object.keys(this.jobs).length === 0) throw new Error('Cannot call removeWorker() with no jobs to perform.');
else if (!this.jobs[name]) throw new Error('Cannot find job with name `' + name + '`.');

return this.jobs[name].remove(workerId);
var result = this.jobs[name].remove(workerId);

// If a cache is used by this instance, remove this worker from it
if (this.cache) {
this.cache.removeWorker(name, worker.id, function (err) {
if (err && self.emitErrs) {
self.emit('error', err);
}
});
}

return result;
};

//
Expand All @@ -99,6 +152,28 @@ JobManager.prototype.remove = function (name, workerId) {
//
JobManager.prototype.getWorker = function (name, workerId) {
if (!this.jobs[name]) throw new Error ('Cannot get worker for unknown job `' + name + '`');

return this.jobs[name].getWorker(workerId);
};

JobManager.prototype.load = function () {
var self = this;

this.cache.connect();
this.cache.load(function (err, jobs, workers) {
if (err && self.emitErrs) {
self.emit('error', err);
}

Object.keys(jobs).forEach(function (name) {
self.addJob(name, jobs[name], true);
});

Object.keys(workers).forEach(function (name) {
workers[name].forEach(function (worker) {
self.jobs[name].enqueue([worker.id].concat(worker));
});
});

self.emit('load');
});
};
29 changes: 17 additions & 12 deletions lib/neuron/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,12 @@ util.inherits(Job, events.EventEmitter);
// If the number of keys in `this.running` exceeds `this.concurrency` the job is appended
// to the `waiting` set and added to the `queue` managed by this instance.
//
Job.prototype.enqueue = function () {
//
// Create a unique id for this worker.
//
var self = this, workerId = neuron.randomString(32), worker;
while (this.running[workerId] || this.waiting[workerId]) {
workerId = neuron.randomString(32);
Job.prototype.enqueue = function (id) {
if (this.running[id] || this.waiting[id]) {
throw new Error('Worker with id `' + id + '` already exists');
}
var worker = new neuron.Worker(workerId, this, Array.prototype.slice.call(arguments));

var self = this, worker = new neuron.Worker(id, this, Array.prototype.slice.call(arguments, 1));

worker.on('start', function () {
self.emit('start', self, worker);
Expand All @@ -64,17 +60,17 @@ Job.prototype.enqueue = function () {
});

if (Object.keys(this.running).length >= this.concurrency) {
this.waiting[workerId] = worker;
this.waiting[id] = worker;
this.queue.push(workerId);
}
else {
this.running[workerId] = worker;
this.running[id] = worker;
process.nextTick(function () {
worker.run();
});
}

return workerId;
return worker;
};

Job.prototype.remove = function (workerId) {
Expand Down Expand Up @@ -168,4 +164,13 @@ Job.prototype._replenish = function () {
}

return started;
};

Job.prototype.getId = function () {
var workerId = neuron.randomString(32);
while (this.running[workerId] || this.waiting[workerId]) {
workerId = neuron.randomString(32);
}

return workerId;
};
Loading

0 comments on commit 93c144a

Please sign in to comment.