Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base: 0.1
...
compare: help
Checking mergeability… Don't worry, you can still create the pull request.
  • 18 commits
  • 22 files changed
  • 0 commit comments
  • 2 contributors
View
37 Readme.md
@@ -1,6 +1,6 @@
# Mixture
- Heterogeneous cluster task manager designed to manage and coordinate tasks amongst multiple, diverse child processes.
+Heterogeneous cluster task manager designed to manage and coordinate tasks amongst multiple, diverse child processes.
## Status
@@ -30,6 +30,39 @@
var worker = socketio.fork({ args: [ioPort, nodeId] })
}
+## API
+
+### Mix Master
+
+Require mixture, create a mix
+
+ var mix = require('mixture').mix()
+
+Optionally, name your mix (for network identification)
+
+ var mixture = require('mixture')
+ , mix = mixture.mix('jupiter')
+
+### Tasks
+
+Define a simple task with straightforward [fork](http://nodejs.org/docs/v0.6.0/api/child_processes.html#child_process.fork) semantics.
+
+ mix.task('express').fork('server.js')
+ console.log(mix.name, mix.workers.length)
+
+A task returns a reference to the task that you can refer to at any point
+task fork accepts an options argument so you can pass in args or options for a spefic forked worker instance.
+
+ // spin up a second instance
+ var task = mix.task('express')
+ task.fork({ args: [9001] })
+ console.log(task.name, task.workers.length)
+
+when you don't need a task worker anymore, just [kill](http://nodejs.org/docs/v0.6.0/api/child_processes.html#child.kill) it
+
+ var worker = mix.task('express').workers.pop()
+ worker.kill()
+
## Sample App
* [Stock Quote Stream](https://github.com/dshaw/mixture/tree/master/examples/stock-quotes)
@@ -57,4 +90,4 @@ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
-SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View
14 examples/inception/inception.js
@@ -0,0 +1,14 @@
+var charm = require('charm')
+ , mixture = require('../../')
+ , mix = mixture.mix('inception')
+
+// mix master emitters
+mix.on('online', function (proc) {
+ console.log(proc.name + ' mix online')
+})
+mix.emit('online', mix)
+
+// fork a mix master instance
+//var inception = mix.task('inception')
+
+console.log(mix.name + ' hello')
View
42 examples/inception/mix.js
@@ -0,0 +1,42 @@
+var mixture = require('../../')
+ , Master = mixture.Master
+ , isTop = !process.env['MIXTURE_WORKER_ID']
+ , levels = {
+ name: 'reality'
+ , reality: { name: 'city' }
+ , city: { name: 'hotel' }
+ , hotel: { name: 'fortress' }
+ }
+ , level = levels.name
+ , mix
+
+console.log('MIXTURE_WORKER_ID:', process.env['MIXTURE_WORKER_ID'], 'top:', isTop)
+
+dream(level)
+
+process.emit('fork', 'hello')
+
+process.on('fork', function (worker, task, master) {
+ console.log('fork', arguments)
+ process.exit()
+ var level = levels(master || master.name)
+ console.log(level + ': ' + + ' mix online', arguments.length)
+ //mix.emit('online', mix)
+ if (level) dream(level) //mix.task('mix').fork()
+})
+
+function dream (level) {
+ mix = mixture.mix(level);
+
+ // mix master emitters
+ mix.on('online', function (proc, task) {
+ var isMaster = proc instanceof Master
+ , name = proc.name || task.name
+ console.log(mix.name + ': ' + name + ' mix online', arguments.length)
+ })
+
+ var worker = mix.task('mix').fork()
+
+ if (isTop) mix.emit('online', mix)
+ console.log(mix.name + ' hello')
+}
View
15 examples/inception/package.json
@@ -0,0 +1,15 @@
+{
+ "name": "mixture-inception-example"
+ , "version": "0.1.0-pre"
+ , "description": "Inception Mixture master workers"
+ , "author": "Daniel D. Shaw <dshaw@dshaw.com> (http://dshaw.com)"
+ , "keywords": ["mixture"]
+ , "main": "index"
+ , "dependencies": {
+ "charm": ">= 0.0.5"
+ }
+ , "engines": { "node": ">= 0.5.9" }
+ , "scripts": {
+ "start": "node mix"
+ }
+}
View
8 examples/stock-quotes/Readme.md
@@ -12,4 +12,10 @@ Note: If you're running Node v0.7+ you might have to run:
### Managed mixed cluster
-* `node . [n]` - manage all processes, creating a data stream and n socket.io apps. defaults to 4.
+* `node mix [n]` - manage all processes, creating a data stream and n socket.io apps. defaults to 4. try 80.
+
+### Fun with Mix Masters
+
+* `node mix-death [n]` - force the death of one of the socket.io processes
+* `node mix-balanced [n]` - add load balancing with [bouncy](https://github.com/substack/bouncy) to managed processes with 3 lines of code.
+* `sudo node mix-lb80 [n]` - run the load balancer on port 80.
View
3  examples/stock-quotes/app.js
@@ -1,11 +1,12 @@
var express = require('express')
, sio = require('socket.io')
, RedisStore = sio.RedisStore
+ , worker = require('../../').worker
var port = process.argv[2] || 8880
, id = process.argv[3] || 0
, delay = process.argv[4] || 800
- , app = app = express.createServer(express.static(__dirname + '/.'))
+ , app = express.createServer(express.static(__dirname + '/.'))
, io = sio.listen(app)
io.configure(function () {
View
1  examples/stock-quotes/data.js
@@ -5,6 +5,7 @@
*/
var announce = require('socket.io-announce').createClient()
+ , worker = require('../../').worker
/* fake data stream */
var symbols = 'THOO GOOF EXIT BOP SDD ALPP RIGM OPPL HPBG'.split(' ')
View
37 examples/stock-quotes/exec.js
@@ -0,0 +1,37 @@
+var fork = require('child_process').fork;
+
+var count = process.argv[2] || 4 // maxes out locally at ~82
+ , nodes = {
+ announce: []
+ , io: []
+ }
+ , ioPort = 8881;
+
+process.on('message', function (m) {
+ console.log('process message', m)
+})
+
+// announce data server
+var announce = fork('data.js')
+nodes.announce.push(announce)
+console.log('announce', 'pid:', nodes.announce[0].pid)
+announce.send({ m: 1 })
+
+announce.on('message', function (m) {
+ console.log('child message', m)
+})
+
+process.send('hi')
+
+// socket.io instances
+for (var i=0; i<count; i++) {
+ var port = ioPort+i
+ , nodeId = i+1;
+ nodes.io[i] = fork('app.js', [port, nodeId]);
+ console.log(
+ 'io'
+ , 'nodeId:', nodeId
+ , 'port:', port
+ , 'pid:', nodes.io[i].pid
+ );
+}
View
28 examples/stock-quotes/mix-balanced.js
@@ -0,0 +1,28 @@
+var mix = require('../../').mix('balanced')
+ , bouncy = require('bouncy')
+
+var count = process.argv[2] || 4
+ , portmap = []
+ , port = 8880
+ , nodeId = 0
+
+// announce data server
+mix.task('announce').fork('data.js')
+
+// socket.io instances
+var socketio = mix.task('socket.io', { filename: 'app.js' })
+
+for (var i = 0; i < count; i++) {
+ port++;
+ nodeId++;
+ portmap.push(port)
+
+ var worker = socketio.fork({ args: [port, nodeId] })
+}
+
+// load balance the socket.io instances
+bouncy(function (req, bounce) {
+ bounce(portmap[Math.random()*portmap.length|0])
+}).listen(8880)
+
+console.log('bouncy listening on :%d', 8880)
View
22 examples/stock-quotes/mix-death.js
@@ -0,0 +1,22 @@
+var mix = require('../../').mix()
+
+var count = process.argv[2] || 5
+ , ioPort = 8880
+ , nodeId = 0
+
+// announce data server
+mix.task('announce').fork('data.js')
+
+// socket.io instances
+var socketio = mix.task('socket.io', { filename: 'app.js' })
+
+for (var i = 0; i < count; i++) {
+ ioPort++;
+ nodeId++;
+
+ var worker = socketio.fork({ args: [ioPort, nodeId] })
+}
+
+console.log('Kill off one of the managed workers. Just for fun.')
+var lastSio = socketio.workers.pop()
+lastSio.kill()
View
28 examples/stock-quotes/mix-lb80.js
@@ -0,0 +1,28 @@
+var mix = require('../../').mix('balanced')
+ , bouncy = require('bouncy')
+
+var count = process.argv[2] || 4
+ , portmap = []
+ , port = 8880
+ , nodeId = 0
+
+// announce data server
+mix.task('announce').fork('data.js')
+
+// socket.io instances
+var socketio = mix.task('socket.io', { filename: 'app.js' })
+
+for (var i = 0; i < count; i++) {
+ port++;
+ nodeId++;
+ portmap.push(port)
+
+ var worker = socketio.fork({ args: [port, nodeId] })
+}
+
+// load balance the socket.io instances
+bouncy(function (req, bounce) {
+ bounce(portmap[Math.random()*portmap.length|0])
+}).listen(80)
+
+console.log('bouncy listening on :%d', 80)
View
3  examples/stock-quotes/package.json
@@ -16,7 +16,8 @@
"start": "node ."
},
"dependencies": {
- "mixture": "~0.1.0",
+ "mixture": "",
+ "bouncy": ">= 1.0.2",
"express": "~2.5.9",
"socket.io": "https://github.com/dshaw/socket.io/tarball/patch/redis71",
"socket.io-announce": "https://github.com/dshaw/socket.io-announce/tarball/patch/redis71"
View
12 index.js
@@ -1,6 +1,6 @@
/*!
* mixture
- * Copyright(c) 2011 Daniel D. Shaw <dshaw@dshaw.com>
+ * Copyright(c) 2011-2012 Daniel D. Shaw <dshaw@dshaw.com>
* MIT Licensed
*/
@@ -8,10 +8,16 @@
* Mixture
*/
-exports = module.exports = require('./lib/mixture.js')
+var mixture = exports = module.exports = require('./mixture.js')
/**
* Version
*/
-exports.version = require('./package.json').version
+mixture.version = require('./package.json').version
+
+/**
+ * Worker mixin
+ */
+
+mixture.worker = require('./lib/worker')
View
247 lib/cluster-mix.js
@@ -0,0 +1,247 @@
+/*!
+ * mixture
+ * Copyright(c) 2011 Daniel D. Shaw <dshaw@dshaw.com>
+ * MIT Licensed
+ */
+
+/**
+ * Module dependencies.
+ */
+
+var assert = require('assert')
+ , fork = require('child_process').fork
+ , net = require('net')
+ , EventEmitter = require('events').EventEmitter
+
+var cluster = module.exports = new EventEmitter();
+
+var debug;
+if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
+ debug = function(x) {
+ var prefix = process.pid + ',' + (process.env.NODE_WORKER_ID ? 'Worker' : 'Master');
+ console.error(prefix, x);
+ };
+} else {
+ debug = function() { };
+}
+
+
+// Used in the master:
+var masterStarted = false;
+var ids = 0;
+var taskCount = 0;
+var tasks = {};
+var workers = [];
+var servers = {};
+
+// Used in the worker:
+var workerId = 0;
+var queryIds = 0;
+var queryCallbacks = {};
+
+cluster.isWorker = 'NODE_WORKER_ID' in process.env;
+cluster.isMaster = ! cluster.isWorker;
+
+// Call this from the master process. It will start child workers.
+//
+// options.workerFilename
+// Specifies the script to execute for the child processes. Default is
+// process.argv[1]
+//
+// options.args
+// Specifies program arguments for the workers. The Default is
+// process.argv.slice(2)
+//
+// options.workers
+// The number of workers to start. Defaults to os.cpus().length.
+function startMaster() {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
+
+ if (masterStarted) return;
+ masterStarted = true;
+
+ process.on('uncaughtException', function(e) {
+ // Quickly try to kill all the workers.
+ // TODO: be session leader - will cause auto SIGHUP to the children.
+ cluster.eachWorker(function(worker) {
+ debug("kill worker " + worker.pid);
+ worker.kill();
+ });
+
+ console.error("Exception in cluster master process: " +
+ e.message + '\n' + e.stack);
+ console.error("Please report this bug.");
+ process.exit(1);
+ });
+}
+
+
+function handleWorkerMessage(worker, message) {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
+
+ debug("recv " + JSON.stringify(message));
+
+ switch (message.cmd) {
+ case 'online':
+ console.log("Worker " + worker.pid + " online");
+ workers.push(worker);
+ break;
+
+ case 'queryServer':
+ var key = message.address + ":" +
+ message.port + ":" +
+ message.addressType;
+ var response = { _queryId: message._queryId };
+
+ if (key in servers == false) {
+ // Create a new server.
+ debug('create new server ' + key);
+ servers[key] = net._createServerHandle(message.address,
+ message.port,
+ message.addressType);
+ }
+ worker.send(response, servers[key]);
+ break;
+
+ default:
+ // Ignore.
+ break;
+ }
+}
+
+
+function eachWorker(task, cb) {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
+
+ var _workers = workers;
+
+ if (arguments.length < 2) {
+ cb = task;
+ } else {
+ _workers = tasks[task].workers
+ }
+
+ for (var id in _workers) {
+ if (_workers[id]) {
+ cb(_workers[id]);
+ }
+ }
+}
+
+
+cluster.fork = function(filename, args, options) {
+ // This can only be called from the master.
+ assert(cluster.isMaster);
+
+ // Lazily start the master process stuff.
+ startMaster();
+
+ var id = ++ids;
+ var envCopy = {};
+
+ for (var x in process.env) {
+ envCopy[x] = process.env[x];
+ }
+
+ envCopy['NODE_WORKER_ID'] = id;
+
+ if ('object' == typeof filename) {
+ var name = filename;
+ filename = tasks[name].filename;
+ args = tasks[name].args;
+ options = tasks[name].options;
+ } else {
+ // new task
+ filename || (fileName = process.argv[1]);
+ args || (args = process.argv.slice(3));
+ options || (options = {});
+
+ var name = options.name || taskCount;
+ options.name = name; // set if using default
+ options.env = envCopy;
+
+ tasks[name] = {
+ workers: []
+ , filename: filename
+ , args: args
+ , options: options
+ };
+
+ taskCount++;
+ }
+
+ var worker = fork(filename, args, options);
+
+ tasks[name].workers.push(worker);
+
+ worker.on('message', function(message) {
+ handleWorkerMessage(worker, message);
+ });
+
+ worker.on('exit', function() {
+ debug('worker id=' + id + ' died');
+ delete workers[id];
+ cluster.emit('death', worker);
+ });
+
+ return worker;
+};
+
+
+// Internal function. Called from src/node.js when worker process starts.
+cluster._startWorker = function(id) {
+ assert(cluster.isWorker);
+ workerId = parseInt(process.env.NODE_WORKER_ID);
+
+ queryMaster({ cmd: 'online' });
+
+ // Make callbacks from queryMaster()
+ process.on('message', function(msg, handle) {
+ debug("recv " + JSON.stringify(msg));
+ if (msg._queryId && msg._queryId in queryCallbacks) {
+ var cb = queryCallbacks[msg._queryId];
+ if (typeof cb == 'function') {
+ cb(msg, handle);
+ }
+ delete queryCallbacks[msg._queryId]
+ }
+ });
+};
+
+
+function queryMaster(msg, cb) {
+ assert(cluster.isWorker);
+
+ debug('send ' + JSON.stringify(msg));
+
+ // Grab some random queryId
+ msg._queryId = (++queryIds);
+ msg._workerId = workerId;
+
+ // Store callback for later. Callback called in _startWorker.
+ if (cb) {
+ queryCallbacks[msg._queryId] = cb;
+ }
+
+ // Send message to master.
+ process.send(msg);
+}
+
+
+// Internal function. Called by lib/net.js when attempting to bind a
+// server.
+cluster._getServer = function(address, port, addressType, cb) {
+ assert(cluster.isWorker);
+
+ queryMaster({
+ cmd: "queryServer",
+ address: address,
+ port: port,
+ addressType: addressType
+ }, function(msg, handle) {
+ cb(handle);
+ });
+};
View
171 lib/cp-util.js
@@ -0,0 +1,171 @@
+/*!
+ * mixture
+ * Copyright(c) 2011-2012 Daniel D. Shaw <dshaw@dshaw.com>
+ * MIT Licensed
+ */
+
+/**
+ * Child Process Utilities - largely adapted from internal Node.js child_process.js functions.
+ */
+
+/**
+ * Module dependencies.
+ */
+
+var net = require('net')
+
+/**
+ * Exports
+ */
+
+exports.createPipe = createPipe
+exports.createSocket = createSocket
+exports.mergeOptions = mergeOptions
+exports.setupChannel = setupChannel
+
+
+/**
+ * Create Pipe
+ * - constructors for lazy loading
+ *
+ * @param ipc
+ */
+
+function createPipe(ipc) {
+ var Pipe;
+
+ // Lazy load
+ if (!Pipe) {
+ Pipe = new process.binding('pipe_wrap').Pipe;
+ }
+
+ return new Pipe(ipc);
+}
+
+/**
+ * Create Socket
+ *
+ * @param pipe
+ * @param readable
+ */
+
+function createSocket(pipe, readable) {
+ var s = new net.Socket({ handle: pipe });
+
+ if (readable) {
+ s.writable = false;
+ s.readable = true;
+ s.resume();
+ } else {
+ s.writable = true;
+ s.readable = false;
+ }
+
+ return s;
+}
+
+/**
+ * Merge options
+ *
+ * @param target
+ * @param overrides
+ */
+
+function mergeOptions(target, overrides) {
+ if (overrides) {
+ var keys = Object.keys(overrides);
+ for (var i = 0, len = keys.length; i < len; i++) {
+ var k = keys[i];
+ if (overrides[k] !== undefined) {
+ target[k] = overrides[k];
+ }
+ }
+ }
+ return target;
+}
+
+/**
+ * noop
+ */
+
+function nop() { }
+
+/**
+ * Setup Channel
+ *
+ * @param target
+ * @param channel
+ */
+
+function setupChannel(target, channel) {
+ var isWindows = process.platform === 'win32';
+ target._channel = channel;
+
+ var jsonBuffer = '';
+
+ if (isWindows) {
+ var setSimultaneousAccepts = function(handle) {
+ var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS
+ && process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
+
+ if (handle._simultaneousAccepts != simultaneousAccepts) {
+ handle.setSimultaneousAccepts(simultaneousAccepts);
+ handle._simultaneousAccepts = simultaneousAccepts;
+ }
+ }
+ }
+
+ channel.onread = function(pool, offset, length, recvHandle) {
+ if (recvHandle && setSimultaneousAccepts) {
+ // Update simultaneous accepts on Windows
+ setSimultaneousAccepts(recvHandle);
+ }
+
+ if (pool) {
+ jsonBuffer += pool.toString('ascii', offset, offset + length);
+
+ var i, start = 0;
+ while ((i = jsonBuffer.indexOf('\n', start)) >= 0) {
+ var json = jsonBuffer.slice(start, i);
+ var message = JSON.parse(json);
+
+ target.emit('message', message, recvHandle);
+ start = i+1;
+ }
+ jsonBuffer = jsonBuffer.slice(start);
+
+ } else {
+ channel.close();
+ target._channel = null;
+ }
+ };
+
+// console.log('setting up target send', target)
+ target.send = function(message, sendHandle) {
+ if (!target._channel) throw new Error("channel closed");
+
+ // For overflow protection don't write if channel queue is too deep.
+ if (channel.writeQueueSize > 1024 * 1024) {
+ return false;
+ }
+
+ var buffer = Buffer(JSON.stringify(message) + '\n');
+
+ if (sendHandle && setSimultaneousAccepts) {
+ // Update simultaneous accepts on Windows
+ setSimultaneousAccepts(sendHandle);
+ }
+
+ var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);
+
+ if (!writeReq) {
+ throw new Error(errno + " cannot write to IPC channel.");
+ }
+
+ writeReq.oncomplete = nop;
+
+ return true;
+ };
+
+ channel.readStart();
+}
View
58 lib/task.js
@@ -1,6 +1,6 @@
/*!
* mixture
- * Copyright(c) 2011 Daniel D. Shaw <dshaw@dshaw.com>
+ * Copyright(c) 2011-2012 Daniel D. Shaw <dshaw@dshaw.com>
* MIT Licensed
*/
@@ -8,25 +8,29 @@
* Module dependencies.
*/
-var assert = require('assert')
- , fork = require('child_process').fork
- , util = require('util')
+var util = require('util')
+ , spawn = require('child_process').spawn
, EventEmitter = require('events').EventEmitter
+ , cputil = require('./cp-util')
/**
* Exports.
*/
-exports = module.exports = Task
+module.exports = Task
/**
* Task
*
* @param options
- * @api public
+ * @constructor
*/
function Task (options) {
+ if (!(this instanceof Task)) return new Task(options)
+
+ options || (options = {})
+
this.master = options.master
this.name = options.name
this.filename = options.filename
@@ -58,9 +62,10 @@ Task.prototype.fork = function(filename, args, options) {
, mergedOptions = {}
if (!this.initialized) {
- if (!this.filename) this.filename = filename
+ if (!this.filename) this.filename = filename || this.name
if (!this.args) this.args = args
if (!this.options) this.options = options
+ filename = this.filename
} else {
if ('object' == typeof filename) forkOptions = filename
filename = forkOptions.filename || this.filename
@@ -68,12 +73,16 @@ Task.prototype.fork = function(filename, args, options) {
options = forkOptions.options || this.options
}
+ args = args ? args.slice(0) : [];
+ args.unshift(filename)
+
for (var x in process.env) {
envCopy[x] = process.env[x]
}
// Node's `NODE_WORKER_ID` has too much behavior attached to it.
envCopy['MIXTURE_WORKER_ID'] = id
+ envCopy['MIXTURE_TASK_NAME'] = this.name
for (var x in this.options) {
mergedOptions[x] = this.options[x]
@@ -85,23 +94,46 @@ Task.prototype.fork = function(filename, args, options) {
mergedOptions.env = envCopy
- var worker = fork(filename, args, mergedOptions)
- this.workers.push(worker)
- this.master.workers.push(worker)
+ // Just need to set this - child process won't actually use the fd.
+ mergedOptions.env.NODE_CHANNEL_FD = 42;
+
+ // Leave stdin open for the IPC channel. stdout and stderr should be the
+ // same as the parent's.
+// mergedOptions.customFds = [ -1, 1, 2 ];
+
+ // stdin is the IPC channel.
+ mergedOptions.stdinStream = cputil.createPipe(true);
+
+ var worker = spawn(process.execPath, args, mergedOptions)
+
+ cputil.setupChannel(worker, mergedOptions.stdinStream);
- worker.on('message', function(message) {
+ this.workers[id] = worker
+ this.master.workers[id] = worker
+
+ worker.on('message', function(message, handle) {
self.master.onWorkerMessage(worker, self, message)
})
worker.on('exit', function() {
- console.log('worker id=' + id + ' died')
delete self.workers[id]
delete self.master.workers[id]
self.master.emit('death', worker, self)
})
- this.master.emit('online', worker, self)
+ worker.send({ message: 'fork' });
+
+ worker.started = true
+ this.master.emit('worker:started', worker, self)
this.initialized = true
return worker
}
+
+Task.prototype.kill = function () {
+ var self = this
+ this.master.eachWorker(this.name, function(worker) {
+ console.log('Task '+ self.name + ': kill worker ' + worker.pid)
+ worker.kill()
+ })
+}
View
37 lib/worker.js
@@ -0,0 +1,37 @@
+/*!
+ * mixture
+ * Copyright(c) 2011-2012 Daniel D. Shaw <dshaw@dshaw.com>
+ * MIT Licensed
+ */
+
+/**
+ * Exports.
+ */
+
+var send = exports.send = process.send || noop
+var taskName = exports.taskName = process.env.MIXTURE_TASK_NAME
+var workerId = exports.workerId = parseInt(process.env.MIXTURE_WORKER_ID)
+
+/**
+ * Noop.
+ */
+
+function noop () {}
+
+/**
+ * Master Message handler.
+ */
+
+function onMasterMessage (m) { console.log('worker message', arguments) }
+
+/**
+ * Receive messages from Mix Master.
+ */
+
+process.on('message', onMasterMessage)
+
+/**
+ * Notify Mix Master.
+ */
+
+send({ 'worker:online' : workerId, task: taskName, pid: process.pid })
View
47 lib/mixture.js → mixture.js
@@ -1,6 +1,6 @@
/*!
* mixture
- * Copyright(c) 2011 Daniel D. Shaw <dshaw@dshaw.com>
+ * Copyright(c) 2011-2012 Daniel D. Shaw <dshaw@dshaw.com>
* MIT Licensed
*/
@@ -8,37 +8,43 @@
* Module dependencies.
*/
-var assert = require('assert')
- , util = require('util')
+var util = require('util')
, EventEmitter = require('events').EventEmitter
- , Task = require('./task')
+ , Task = require('./lib/task')
+ , Worker = require('./lib/worker')
+ , debug = function () {}
/**
- * Exports
+ * Exports.
*/
exports.mix = Mix
exports.Task = Task
+exports.worker = Worker
/**
- * Mix Master
+ * Mix
+ *
+ * @param options
+ * @return {Mix}
+ * @constructor
*/
-function Mix (name) {
- if (!(this instanceof Mix)) return new Mix(name)
+function Mix (options) {
+ if (!(this instanceof Mix)) return new Mix(options)
+
+ options || (options = {})
- this.name = name
+ this.name = (typeof options === 'string') ? options : options.name || 'mix'
this.ids = 0
this.tasks = {}
this.workers = []
+ if (options.debug) debug = console.log
+
this.init()
}
-/**
- * Inherit EventEmitter
- */
-
util.inherits(Mix, EventEmitter)
/**
@@ -55,13 +61,18 @@ Mix.prototype.init = function () {
})
})
+ this.on('worker:started', function(worker, task) {
+ if (!worker || !task) return console.log('incomplete online notification')
+ debug(self.name + ': Task ', task.name, ' worker ', worker.pid, 'worker:started')
+ })
+
this.on('death', function(worker, task) {
- console.log('Task ' + task.name + ' worker ' + worker.pid + ' died')
+ debug(self.name + ': Task ', task.name, ' worker ', worker.pid, 'died')
})
process.on('uncaughtException', function(e) {
self.eachWorker(function(worker) {
- console.log('kill worker ' + worker.pid)
+ debug('kill worker ' + worker.pid)
worker.kill()
})
@@ -120,6 +131,8 @@ Mix.prototype.eachWorker = function (task, cb) {
* @param message
*/
-Mix.prototype.onWorkerMessage = function (task, worker, message) {
- console.log(task.name, worker.id, message)
+Mix.prototype.onWorkerMessage = function (worker, task, message) {
+ task || (task = {})
+ worker || (worker = {})
+ console.log(this.name + ': Task ', task.name, ' worker ', worker.pid, message)
}
View
4 package.json
@@ -1,6 +1,6 @@
{
"name": "mixture",
- "version": "0.1.1",
+ "version": "0.2.0-pre",
"description": "Heterogeneous cluster task manager",
"keywords": [
"cluster",
@@ -17,7 +17,7 @@
"url": "http://github.com/dshaw/mixture/issues"
},
"license": "MIT",
- "main": "index",
+ "main": "mixture",
"scripts": {
"test": "tap test/*.js"
},
View
2  test/fixtures/simple.js
@@ -1 +1 @@
-console.log('yes, it\'s really that simple')
+//console.log('yes, it\'s really that simple')
View
47 test/mixture.test.js
@@ -1,9 +1,10 @@
-var mixture = require('..')
+var net = require('net')
+ , tap = require('tap')
+ , test = tap.test
+ , mixture = require('..')
, Mix = mixture.mix
, mix = Mix()
, Task = mixture.Task
- , tap = require('tap')
- , test = tap.test
console.log(mix, Task, mix instanceof Mix)
@@ -11,12 +12,11 @@ test('mixture exports', function (t) {
t.ok(Mix, 'exports Mix')
t.ok(Task, 'exports Task')
- var version = require('../package.json').version
- t.equal(mixture.version, version, 'exports version')
-
t.isa(mix, Mix, 'mix is an instance of Mix')
t.ok(mix.tasks, 'initialized tasks hash')
t.ok(mix.workers, 'initialized workers list')
+ t.equal(mix.name, 'mix', 'default mix name')
+ t.equal(mixture.mix('dshaw').name, 'dshaw', 'assigned mix name')
var master = new Mix()
var task = master.task('name')
@@ -25,17 +25,38 @@ test('mixture exports', function (t) {
t.ok(task.master, 'task has a reference to the master')
t.isa(task.master, Mix, 'task master is an instance of Master')
- master.on('death', function() {
+ master.on('death', function(worker, task) {
console.log('I died', arguments)
+ t.equal(arguments.length, 2, 'death event has 2 arguments')
+// t.ok(worker, 'worker is defined')
+// t.ok(worker.pid, 'worker.pid is defined')
+// t.ok(task, 'task is defined')
})
- var worker1 = task.fork('./test/fixtures/simple')
- t.ok(worker1, 'worker is defined')
- t.equal(task.workers.length, 1, 'task now has one worker')
+ var worker = task.fork('./test/fixtures/simple')
+ worker.kill()
+
+ t.end()
+})
+
+
+test('master child communications', function (t) {
+ t.plan(1)
+
+ var master = mixture.mix()
+ var task = master.task('name')
+ var worker = task.fork('./test/fixtures/simple')
+
+ master.on('message', function (m) {
+ t.ok(m, 'message ok')
+ t.equal('message', m, 'master received message from child')
+ })
- // fork a new instance of the task
- var worker2 = task.fork()
- t.equal(task.workers.length, 2, 'task now has two workers')
+ var server = net.createServer();
+ server.listen(1337, function() {
+ worker.send('message', server._handle);
+ });
+ worker.send('message')
t.end()
})
View
33 test/task.test.js
@@ -0,0 +1,33 @@
+var mixture = require('..')
+ , Mix = mixture.mix
+ , Task = require('../lib/task')
+ , tap = require('tap')
+ , test = tap.test
+
+test('task test', function (t) {
+ t.ok(Task, 'exports Task')
+
+ var mix = mixture.mix()
+ , task = mix.task('name')
+
+ t.isa(mix.task, 'function', 'task method defined')
+ t.isa(task, Task, 'task returns a task')
+ t.ok(task.master, 'task has a reference to the master')
+ t.isa(task.master, Mix, 'task master is an instance of Mix')
+
+ var worker1 = task.fork('./test/fixtures/simple')
+
+ t.ok(worker1, 'worker is defined')
+ t.equal(task.workers.length, 1, 'task now has 1 worker')
+
+ for (var i = 0; i < 4; i++) task.fork()
+ t.equal(task.workers.length, 5, 'task now has 5 workers')
+
+ task.kill()
+ t.equal(task.workers.length, 5, 'task now has 5 workers')
+
+ task.fork()
+ t.equal(task.workers.length, 1, 'task restored. has 1 worker')
+
+ t.end()
+})

No commit comments for this range

Something went wrong with that request. Please try again.