Skip to content

Commit

Permalink
cluster: introduce distribute option
Browse files Browse the repository at this point in the history
`distribute` is a user-specified callback for asynchronous balancing of
incoming connections to cluster workers.

fix nodejs#6001
  • Loading branch information
indutny committed Aug 6, 2013
1 parent b8a7eed commit 1507655
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 3 deletions.
6 changes: 6 additions & 0 deletions doc/api/cluster.markdown
Expand Up @@ -126,6 +126,12 @@ values are `"rr"` and `"none"`.
(Default=`process.argv.slice(2)`)
* `silent` {Boolean} whether or not to send output to parent's stdio.
(Default=`false`)
* `distribute` {Function} user-specified server connection distribution
function. Invoked with only one argument - `callback`, which it should
invoke in a following way: `callback(err, worker)`, where the `worker`
argument is an instance of `Worker` class and specifies to which worker
a request should be scheduled.
NOTE: Experimental. Works only with round-robin scheduling mode.

All settings set by the `.setupMaster` is stored in this settings object.
This object is not supposed to be changed or set manually, by you.
Expand Down
22 changes: 19 additions & 3 deletions lib/cluster.js
Expand Up @@ -99,6 +99,7 @@ function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
this.userDistribute = cluster.settings.distribute;

if (fd >= 0)
this.server.listen({ fd: fd });
Expand Down Expand Up @@ -159,9 +160,23 @@ RoundRobinHandle.prototype.remove = function(worker) {
};

RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
var worker = this.free.shift();
if (worker) this.handoff(worker);
if (this.userDistribute) {
var self = this;
var once = false;
this.userDistribute(function(err, worker) {
if (once)
return;
once = true;
self.handles.push(handle);
if (!err && worker)
self.handoff(worker);
});
} else {
this.handles.push(handle);
var worker = this.free.shift();
if (worker)
this.handoff(worker);
}
};

RoundRobinHandle.prototype.handoff = function(worker) {
Expand Down Expand Up @@ -218,6 +233,7 @@ function masterInit() {
args: process.argv.slice(2),
exec: process.argv[1],
execArgv: process.execArgv,
distribute: null,
silent: false
};
cluster.settings = settings;
Expand Down
92 changes: 92 additions & 0 deletions test/simple/test-cluster-distribute.js
@@ -0,0 +1,92 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
var http = require('http');

var workerCount = 3;
var requestsPerWorker = 10;

if (cluster.isMaster) {
var distributed = 0;
var hitCount = 0;

var i = 0;
cluster.setupMaster({
distribute: function(callback) {
setTimeout(function() {
// Hand-off handles to the first two workers
var i = (i + 1) % (workerCount - 1);
distributed++;
callback(null, workers[i]);
}, 5);
}
});

var hits = {};
var workers = [];

function fork() {
var worker = cluster.fork();
workers.push(worker);
worker.on('message', function(msg) {
if (msg === 'shoot') {
http.get({ port: common.PORT, path: '/' }, function(res) {
res.resume();
res.on('end', function(err) {
if (err) throw err;
});
});
} else if (msg === 'hit') {
// Count hits
if (!hits[worker.id])
hits[worker.id] = 1;
else
hits[worker.id]++;
if (++hitCount === requestsPerWorker * workerCount)
process.exit();
}
});
}

for (var i = 0; i < workerCount; i++)
fork();

process.on('exit', function() {
assert.equal(hitCount, requestsPerWorker * workerCount);
assert.equal(distributed, requestsPerWorker * workerCount);

// One worker should receive no requests at all
assert.equal(Object.keys(hits).length, workerCount - 1);
});

return;
}

http.createServer(function(req, res) {
process.send('hit');
res.end('OK');
}).listen(common.PORT, function() {
for (var i = 0; i < requestsPerWorker; i++)
process.send('shoot');
});

0 comments on commit 1507655

Please sign in to comment.