Skip to content

Commit

Permalink
task tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dshaw committed Jun 9, 2012
1 parent 347a9a3 commit 68d7485
Show file tree
Hide file tree
Showing 6 changed files with 389 additions and 0 deletions.
14 changes: 14 additions & 0 deletions examples/inception/inception.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
var charm = require('charm')
, mixture = require('../../')
, mix = mixture.mix('inception')

// mix master emitters
mix.on('online', function (proc) {
console.log(proc.name + ' mix online')
})
mix.emit('online', mix)

// fork a mix master instance
//var inception = mix.task('inception')

console.log(mix.name + ' hello')
42 changes: 42 additions & 0 deletions examples/inception/mix.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var mixture = require('../../')
, Master = mixture.Master
, isTop = !process.env['MIXTURE_WORKER_ID']
, levels = {
name: 'reality'
, reality: { name: 'city' }
, city: { name: 'hotel' }
, hotel: { name: 'fortress' }
}
, level = levels.name
, mix

console.log('MIXTURE_WORKER_ID:', process.env['MIXTURE_WORKER_ID'], 'top:', isTop)

dream(level)

process.emit('fork', 'hello')

process.on('fork', function (worker, task, master) {
console.log('fork', arguments)
process.exit()
var level = levels(master || master.name)
console.log(level + ': ' + + ' mix online', arguments.length)
//mix.emit('online', mix)
if (level) dream(level) //mix.task('mix').fork()
})

function dream (level) {
mix = mixture.mix(level);

// mix master emitters
mix.on('online', function (proc, task) {
var isMaster = proc instanceof Master
, name = proc.name || task.name
console.log(mix.name + ': ' + name + ' mix online', arguments.length)
})

var worker = mix.task('mix').fork()

if (isTop) mix.emit('online', mix)
console.log(mix.name + ' hello')
}
15 changes: 15 additions & 0 deletions examples/inception/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "mixture-inception-example"
, "version": "0.1.0-pre"
, "description": "Inception Mixture master workers"
, "author": "Daniel D. Shaw <dshaw@dshaw.com> (http://dshaw.com)"
, "keywords": ["mixture"]
, "main": "index"
, "dependencies": {
"charm": ">= 0.0.5"
}
, "engines": { "node": ">= 0.5.9" }
, "scripts": {
"start": "node mix"
}
}
37 changes: 37 additions & 0 deletions examples/stock-quotes/exec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
var fork = require('child_process').fork;

var count = process.argv[2] || 4 // maxes out locally at ~82
, nodes = {
announce: []
, io: []
}
, ioPort = 8881;

process.on('message', function (m) {
console.log('process message', m)
})

// announce data server
var announce = fork('data.js')
nodes.announce.push(announce)
console.log('announce', 'pid:', nodes.announce[0].pid)
announce.send({ m: 1 })

announce.on('message', function (m) {
console.log('child message', m)
})

process.send('hi')

// socket.io instances
for (var i=0; i<count; i++) {
var port = ioPort+i
, nodeId = i+1;
nodes.io[i] = fork('app.js', [port, nodeId]);
console.log(
'io'
, 'nodeId:', nodeId
, 'port:', port
, 'pid:', nodes.io[i].pid
);
}
247 changes: 247 additions & 0 deletions lib/cluster-mix.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*!
* mixture
* Copyright(c) 2011 Daniel D. Shaw <dshaw@dshaw.com>
* MIT Licensed
*/

/**
* Module dependencies.
*/

var assert = require('assert')
, fork = require('child_process').fork
, net = require('net')
, EventEmitter = require('events').EventEmitter

var cluster = module.exports = new EventEmitter();

var debug;
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function(x) {
var prefix = process.pid + ',' + (process.env.NODE_WORKER_ID ? 'Worker' : 'Master');
console.error(prefix, x);
};
} else {
debug = function() { };
}


// Used in the master:
var masterStarted = false;
var ids = 0;
var taskCount = 0;
var tasks = {};
var workers = [];
var servers = {};

// Used in the worker:
var workerId = 0;
var queryIds = 0;
var queryCallbacks = {};

cluster.isWorker = 'NODE_WORKER_ID' in process.env;
cluster.isMaster = ! cluster.isWorker;

// Call this from the master process. It will start child workers.
//
// options.workerFilename
// Specifies the script to execute for the child processes. Default is
// process.argv[1]
//
// options.args
// Specifies program arguments for the workers. The Default is
// process.argv.slice(2)
//
// options.workers
// The number of workers to start. Defaults to os.cpus().length.
function startMaster() {
// This can only be called from the master.
assert(cluster.isMaster);

if (masterStarted) return;
masterStarted = true;

process.on('uncaughtException', function(e) {
// Quickly try to kill all the workers.
// TODO: be session leader - will cause auto SIGHUP to the children.
cluster.eachWorker(function(worker) {
debug("kill worker " + worker.pid);
worker.kill();
});

console.error("Exception in cluster master process: " +
e.message + '\n' + e.stack);
console.error("Please report this bug.");
process.exit(1);
});
}


function handleWorkerMessage(worker, message) {
// This can only be called from the master.
assert(cluster.isMaster);

debug("recv " + JSON.stringify(message));

switch (message.cmd) {
case 'online':
console.log("Worker " + worker.pid + " online");
workers.push(worker);
break;

case 'queryServer':
var key = message.address + ":" +
message.port + ":" +
message.addressType;
var response = { _queryId: message._queryId };

if (key in servers == false) {
// Create a new server.
debug('create new server ' + key);
servers[key] = net._createServerHandle(message.address,
message.port,
message.addressType);
}
worker.send(response, servers[key]);
break;

default:
// Ignore.
break;
}
}


function eachWorker(task, cb) {
// This can only be called from the master.
assert(cluster.isMaster);

var _workers = workers;

if (arguments.length < 2) {
cb = task;
} else {
_workers = tasks[task].workers
}

for (var id in _workers) {
if (_workers[id]) {
cb(_workers[id]);
}
}
}


cluster.fork = function(filename, args, options) {
// This can only be called from the master.
assert(cluster.isMaster);

// Lazily start the master process stuff.
startMaster();

var id = ++ids;
var envCopy = {};

for (var x in process.env) {
envCopy[x] = process.env[x];
}

envCopy['NODE_WORKER_ID'] = id;

if ('object' == typeof filename) {
var name = filename;
filename = tasks[name].filename;
args = tasks[name].args;
options = tasks[name].options;
} else {
// new task
filename || (fileName = process.argv[1]);
args || (args = process.argv.slice(3));
options || (options = {});

var name = options.name || taskCount;
options.name = name; // set if using default
options.env = envCopy;

tasks[name] = {
workers: []
, filename: filename
, args: args
, options: options
};

taskCount++;
}

var worker = fork(filename, args, options);

tasks[name].workers.push(worker);

worker.on('message', function(message) {
handleWorkerMessage(worker, message);
});

worker.on('exit', function() {
debug('worker id=' + id + ' died');
delete workers[id];
cluster.emit('death', worker);
});

return worker;
};


// Internal function. Called from src/node.js when worker process starts.
cluster._startWorker = function(id) {
assert(cluster.isWorker);
workerId = parseInt(process.env.NODE_WORKER_ID);

queryMaster({ cmd: 'online' });

// Make callbacks from queryMaster()
process.on('message', function(msg, handle) {
debug("recv " + JSON.stringify(msg));
if (msg._queryId && msg._queryId in queryCallbacks) {
var cb = queryCallbacks[msg._queryId];
if (typeof cb == 'function') {
cb(msg, handle);
}
delete queryCallbacks[msg._queryId]
}
});
};


function queryMaster(msg, cb) {
assert(cluster.isWorker);

debug('send ' + JSON.stringify(msg));

// Grab some random queryId
msg._queryId = (++queryIds);
msg._workerId = workerId;

// Store callback for later. Callback called in _startWorker.
if (cb) {
queryCallbacks[msg._queryId] = cb;
}

// Send message to master.
process.send(msg);
}


// Internal function. Called by lib/net.js when attempting to bind a
// server.
cluster._getServer = function(address, port, addressType, cb) {
assert(cluster.isWorker);

queryMaster({
cmd: "queryServer",
address: address,
port: port,
addressType: addressType
}, function(msg, handle) {
cb(handle);
});
};
34 changes: 34 additions & 0 deletions test/task.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
var mixture = require('..')
, mix = mixture.mix()
, Master = mixture.Master
, Task = mixture.Task
, tap = require('tap')
, test = tap.test

//console.log(mix, Master, Task, mix instanceof Master)

test('task test', function (t) {
t.ok(Task, 'exports Task')

var master = new Master()
var task = master.task('name')
t.isa(master.task, 'function', 'task method defined')
t.isa(task, Task, 'task returns a task')
t.ok(task.master, 'task has a reference to the master')
t.isa(task.master, Master, 'task master is an instance of Master')

var worker1 = task.fork('./test/fixtures/simple')
t.ok(worker1, 'worker is defined')
t.equal(task.workers.length, 1, 'task now has 1 worker')

for (var i = 0; i < 4; i++) task.fork()
t.equal(task.workers.length, 5, 'task now has 5 workers')

task.kill()
t.equal(task.workers.length, 5, 'task now has 5 workers')

task.fork()
t.equal(task.workers.length, 1, 'task restored. has 1 worker')

t.end()
})

0 comments on commit 68d7485

Please sign in to comment.