Permalink
Browse files

[api test] Update jobber to be more than just a manager, but a queue …

…with basic concurrency management
  • Loading branch information...
1 parent ba72c61 commit 5427cfc662e59c63bbfb4ac9436cbd57a59749be @indexzero committed Feb 11, 2011
View
58 README.md
@@ -1,6 +1,6 @@
# Jobber
-The simplest possible event driven job manager in node.js
+The simplest possible event driven job manager / FIFO queue in node.js
## Installation
@@ -15,40 +15,58 @@ The simplest possible event driven job manager in node.js
</pre>
## Usage
-Jobber is not a "job queue" (not yet anyway). It is simply a way to manage jobs as they are created and completed. Heuristics for parallelization, ordering, and pooling are left to the programmer. These features may be added in the future, but for now they are not included.
+Jobber is not a simple job queue with support for a dynamic level of concurrency. It a way to manage jobs as they are created and completed in an async, event-driven manner. Heuristics for parallelization, ordering, and pooling are simple right now and jobs are processed in a FIFO order.
+
+More features may be added in the future, so keep me posted on how you use it.
### Creating Jobs
-Creating jobs in jobber is easy. Jobber doesn't assume anything about the internal structure of the properties for each of your jobs. Here's a quick sample of creating a job:
+Creating jobs in jobber is easy. Jobber doesn't assume anything about the internal structure of the properties for each of your jobs except that they have a function called `work()`. Each JobManager is designed to process one instance of a Job, creating many workers which may be added by calling the `start()` method.
+
+Here's a quick sample of creating a manager and adding a job.
<pre>
- var jobber = require('jobber'),
+ var util = require('util'),
+ jobber = require('jobber'),
manager = new jobber.JobManager();
- var job = manager.addJob({ results: [] });
-</pre>
+ //
+ // Create the manager and set the job.
+ //
+ var manager = new jobber.JobManager({ concurrency: 100 });
+ manager.setJob(new jobber.Job('listDir', {
+ dirname: __dirname,
+ work: function (dirname) {
+ var self = this;
+ exec('ls -la ' + dirname || this.dirname, function (error, stdout, stderr) {
+ if (error) self.error = error;
+ else self.stdout = stdout;
-### Working with Jobs
-Once we have created a job, we can flexibly set properties on it and pass it around to any data transform or operation.
-<pre>
- var remoteResults = someRemote.operation();
- remoteResults.forEach(function (result) {
- //
- // Perform some async operation on the result
- //
- job.results.push(result);
- });
+ //
+ // Finish the job, this will notify the manager.
+ //
+ self.finished = true;
+ });
+ }
+ }));
</pre>
### Completing Jobs
-A job raises the finished event once the finished property is set to true:
+An instance of `jobber.JobManager` raises the `finish` event every time a worker has set `finished = true`:
<pre>
- jobManager.on('finished', function (job) {
+ //
+ // Start a worker and listen for finish
+ //
+ manager.on('finish', function (worker) {
//
- // Do something with the now finished job.
+ // Log the result from the worker (the directory listing for '/')
//
+ console.dir(worker.stdout);
});
- job.finished = true;
+ //
+ // All arguments passed to the start() function are consumed by the worker
+ //
+ manager.start('/');
</pre>
#### Author: [Charlie Robbins](http://www.charlierobbins.com)
View
91 lib/jobber.js
@@ -6,92 +6,11 @@
*
*/
-var sys = require('sys'),
- events = require('events');
+require.paths.unshift(__dirname);
-//
-// function randomString (bits)
-// randomString returns a pseude-random ASCII string which contains at least the specified number of bits of entropy
-// the return value is a string of length ⌈bits/6⌉ of characters from the base64 alphabet
-//
-function randomString (bits) {
- var chars, rand, i, ret;
- chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';
- ret = '';
-
- //
- // in v8, Math.random() yields 32 pseudo-random bits (in spidermonkey it gives 53)
- //
- while (bits > 0) {
- rand = Math.floor(Math.random()*0x100000000) // 32-bit integer
- // base 64 means 6 bits per character, so we use the top 30 bits from rand to give 30/6=5 characters.
- for (i=26; i>0 && bits>0; i-=6, bits-=6) {
- ret+=chars[0x3F & rand >>> i];
- }
- }
- return ret;
-};
+var jobber = exports;
-var Job = exports.Job = function (jobId, props) {
- this.id = jobId;
- this._finished = false;
-
- if (props) {
- var self = this;
- Object.keys(props).forEach(function (property) {
- self[property] = props[property];
- });
- }
-};
+jobber.JobManager = require('jobber/job-manager').JobManager;
+jobber.Job = require('jobber/job').Job;
+jobber.Worker = require('jobber/worker').Worker;
-sys.inherits(Job, events.EventEmitter);
-
-Job.prototype.__defineGetter__('finished', function (value) {
- return this._finished;
-});
-
-Job.prototype.__defineSetter__('finished', function (value) {
- this._finished = value;
- if (value) {
- this.emit('finish');
- }
-});
-
-var JobManager = exports.JobManager = function (options) {
- options = options || {};
- this.jobs = options.jobs || {};
-};
-
-sys.inherits(JobManager, events.EventEmitter);
-
-//
-// funtion JobManager.prototype.addJob ()
-// Adds a job to jobber
-//
-JobManager.prototype.addJob = function (props) {
- var self = this, jobId = randomString(32);
- while (Object.keys(this.jobs).indexOf(jobId) !== -1) {
- jobId = randomString(32);
- }
-
- var job = new Job(jobId, props);
- job.once('finish', function () {
- self.emit('finish', job);
- });
-
- this.jobs[jobId] = job;
-
- return job;
-};
-
-//
-// function JobManager.prototype.getJob (jobId)
-// Gets a job with the specified id
-//
-JobManager.prototype.getJob = function (jobId) {
- if (this.jobs[jobId]) {
- return this.jobs[jobId];
- };
-
- return null;
-};
View
180 lib/jobber/job-manager.js
@@ -0,0 +1,180 @@
+/*
+ * JobManager.js: Creates and manages jobs, workers and job results.
+ *
+ * (C) 2010 Charlie Robbins
+ * MIT LICENSE
+ *
+ */
+
+
+var util = require('util'),
+ events = require('events'),
+ jobber = require('jobber');
+
+//
+// function randomString (bits)
+// randomString returns a pseude-random ASCII string which contains at least the specified number of bits of entropy
+// the return value is a string of length ⌈bits/6⌉ of characters from the base64 alphabet
+//
+function randomString (bits) {
+ var chars, rand, i, ret;
+ chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_';
+ ret = '';
+
+ //
+ // in v8, Math.random() yields 32 pseudo-random bits (in spidermonkey it gives 53)
+ //
+ while (bits > 0) {
+ rand = Math.floor(Math.random()*0x100000000) // 32-bit integer
+ // base 64 means 6 bits per character, so we use the top 30 bits from rand to give 30/6=5 characters.
+ for (i=26; i>0 && bits>0; i-=6, bits-=6) {
+ ret+=chars[0x3F & rand >>> i];
+ }
+ }
+ return ret;
+};
+
+var JobManager = exports.JobManager = function (options) {
+ options = options || {};
+
+ if (options.job && !(options.job instanceof jobber.Job)) {
+ throw new Error('job must be an instance of jobber.Job');
+ }
+
+ this.concurrency = options.concurrency || 50;
+ this.job = options.job || {};
+ this.running = {};
+ this.waiting = {};
+ this.queue = [];
+};
+
+util.inherits(JobManager, events.EventEmitter);
+
+//
+// funtion JobManager.prototype.setJob ()
+// Sets the job for this instance to manage.
+//
+JobManager.prototype.setJob = function (job) {
+ if (this.queue.length > 0) throw new Error('Cannot setJob() with unfinished jobs in queue.');
+ else if (!(job instanceof jobber.Job)) throw new Error('job must be an instance of jobber.Job');
+
+ this.job = job;
+};
+
+//
+// ### function start (/* variable arguments */)
+// @arguments {variable} The arguments to pass to the running job.
+// Creates a new instance of the Job managed by this instance
+// by creating a new worker to run which takes `@arguments`. If the number of keys in
+// `this.running` exceeds `this.concurrency` the job is appended
+// to the `waiting` set and added to the `queue` managed by this instance.
+//
+JobManager.prototype.start = function () {
+ if (!(this.job instanceof jobber.Job)) throw new Error('Cannot runNext() with no job to perform.');
+
+ //
+ // Create a unique id for this worker.
+ //
+ workerId = randomString(32);
+ while (this.running[workerId] || this.waiting[workerId]) {
+ workerId = randomString(32);
+ }
+
+ var self = this,
+ worker = new jobber.Worker(workerId, this.job, Array.prototype.slice.call(arguments));
+
+ worker.once('finish', function () {
+ self._workComplete(worker);
+ });
+
+ if (Object.keys(this.running).length >= this.concurrency) {
+ this.waiting[workerId] = worker;
+ this.queue.push(workerId);
+ }
+ else {
+ this.running[workerId] = worker;
+ process.nextTick(function () {
+ worker.run();
+ });
+ }
+
+ return workerId;
+};
+
+//
+// function JobManager.prototype.getWorker (workerId)
+// Gets a worker with the specified id
+//
+JobManager.prototype.getWorker = function (workerId) {
+ if (this.running[workerId]) {
+ return this.running[workerId];
+ }
+ else if (this.waiting[workerId]) {
+ return this.waiting[workerId];
+ }
+
+ return null;
+};
+
+//
+// ### function _workComplete (worker)
+// Updates bookkeeping associated with this instance
+// knowing that the given worker is now complete.
+//
+JobManager.prototype._workComplete = function (worker) {
+ var self = this, nextWorker, nextId;
+
+ // Wait a moment before indicating to the user that we are done
+ process.nextTick(function () {
+ self.emit('finish', worker);
+
+ // If the queue is now empty, notify the user
+ if (self.queue.length === 0) {
+ self.emit('empty');
+ }
+ });
+
+ delete this.running[worker.id];
+ this._replenish();
+};
+
+//
+// ### function _replenish ()
+// Replenishes the running worker by dequeuing waiting workers from `this.queue`.
+//
+JobManager.prototype._replenish = function () {
+ var self = this, running = Object.keys(this.running).length,
+ workerId, started = [];
+
+ if (this.queue.length === 0) return false;
+ else if (running > this.concurrency) return false;
+
+ while (running < this.concurrency && (workerId = this.queue.shift())) {
+ //
+ // Close over the workerId and the worker annoymously so we can
+ // user `process.nextTick()` effectively without leakage.
+ //
+ (function (id, w) {
+ started.push(id);
+ //
+ // Move the worker from the set of waiting workers to the set
+ // of running workers
+ //
+ delete self.waiting[id];
+ self.running[id] = w;
+
+ //
+ // Increment the length of the running workers manually
+ // so we don't have to call `Object.keys(this.running)` again
+ //
+ running += 1;
+
+ // Start the worker on the next tick.
+ process.nextTick(function () {
+ w.run();
+ });
+ })(workerId, this.waiting[workerId]);
+ }
+
+ return started;
+};
View
18 lib/jobber/job.js
@@ -0,0 +1,18 @@
+/*
+ * job.js: Simple data structure for tracking a predefined task (i.e. work() and default params).
+ *
+ * (C) 2010 Charlie Robbins
+ * MIT LICENSE
+ *
+ */
+
+var Job = exports.Job = function (jobName, props) {
+ if (!props.work) throw new Error("Worker function 'work()' is required.");
+ else if (typeof props.finished !== 'undefined') throw new Error('finished is a reserved property');
+
+ var self = this;
+ this.jobName = jobName;
+ Object.keys(props).forEach(function (property) {
+ self[property] = props[property];
+ });
+};
View
42 lib/jobber/worker.js
@@ -0,0 +1,42 @@
+/*
+ * worker.js: Runs individual instances of jobs being managed inside of jobber.
+ *
+ * (C) 2010 Charlie Robbins
+ * MIT LICENSE
+ *
+ */
+
+var util = require('util'),
+ events = require('events'),
+ jobber = require('jobber');
+
+var Worker = exports.Worker = function (workerId, job, args) {
+ if (!workerId) throw new Error('workerId is required.');
+ else if (!(job instanceof jobber.Job)) throw new Error('job must be an instanceof jobber.Job');
+
+ this.id = workerId;
+ this.job = job;
+ this.args = args;
+
+ this._finished = false;
+ this.running = false;
+};
+
+util.inherits(Worker, events.EventEmitter);
+
+Worker.prototype.run = function () {
+ this.running = true;
+ this.job.work.apply(this, this.args);
+};
+
+Worker.prototype.__defineGetter__('finished', function (value) {
+ return this._finished;
+});
+
+Worker.prototype.__defineSetter__('finished', function (value) {
+ this._finished = value;
+ if (value === true) {
+ this.running = false;
+ this.emit('finish');
+ }
+});
View
2 package.json
@@ -13,5 +13,5 @@
},
"main": "./lib/jobber",
"scripts": { "test": "vows test/*-test.js --spec" },
- "engines": { "node": ">= 0.3.0" }
+ "engines": { "node": ">= 0.4.0" }
}
View
24 test/helpers.js
@@ -0,0 +1,24 @@
+/*
+ * helpers.js: Helpers for the jobber tests
+ *
+ * (C) 2010 Charlie Robbins
+ *
+ */
+
+var exec = require('child_process').exec;
+
+var helpers = exports;
+
+helpers.listDir = function (timeout) {
+ return function (dirname) {
+ var self = this;
+ setTimeout(function () {
+ exec('ls -la ' + dirname || this.dirname, function (error, stdout, stderr) {
+ if (error) self.error = error;
+ else self.stdout = stdout;
+
+ self.finished = true;
+ });
+ }, timeout);
+ }
+};
View
67 test/job-manager-concurrency-test.js
@@ -0,0 +1,67 @@
+/*
+ * jobber-test.js: Tests unit tests for jobber module
+ *
+ * (C) 2010 Charlie Robbins
+ *
+ */
+
+require.paths.unshift(require('path').join(__dirname, '..', 'lib'));
+
+var sys = require('sys'),
+ fs = require('fs'),
+ path = require('path'),
+ eyes = require('eyes'),
+ vows = require('vows'),
+ assert = require('assert'),
+ jobber = require('jobber'),
+ helpers = require('./helpers');
+
+var workerIds = [];
+
+function createConcurrentBatch (message, nestedTest, timeout) {
+ var batch = {
+ "When using an instance of the JobManager": {
+ topic: function () {
+ var manager = new jobber.JobManager({ concurrency: 10 });
+ manager.setJob(new jobber.Job('listDir', {
+ dirname: __dirname,
+ work: helpers.listDir(timeout || 3000)
+ }));
+
+ return manager;
+ }
+ }
+ };
+
+ var test, header = "when passed more jobs than the concurrency level allows";
+ test = {
+ topic: function (manager) {
+ var that = this;
+ for (var i = 0; i < 25; i++) {
+ workerIds.push(manager.start(path.join(__dirname, '..')));
+ }
+ return manager;
+ }
+ };
+
+ test[message] = nestedTest;
+
+ batch[Object.keys(batch)[0]][header] = test;
+ return batch;
+}
+
+vows.describe('jobber/job-manager/simple').addBatch(
+ createConcurrentBatch("should have the correct number running and waiting", function (manager) {
+ assert.equal(manager.queue.length, 15);
+ assert.equal(Object.keys(manager.running).length, 10);
+ })
+).addBatch(
+ createConcurrentBatch("and all of those jobs are complete", {
+ topic: function (manager) {
+ manager.on('empty', this.callback.bind(null, null, manager));
+ },
+ "should eventually fire the 'empty' event": function (manager) {
+ assert.equal(manager.queue.length, 0);
+ }
+ }, 100)
+).export(module);
View
82 test/job-manager-simple-test.js
@@ -0,0 +1,82 @@
+/*
+ * jobber-test.js: Tests unit tests for jobber module
+ *
+ * (C) 2010 Charlie Robbins
+ *
+ */
+
+require.paths.unshift(require('path').join(__dirname, '..', 'lib'));
+
+var sys = require('sys'),
+ fs = require('fs'),
+ path = require('path'),
+ eyes = require('eyes'),
+ vows = require('vows'),
+ assert = require('assert'),
+ jobber = require('jobber'),
+ helpers = require('./helpers');
+
+var workerId;
+
+vows.describe('jobber/job-manager/simple').addBatch({
+ "When using an instance of the JobManager": {
+ topic: function () {
+ var manager = new jobber.JobManager();
+ manager.setJob(new jobber.Job('listDir', {
+ dirname: __dirname,
+ work: helpers.listDir(100)
+ }));
+
+ return manager;
+ },
+ "the start() method": {
+ topic: function (manager) {
+ var that = this;
+
+ manager.once('finish', function (worker) {
+ that.callback(null, worker, workerId)
+ });
+
+ workerId = manager.start(path.join(__dirname, '..'));
+ },
+ "should start off a job that returns results": function (err, worker, workerId) {
+ assert.isNotNull(worker);
+ assert.isString(worker.stdout);
+ assert.equal(worker.id, workerId);
+ assert.isTrue(worker.finished);
+ assert.isFalse(worker.running);
+ }
+ },
+ "the getWorker() method": {
+ "should return a valid worker": function (manager) {
+ var worker = manager.getWorker(workerId);
+ assert.isNotNull(worker);
+ assert.equal(worker.id, workerId);
+ assert.equal(worker.finished, false);
+ assert.equal(worker.running, true);
+ }
+ }
+ }
+}).addBatch({
+ "When using an instance of the JobManager": {
+ topic: function () {
+ var manager = new jobber.JobManager();
+ return manager;
+ },
+ "the start() method with no job should throw an error": function (manager) {
+ assert.throws(function () { manager.start(__dirname) });
+ },
+ "the setJob() method": {
+ "when passed invalid parameters should throw an error": function (manager) {
+ assert.throws(function () { manager.setJob('foo') });
+ manager.queue.unshift('foo');
+ assert.throws(function () {
+ manager.setJob(new jobber.Job('listDir', {
+ dirname: __dirname,
+ work: helpers.listDir(100)
+ }));
+ });
+ }
+ }
+ }
+}).export(module);
View
62 test/job-test.js
@@ -0,0 +1,62 @@
+/*
+ * jobber-test.js: Tests unit tests for jobber module
+ *
+ * (C) 2010 Charlie Robbins
+ *
+ */
+
+require.paths.unshift(require('path').join(__dirname, '..', 'lib'));
+
+var sys = require('sys'),
+ fs = require('fs'),
+ path = require('path'),
+ eyes = require('eyes'),
+ vows = require('vows'),
+ assert = require('assert'),
+ jobber = require('jobber'),
+ helpers = require('./helpers');
+
+vows.describe('jobber/job').addBatch({
+ "When using an instance of a Job": {
+ "when passed invalid parameters": {
+ "should throw an error": function () {
+ // No params
+ assert.throws(function () {
+ var j = new jobber.Job();
+ });
+
+ // No work
+ assert.throws(function () {
+ var j = new jobber.Worker('someId');
+ });
+
+ // Pass finished
+ assert.throws(function () {
+ var j = new jobber.Worker('someId', {
+ work: function () { /* Purposefully Empty */ },
+ finished: false
+ });
+ });
+ }
+ },
+ "when passed valid parameters": {
+ topic: function () {
+ var job = new jobber.Job('testJob', {
+ work: function () { /* Purposefully empty */ },
+ someProp: true,
+ someObj: {
+ aparam: 'value'
+ }
+ });
+
+ return job;
+ },
+ "should copy the properties to the instance": function (job) {
+ assert.equal(job.jobName, 'testJob');
+ assert.equal(job.someProp, true);
+ assert.isObject(job.someObj);
+ assert.isString(job.someObj.aparam);
+ }
+ }
+ }
+}).export(module);
View
58 test/jobber-test.js
@@ -1,58 +0,0 @@
-/*
- * jobber-test.js: Tests for jobber module
- *
- * (C) 2010 Charlie Robbins
- *
- */
-
-require.paths.unshift(require('path').join(__dirname, '..', 'lib'));
-
-var sys = require('sys'),
- fs = require('fs'),
- vows = require('vows'),
- assert = require('assert'),
- jobber = require('jobber');
-
-var testJob;
-
-vows.describe('jobber').addBatch({
- "When using an instance of the JobManager": {
- topic: function () {
- var manager = new jobber.JobManager();
- testJob = manager.addJob({ user: 'bar' });
-
- return manager;
- },
- "the addJob() method": {
- "should return a valid job": function (manager) {
- var job = manager.addJob({ user: 'foo' });
- assert.isNotNull(job);
- assert.include(job, 'id');
- assert.equal(job.finished, false);
- assert.equal(job.user, 'foo');
- }
- },
- "the getJob() method": {
- "should return a valid job": function (manager) {
- var job = manager.getJob(testJob.id);
- assert.isNotNull(job);
- assert.include(job, 'id');
- assert.equal(job.id, testJob.id);
- assert.equal(job.finished, false);
- assert.equal(job.user, 'bar');
- }
- }
- }
-}).addBatch({
- "When using an instance of a Job": {
- "setting 'finished' to true": {
- topic: function () {
- testJob.on('finish', this.callback);
- testJob.finished = true;
- },
- "should raise the 'finish' event": function () {
- assert.isTrue(testJob.finished);
- }
- }
- }
-}).export(module);
View
55 test/worker-test.js
@@ -0,0 +1,55 @@
+/*
+ * jobber-test.js: Tests unit tests for jobber module
+ *
+ * (C) 2010 Charlie Robbins
+ *
+ */
+
+require.paths.unshift(require('path').join(__dirname, '..', 'lib'));
+
+var sys = require('sys'),
+ fs = require('fs'),
+ path = require('path'),
+ eyes = require('eyes'),
+ vows = require('vows'),
+ assert = require('assert'),
+ jobber = require('jobber'),
+ helpers = require('./helpers');
+
+var worker;
+
+vows.describe('jobber/worker').addBatch({
+ "When using an instance of a Worker": {
+ "when passed invalid parameters": {
+ "should throw an error": function () {
+ // No params
+ assert.throws(function () {
+ var w = new jobber.Worker();
+ });
+
+ // No job
+ assert.throws(function () {
+ var w = new jobber.Worker('someId');
+ });
+
+ // Not an instance of jobber.Job
+ assert.throws(function () {
+ var w = new jobber.Worker('someId', function () { /* Purposefully Empty */ });
+ });
+ }
+ },
+ "setting 'finished' to true": {
+ topic: function () {
+ worker = new jobber.Worker('testId', new jobber.Job('empty', {
+ work: function () { /* Purposefully empty */ }
+ }));
+
+ worker.on('finish', this.callback);
+ worker.finished = true;
+ },
+ "should raise the 'finish' event": function () {
+ assert.isTrue(worker.finished);
+ }
+ }
+ }
+}).export(module);

0 comments on commit 5427cfc

Please sign in to comment.