Permalink
Browse files

Clustered server events, programmatic shutdown

  • Loading branch information...
1 parent 03111d0 commit 092afe196ac00338ce609660db092a68fdf5966a @mde mde committed Jan 24, 2014
Showing with 64 additions and 26 deletions.
  1. +4 −1 lib/cluster/master.js
  2. +10 −0 lib/cluster/master_dispatch.js
  3. +14 −7 lib/cluster/worker.js
  4. +36 −18 lib/geddy.js
View
@@ -1,5 +1,6 @@
var Master
, cluster = require('cluster')
+ , events = require('events')
, fs = require('fs')
, path = require('path')
, Log = require('../../deps/log')
@@ -371,7 +372,7 @@ Master.prototype = new (function () {
this.startShutdown = function () {
var self = this;
this.processMode = processModes.SHUTDOWN;
- this.stdoutLog.notice('Graceful shutdown from SIGTERM...');
+ this.stdoutLog.notice('Stopping cluster...');
this.workers.each(function (data) {
self.sendMessage(data.id, {
method: 'shutdown'
@@ -391,6 +392,8 @@ Master.prototype = new (function () {
})();
+utils.enhance(Master.prototype, new events.EventEmitter());
+
WorkerData = function (id, worker, retireAt) {
this.id = id;
this.heartbeatAt = (new Date()).getTime();
@@ -42,6 +42,16 @@ var dispatch = {
workerData.heartbeatAt = (new Date()).getTime();
}
+, workerListening: function (msg) {
+ if (typeof this._listeningWorkers == 'undefined') {
+ this._listeningWorkers = 0;
+ }
+ this._listeningWorkers++;
+ if (this._listeningWorkers == this.config.workers) {
+ this.emit('started');
+ }
+ }
+
};
module.exports = dispatch;
View
@@ -97,6 +97,10 @@ Worker.prototype = new (function () {
cb = function () {
log.info(msg);
callback();
+ self.sendMessage({
+ workerId: process.pid
+ , method: 'workerListening'
+ });
};
// server.listen(port, [hostname], [backlog], [callback])
@@ -121,9 +125,6 @@ Worker.prototype = new (function () {
if (typeof process.send == 'function') {
process.send(msg);
}
- else {
- console.dir(msg);
- }
};
this.receiveMessage = function (msg) {
@@ -145,10 +146,16 @@ Worker.prototype = new (function () {
this.shutdown = function (msg) {
var self = this;
this.shutDownCleanly(this.gracefulShutdownTimeout, function () {
- self.sendMessage({
- workerId: process.pid
- , method: 'readyForShutdown'
- });
+ if (typeof process.send == 'function') {
+ self.sendMessage({
+ workerId: process.pid
+ , method: 'readyForShutdown'
+ });
+ }
+ else {
+ self.log.notice('Server worker shutting down.');
+ process.exit();
+ }
});
};
View
@@ -12,7 +12,9 @@ utils.enhance(geddy, new EventEmitter());
utils.mixin(geddy, {version: pkg.version});
utils.mixin(geddy, utils);
utils.mixin(geddy, new (function () {
- var _started = false;
+ var _started = false
+ , _master
+ , _worker
this.start = function (options) {
var opts = options || {}
@@ -26,31 +28,29 @@ utils.mixin(geddy, new (function () {
App = require('./app').App;
worker = require('../lib/cluster/worker');
- w = new worker.Worker();
- geddy.worker = w;
+ _worker = new worker.Worker();
+ geddy.worker = _worker;
- w.init({clustered: false, logger: c.logger || utils.log}, function () {
- utils.mixin(geddy, w);
+ _worker.init({clustered: false, logger: c.logger || utils.log}, function () {
+ utils.mixin(geddy, _worker);
app = new App();
- app.init(w.config, function () {
+ app.init(_worker.config, function () {
geddy.emit('initialized');
utils.mixin(geddy, app);
- w.startServer(function () {
+ _worker.startServer(function () {
geddy.emit('started');
});
});
});
- w.configure(c);
+ _worker.configure(c);
};
this.startCluster = function (options) {
var opts = options || {}
, cluster = require('cluster')
, master
, worker
- , m
- , w
, App
, app;
@@ -60,36 +60,54 @@ utils.mixin(geddy, new (function () {
}
geddy.isMaster = cluster.isMaster;
+ geddy.isWorker = cluster.isWorker;
// Master-process, start workers
if (geddy.isMaster) {
master = require('../lib/cluster/master');
- m = new master.Master();
- m.start(opts);
+ _master = new master.Master();
+ _master.on('started', function () {
+ geddy.emit('clusterStarted');
+ });
+ _master.start(opts);
}
// Worker-process, start up an app
else {
App = require('./app').App;
worker = require('../lib/cluster/worker');
- w = new worker.Worker();
- geddy.worker = w;
+ _worker = new worker.Worker();
+ geddy.worker = _worker;
- w.init({clustered: true}, function () {
- utils.mixin(geddy, w);
+ _worker.init({clustered: true}, function () {
+ utils.mixin(geddy, _worker);
app = new App();
- app.init(w.config, function () {
+ app.init(_worker.config, function () {
geddy.emit('initialized');
utils.mixin(geddy, app);
- w.startServer(function () {
+ _worker.startServer(function () {
geddy.emit('started');
});
});
});
}
};
+ this.stop = function () {
+ if (geddy.isMaster || geddy.isWorker) {
+ throw new Error('`stop` should only be called in an unclustered server.');
+ }
+ _worker.shutdown();
+ };
+
+ this.stopCluster = function () {
+ if (!geddy.isMaster) {
+ throw new Error('`stopCluster` should only be called in the master process of a cluster.');
+ }
+ _master.startShutdown();
+ };
+
})());
// Also allow export/local

0 comments on commit 092afe1

Please sign in to comment.