Skip to content

Commit

Permalink
New worker interface (fixes #1)
Browse files Browse the repository at this point in the history
  • Loading branch information
rauchg committed Jan 29, 2012
1 parent ec1dc85 commit 6ad3f6b
Showing 1 changed file with 121 additions and 58 deletions.
179 changes: 121 additions & 58 deletions lib/up.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var fork = require('child_process').fork
, os = require('os')
, ms = require('ms')
, Distributor = require('distribute')
, EventEmitter = require('events').EventEmitter
, debug = require('debug')('up')

/**
Expand All @@ -25,6 +26,14 @@ module.exports = exports = UpServer;

exports.version = '0.1.0';

/**
* Worker constructor.
*
* @api public
*/

exports.Worker = Worker;

/**
* Number of CPUs available.
*/
Expand Down Expand Up @@ -54,9 +63,8 @@ function UpServer (server, file, opts) {
this.workerTimeout = ms(null != opts.workerTimeout ? opts.workerTimeout : '10m');
this.requires = opts.requires || [];

this.ports = [];
this.portsMap = {};
this.procs = {};
this.workers = [];
this.spawning = [];
this.lastIndex = -1;

// setup workers
Expand All @@ -83,93 +91,79 @@ UpServer.prototype.reload = function (fn) {
return this;
}

// we keep track of how many workers will exit
var num = this.ports.length
// remove all workers in the spawning state
for (var i = 0, l = this.spawning.length; i < l; i++) {
this.spawning[i].shutdown();
}

// snapshot what workers we'll shut down
var reload = [].concat(this.workers)
, self = this

debug('reloading - spawning %d new workers', this.numWorkers);

this.spawnWorkers(this.numWorkers);
this.reloading = true;

this.once('spawn', function () {
debug('worker spawned - removing old workers');
self.reloading = false;
self.emit('reload');
fn && fn();

var reload = this.ports.splice(0, num)
, proc, port

// shut down old workers
for (var i = 0, l = reload.length; i < l; i++) {
port = reload[i];
proc = self.procs[port];
debug('telling worker %s to exit in %dms', proc.pid, self.workerTimeout);
proc.send({ cmd: 'die', time: self.workerTimeout });
proc.on('exit', function () {
debug('worker %s terminated', proc.pid);
});
delete self.procs[port];
delete self.portsMap[port];
reload[i].shutdown();
}

this.lastIndex = -1;
});

return this;
};

/**
* Spawns multiple workers.
* Helper function to spawn multiple workers.
*
* @param {Number} number of workers to spawn
* @param {Function} callback upon all are added
* @api public
*/

UpServer.prototype.spawnWorkers = function (n, fn) {
UpServer.prototype.spawnWorkers = function (n) {
debug('spawning %d workers from master %d', n, process.pid);
for (var i = 0, l = n; i < l; i++) {
this.spawnWorker(function () {
--n || fn && fn();
});
this.spawnWorker();
}
};

/**
* Spawns a worker that binds to an available port.
*
* @param {Function} fn that gets called with the bound port
* @api public
*/

UpServer.prototype.spawnWorker = function (fn) {
var args = [this.file, JSON.stringify(this.requires)]
, proc = fork(__dirname + '/worker.js', args)
var w = new Worker(this)
, self = this

proc.on('message', function (addr) {
debug('worker spawned on port %d with pid %d', addr.port, proc.pid);
var port = addr.port;
self.ports.push(port);
self.portsMap[port] = true;
self.procs[port] = proc;
self.emit('spawn', port);

proc.on('exit', function () {
// we check for a premature death
if (self.portsMap[port]) {
debug('sudden death of worker on port %d with pid %d', port, proc.pid);
self.ports.splice(self.ports.indexOf(port), 1);
delete self.portsMap[port];
delete self.procs[port];

// we reset the round
self.lastIndex = -1;
}
});

fn();
// keep track that we're spawning
this.spawning.push(w);

w.on('stateChange', function () {
switch (w.readyState) {
case 'spawned':
self.spawning.splice(self.spawning.indexOf(w), 1);
self.workers.push(w);
self.emit('spawn', w);
break;

case 'terminating':
case 'terminated':
if (~self.spawning.indexOf(self.spawning.indexOf(w))) {

This comment has been minimized.

Copy link
@lukasberns

lukasberns Jun 1, 2012

This cascaded indexOf() looks like a bug to me. I'm just scanning so maybe I'm wrong

self.spawning.splice(self.spawning.indexOf(w), 1);
}
if (~self.workers.indexOf(w)) {
self.workers.splice(self.workers.indexOf(w), 1);
self.lastIndex = -1;
// @TODO: auto-add workers ?
}
break;
}
});
};

Expand All @@ -181,8 +175,8 @@ UpServer.prototype.spawnWorker = function (fn) {

UpServer.prototype.nextWorker = function () {
this.lastIndex++;
if (!this.ports[this.lastIndex]) this.lastIndex = 0;
return this.ports[this.lastIndex];
if (!this.workers[this.lastIndex]) this.lastIndex = 0;
return this.workers[this.lastIndex];
};

/**
Expand All @@ -194,12 +188,81 @@ UpServer.prototype.nextWorker = function () {

UpServer.prototype.defaultHTTP =
UpServer.prototype.defaultWS = function (req, res, next) {
if (this.ports.length) {
next(this.nextWorker());
if (this.workers.length) {
next(this.nextWorker().port);
} else {
var self = this;
this.once('spawn', function () {
next(self.nextWorker());
next(self.nextWorker().port);
});
}
};

/**
* Worker constructor.
*
* @api private
*/

function Worker (server) {
this.server = server;
this.readyState = 'spawning';
this.proc = fork(__dirname + '/worker.js'
, [server.file, JSON.stringify(server.requires)]);
this.proc.on('message', this.onMessage.bind(this));
this.proc.on('exit', this.onExit.bind(this));
}

/**
* Inherits from EventEmitter.
*/

Worker.prototype.__proto__ = EventEmitter.prototype;

/**
* Called upon worker exit.
* Sudden exits will mean the worker won't go through `terminating` state
*
* @api private
*/

Worker.prototype.onExit = function () {
this.readyState = 'terminated';
this.emit('stateChange');
};

/**
* Handles an incoming message from the worker.
*
* @api private
*/

Worker.prototype.onMessage = function (addr) {
// avoid spawns after SIGHUP was sent
if ('spawning' == this.readyState) {
this.port = addr.port;
this.readyState = 'spawned';
this.emit('stateChange');
}
};

/**
* Shuts down a worker.
*
* @api private
*/

Worker.prototype.shutdown = function () {
if ('spawned' == this.readyState) {
var timeout = this.server.workerTimeout;
debug('telling worker %s to exit in %dms', this.proc.pid, timeout);
this.proc.send({ cmd: 'die', time: timeout });
this.readyState = 'terminating';
this.emit('stateChange');
} else if ('spawning' == this.readyState) {
debug('killing spawning worker');
this.proc.kill('SIGHUP');
this.readyState = 'terminating';
this.emit('stateChange');
}
};

0 comments on commit 6ad3f6b

Please sign in to comment.