Permalink
Browse files

Mixture feature complete.

  • Loading branch information...
1 parent c994fbb commit 4f702ed2efd87ed798611f9e8c81312e8de9a738 @dshaw committed Nov 8, 2011
@@ -0,0 +1,9 @@
+# Stock Quote Stream powered by socket.io-announce, managed by mixture
+
+### Setup
+Runs on Node.js v0.5.9+.
+
+* `npm install`
+
+### Managed mixed cluster
+* `node . [n]` - manage all processes, creating a data stream and n socket.io apps. defaults to 4.
@@ -0,0 +1,37 @@
+var path = require('path')
+ , connect = require('connect')
+ , sio = require('socket.io')
+ , RedisStore = sio.RedisStore
+
+var port = process.argv[2] || 8880
+ , id = process.argv[3] || 0
+ , delay = process.argv[4] || 800
+ , app = connect.createServer(connect.static(path.join(__dirname, './')))
+ , io = sio.listen(app);
+
+io.configure(function () {
+ io.set('store', new RedisStore({ nodeId: function () { return id } }))
+})
+
+io.sockets.on('connection', function (socket) {
+ socket.emit('nodeId', id);
+
+ socket.on('purchase', function (data, fn) {
+ data.timestamp = Date.now();
+
+ setTimeout(function () { // without a delay the transition between purchase and confirm is imperceptible
+ socket.emit('confirm', data);
+ socket.broadcast.emit('activity', data);
+ }, delay)
+ });
+
+ socket.on('restart', function (data) {
+ io.sockets.in('').emit('restart')
+ })
+})
+
+app.listen(port)
+
+app.on('listening', function () {
+ console.log('listening on :', app.address())
+})
@@ -0,0 +1,24 @@
+/**
+ * Announce doesn't bind a port.
+ * Announce also doesn't have to worry about where it's sending the data.
+ * It simply broadcasts to the Redis pub/sub backbone.
+ */
+
+var announce = require('socket.io-announce').createClient()
+
+/* fake data stream */
+var symbols = 'THOO GOOF EXIT BOP SDD ALPP RIGM OPPL HPBG'.split(' ')
+
+function dataStream () {
+ var n = Math.round(Math.random()*5)
+ , data = {
+ id: (Math.abs(Math.random() * Math.random() * Date.now() | 0))
+ , symbol: symbols[n]
+ , price: (Math.random()*1000).toFixed(2)
+ , n: n
+ }
+ announce.emit('quote', data)
+}
+
+dataStream()
+setInterval(dataStream, 800)
@@ -0,0 +1,88 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <meta charset="utf-8">
+ <style>
+ body { margin: 0 }
+ #nodeId {
+ width: 100%;
+ height: 100%;
+ background-color: white;
+ color: black;
+ font-size: 12em;
+ text-align: center;
+ text-shadow: 4px 4px 1px #e5e5e5;
+ -webkit-transform: scale(1);
+ }
+ #ticker {
+ position: absolute;
+ bottom: 0;
+ width: 100%;
+ padding: 10px;
+ overflow: hidden;
+ white-space: nowrap;
+ text-align: right;
+ direction: rtl;
+ background-color: rgba(0,0,0,.5);
+ color: white;
+ text-shadow: 1px 1px 1px #e5e5e5;
+ height: 1.2em;
+ }
+ #ticker span { padding: 10px }
+ #ticker .reserved { background-color: rgba(255,255,0,.5) }
+ #ticker .purchased { background-color: rgba(0,255,0,.5) }
+ #ticker .activity { background-color: rgba(255,0,0,.5) }
+ </style>
+</head>
+<body>
+ <div id=nodeId></div>
+ <footer id=ticker></footer>
+
+ <script src="/js/jquery-1.6.4.min.js"></script>
+ <script src="/socket.io/socket.io.js"></script>
+ <script>
+ var socket = io.connect()
+ , nodeId;
+
+ socket.on('nodeId', function (data) {
+ nodeId = data;
+ document.title = '[ '+nodeId+' ] - @dshaw demo';
+ $('#nodeId').text(nodeId);
+ });
+
+ socket.on('quote', function (data) {
+ $('<span/>', { id: data.id, html: data.symbol + ' : ' + data.price }).appendTo('#ticker');
+
+ // if the last digit of the prices matches the id, buy.
+ var m = ((data.price).match(/[-+]?([0-9]*([0-9]))?\.[0-9]+/))
+ , watch = m && m[2];
+
+ if (nodeId && watch == nodeId) {
+ $('#'+data.id).addClass('reserved');
+ data.buyer = nodeId;
+ data.quantity = Math.round(Math.random()*5)*5;
+ socket.emit('purchase', data);
+ }
+ });
+
+ socket.on('confirm', function (data) {
+ $('#'+data.id)
+ .attr('title','Purchased '+(new Date(data.timestamp)))
+ .removeClass('reserved')
+ .addClass('purchased');
+ });
+
+ socket.on('activity', function (data) {
+ $('#'+data.id)
+ .attr('title','Purchased '+(new Date(data.timestamp))+' on Node Id '+data.nodeId)
+ .addClass('activity');
+ });
+
+ socket.on('restart', function () {
+ location.reload(true);
+ });
+
+ $('#nodeId').live('dblclick', function() { socket.emit('restart') });
+ </script>
+</body>
+</html>
@@ -0,0 +1,18 @@
+var mix = require('../../').mix()
+
+var count = process.argv[2] || 4 // || maxes out locally at ~82
+ , ioPort = 8880
+ , nodeId = 0
+
+// announce data server
+mix.task('announce').fork('data.js');
+
+// socket.io instances
+var socketio = mix.task('socket.io', { filename: 'app.js' })
+
+for (var i = 0; i < count; i++) {
+ ioPort++;
+ nodeId++;
+
+ var worker = socketio.fork({ args: [ioPort, nodeId] })
+}
Oops, something went wrong.
@@ -0,0 +1,14 @@
+{
+ "name": "stock-quotes-example"
+ , "version": "0.1.1"
+ , "description": "Socket.io-announce Sample App: Stock Quote Stream"
+ , "author": "Daniel D. Shaw <dshaw@dshaw.com> (http://dshaw.com)"
+ , "keywords": ["socket.io", "socket.io-announce", "redisstore", "redis"]
+ , "main": "index"
+ , "dependencies": {
+ "connect": ">= 1.7.2"
+ , "socket.io": ">= 0.8.5"
+ , "socket.io-announce": ">= 0.1.0"
+ }
+ , "engines": { "node": ">= 0.5.9" }
+}
View
@@ -8,10 +8,10 @@
* Mixture
*/
-exports = module.exports = require('./lib/mixture.js');
+exports = module.exports = require('./lib/mixture.js')
/**
* Version
*/
-exports.version = '0.0.0';
+exports.version = require('./package.json').version
View
@@ -17,7 +17,9 @@ var assert = require('assert')
* Exports
*/
-exports = module.exports = new Master();
+exports.mix = function () { return new Master() }
+exports.Master = Master
+exports.Task = Task
/**
* Mix Master
@@ -27,13 +29,47 @@ function Master () {
this.ids = 0
this.tasks = {}
this.workers = []
+
+ this.init()
}
/**
* Inherit EventEmitter
*/
-util.inherit(Master, EventEmitter)
+util.inherits(Master, EventEmitter)
+
+/**
+ * Initialize Master
+ */
+
+Master.prototype.init = function () {
+ var self = this
+
+ this.on('exit', function(e) {
+ self.eachWorker(function(worker) {
+ console.log('kill worker ' + worker.pid)
+ worker.kill()
+ });
+ })
+
+ this.on('death', function(task, worker) {
+ console.log('kill worker ' + worker.pid + 'on task ' + task.name)
+ })
+
+ process.on('uncaughtException', function(e) {
+ // Quickly try to kill all the workers.
+ // TODO: be session leader - will cause auto SIGHUP to the children.
+ self.eachWorker(function(worker) {
+ console.log('kill worker ' + worker.pid);
+ worker.kill();
+ });
+
+ console.error('Exception in cluster master process: ' + e.message + '\n' + e.stack);
+ console.error("Please report this bug.");
+ process.exit(1);
+ });
+}
/**
* Task mix
@@ -43,19 +79,47 @@ util.inherit(Master, EventEmitter)
* @api public
*/
-Master.prototype.mix = function (name) {
+Master.prototype.task = function (name, options) {
if (!this.tasks[name]) {
- this.tasks[name] = new Task({ name: name })
+ options || (options = {})
+ options.master = this
+ options.name = name
+ this.tasks[name] = new Task(options)
}
return this.tasks[name]
}
-var debug;
-if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
- debug = function(x) {
- var prefix = process.pid + ',' + (process.env.NODE_WORKER_ID ? 'Worker' : 'Master');
- console.error(prefix, x);
- };
-} else {
- debug = function() { };
+/**
+ * Send message to all
+ *
+ * @param task (optional)
+ * @param cb
+ */
+
+Master.prototype.eachWorker = function (task, cb) {
+ var _workers = workers;
+
+ if (arguments.length < 2) {
+ cb = task;
+ } else {
+ _workers = tasks[task].workers
+ }
+
+ for (var id in _workers) {
+ if (_workers[id]) {
+ cb(_workers[id]);
+ }
+ }
}
+
+/**
+ * Handle Worker Message
+ *
+ * @param task
+ * @param worker
+ * @param message
+ */
+
+Master.prototype.onWorkerMessage = function (task, worker, message) {
+ console.log(task.name, worker.id, message)
+}
Oops, something went wrong.

0 comments on commit 4f702ed

Please sign in to comment.