Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
brianc committed Aug 5, 2013
0 parents commit 49d4c11
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
node_modules
8 changes: 8 additions & 0 deletions Makefile
@@ -0,0 +1,8 @@
SHELL := /bin/bash

node-command := xargs -n 1 -I file node file

.PHONY : test

test:
@find test -name "*-tests.js" | $(node-command)
7 changes: 7 additions & 0 deletions examples/master.js
@@ -0,0 +1,7 @@
var forky = require('../');

forky.log = function() {
console.log.apply(console, arguments)
};

forky(__dirname + '/server');
96 changes: 96 additions & 0 deletions examples/server.js
@@ -0,0 +1,96 @@
var express = require('express');
var cluster = require('cluster');
var forky = require('../');
var http = require('http');
var sliced = require('sliced');

var workerId = function() {
if(cluster.isWorker) {
return cluster.worker.id;
}
return 'NON-CLUSTERED WORKER ???';
}

var log = function() {
var args = sliced(arguments);
args.unshift(workerId());
console.log.apply(console, args);
}

var app = express();
app.use(require('express-domain-middleware'));
app.use(app.router);

//express error handler
//which will catch domain errors and respond nicely with a 500
//after the response, it will disconnect this worker gracefully
//meaning no more connections will be accepted and once the
//error connection terminates, the worker will die
app.use(function(err, req, res, next) {
log('route error. disconnecting.');
forky.disconnect();
res.send(500, workerId() + ' request error');
});

app.get('/', function(req, res, next) {
res.send(workerId() + " OK")
});

//throw an unhandled error which will be caught
//by the domain
app.get('/crash', function(req, res, next) {
process.nextTick(function() {
throw new Error("PWND BROTHER");
});
});

//tell this process to exit immediately after
//this response has completed
//forky will respawn another work when this one
//dies
app.get('/exit', function(req, res, next) {
res.on('finish', function() {
console.log('res closed!')
process.exit(0);
});
res.send(workerId() + ' exit');
});

//send the disconnect signal to forky
//this will behave in the same way as the error handler
//no more requests will be handled on this worker and
//this worker will be allowed to gracefully die
app.get('/disconnect', function(req, res, next) {
process.nextTick(function() {
log(workerId() + ' disconnecting from master');
res.send(workerId() + " close");
forky.disconnect();
});
});

//the worker will likely have references keeping it event loop
//alive forever. These can be things like open database connections
//in a pool of connected clients or other long living timeouts.
//if you want to force a disconnection and total worker shutdown/cleanup
//after a specific timeout you can pass a timeout to disconnect.
//after this timeout forky will forcefully kill this worker
app.get('/disconnect/:timeout', function(req, res, next) {
var timeout = req.params.timeout;
setInterval(function() {
console.log(workerId() + ' I am doing something in the background every so often...');
}, 5000);
forky.disconnect(timeout);
res.send(workerId() + 'disconnecting in ' + timeout + ' miliseconds');
});

var server = http.createServer(app);

process.on('exit', function() {
log('exit');
});

server.listen(8485, function() {
log('listening on', 8485);
});

module.exports = server;
82 changes: 82 additions & 0 deletions index.js
@@ -0,0 +1,82 @@
var cluster = require('cluster');
var os = require('os');

var shuttingDown = false;

var killTimeout = function(worker, timeout) {
forky.log('setting kill timeout of', timeout, 'for worker', worker.id);
var tid = setTimeout(function() {
forky.log('worker', worker.id, 'did not shutdown after timeout', timeout, 'killing');
worker.kill();
}, timeout);
worker.once('exit', function() {
forky.log('worker', worker.id, 'died. clear kill timeout');
clearTimeout(tid);
});
}

//fork a new worker
var forkWorker = function() {
if(shuttingDown) return;
var worker = cluster.fork();
forky.log('forked worker', worker.id);
//set up a listener for the disconnect message
//a worker can send this by calling `forky.disconnect([timeout])`
worker.once('message', function(msg) {
if(!msg.action == 'disconnect') return;
forkWorker();
worker.disconnect();

if(!msg.timeout) return;
killTimeout(worker, msg.timeout);
});

worker.once('disconnect', function() {
forky.log('worker', worker.id, ' disconnected.', 'suicide', worker.suicide);
if(worker.suicide) return;
forkWorker();
//set short kill timeout for unexpected worker shutdown
killTimeout(worker, 1000);
});
};

forky = module.exports = function(path, cb) {
cluster.setupMaster({
exec: path
});
var cores = os.cpus().length;
forky.log('starting', cores, 'workers');
for(var i = 0; i < cores; i++) {
forkWorker();
}
var listeningWorkers = 0;
cluster.on('listening', function(worker) {
if(++listeningWorkers == cores) {
cb ? cb(null, cluster) : function(){};
}
});
}

//call this from a worker to disconnect the worker
//forky will automatically spawn a new worker in its place
forky.disconnect = function(timeout) {
if(!cluster.isWorker) {
throw new Exception("You are not a worker");
}
var worker = cluster.worker;
if(worker.state == 'disconnecting') return;
worker.state = 'disconnecting';
worker.disconnectTimeout = timeout;
forky.log('disconnecting worker', worker.id);
if(timeout) {
worker.send({action: 'disconnect', timeout: timeout});
} else {
worker.send({action: 'disconnect'});
}
};

//this is a no-op but you can override it if you
//want some detailed log messages about what
//forky is doing with your workers
forky.log = function() {
};
31 changes: 31 additions & 0 deletions package.json
@@ -0,0 +1,31 @@
{
"name": "forky",
"version": "0.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "mocha"
},
"repository": {
"type": "git",
"url": "git://github.com/brianc/node-forky.git"
},
"keywords": [
"cluster",
"pre-fork"
],
"author": "Brian M. Carlson",
"license": "MIT",
"bugs": {
"url": "https://github.com/brianc/node-forky/issues"
},
"devDependencies": {
"okay": "~0.3.0",
"mocha": "~1.12.0",
"express": "~3.3.4",
"request": "~2.25.0",
"async": "~0.2.9",
"sliced": "0.0.5",
"express-domain-middleware": "~0.1.0"
}
}
13 changes: 13 additions & 0 deletions test/close-server-tests.js
@@ -0,0 +1,13 @@
var helper = require('./');
var forky = require('../');
var assert = require('assert');



var master = forky(helper.serverPath, function(err, master) {
console.log('workers listening');
helper.slam('/disconnect', 200, function(err) {
assert.ifError(err);
master.disconnect();
});
});
11 changes: 11 additions & 0 deletions test/crash-restart-tests.js
@@ -0,0 +1,11 @@
var helper = require('./');
var assert = require('assert');
var forky = require('../');
var request = require('request');

var master = forky(helper.serverPath, function(err, master) {
helper.slam('/crash', 500, function(err) {
assert.ifError(err);
master.disconnect();
});
});
16 changes: 16 additions & 0 deletions test/error-tests.js
@@ -0,0 +1,16 @@
var helper = require('./');
var forky = require('../');
var request = require('request');

var master = forky(helper.serverPath, function(err, master) {
master.on('fork', function() {
for(var id in master.workers) {
console.log(master.workers[id].state);
}
master.disconnect(function() {
console.log('disconnected');
});
});
request.get('http://localhost:8485/crash', function() {
});
});
11 changes: 11 additions & 0 deletions test/exit-tests.js
@@ -0,0 +1,11 @@
var helper = require('./');
var forky = require('../');
var assert = require('assert');

var master = forky(helper.serverPath, function(err, master) {
console.log('workers listening');
helper.slam('/exit', 200, function(err) {
assert.ifError(err);
master.disconnect();
});
});
47 changes: 47 additions & 0 deletions test/index.js
@@ -0,0 +1,47 @@
console.log(process.argv[1]);
var async = require('async');
var ok = require('okay')
var assert = require('assert');
var request = require('request');
var forky = require('../');
forky.log = function() {
console.log.apply(console, arguments);
}

var helper = module.exports = {
serverPath: __dirname + '/../examples/server',
get: function(path, cb) {
var url = 'http://localhost:8485' + path;
request.get(url, cb);
},
//gets the OK url a few times, and then calls
//whatever url you pass, then OK a few more times
slam: function(path, statusCode, cb) {
var hit = function(cb) {
var after = function(err) {
setTimeout(function() {
cb(err);
}, 100)
}
setTimeout(function() {
async.times(3, function(n, next) {
helper.get('/', next);
}, after);
}, 100);
}

var disconnect = function(n, cb) {
helper.get('/', ok(cb, function(res) {
assert.equal(res.statusCode, 200);
helper.get(path, ok(cb, function(res) {
assert.equal(res.statusCode, statusCode);
helper.get('/', ok(cb, function(res) {
assert.equal(res.statusCode, 200);
hit(cb);
}));
}));
}));
};
async.timesSeries(10, disconnect, cb);
}
}
9 changes: 9 additions & 0 deletions test/simple-start-stop-tests.js
@@ -0,0 +1,9 @@
var helper = require('./');
var forky = require('../');
var request = require('request');

var master = forky(helper.serverPath, function(err, master) {
master.disconnect(function() {
console.log('disconnected');
});
});
27 changes: 27 additions & 0 deletions test/stress-tests.js
@@ -0,0 +1,27 @@
var helper = require('./');
var assert = require('assert');
var forky = require('../');
var async = require('async');
var ok = require('okay');

var repeat = 50;
var master = forky(helper.serverPath, function(err, master) {
var hit = function(path, statusCode) {
return function (cb) {
var go = function(n, next) {
helper.slam(path, statusCode, next);
}
async.times(repeat, go, cb);
};
};

var actions = [
hit('/', 200),
hit('/disconnect', 200),
hit('/crash', 500)
];

async.parallel(actions, ok(function() {
master.disconnect();
}));
});

0 comments on commit 49d4c11

Please sign in to comment.