Permalink
Browse files

Reimplemented process-rotation.

  • Loading branch information...
1 parent 2de4db2 commit 47d1d09eb6b44c68ef29b168273a5607e7ff374d @mde mde committed Jan 26, 2012
Showing with 57 additions and 17 deletions.
  1. +5 −4 lib/cluster/hack_master.js
  2. +35 −11 lib/cluster/master.js
  3. +5 −1 lib/cluster/master_dispatch.js
  4. +9 −1 lib/cluster/worker.js
  5. +3 −0 lib/cluster/worker_dispatch.js
@@ -20,8 +20,9 @@ Master.prototype = geddy.mixin({}, MasterBase.prototype);
// Overrides
geddy.mixin(Master.prototype, new (function () {
- this.createWorker = function () {
+ this.createWorker = function (dt) {
var fds
+ , retireAt = dt || (new Date()).getTime() + this.config.rotationWindow
, w
, data
, id;
@@ -50,7 +51,7 @@ geddy.mixin(Master.prototype, new (function () {
}
id = w.pid.toString();
- data = new WorkerData(id, w);
+ data = new WorkerData(id, w, retireAt);
data._fd = this._fd;
this.workers.addItem(id, data);
@@ -81,7 +82,7 @@ geddy.mixin(Master.prototype, new (function () {
});
worker.addListener('exit', function (code) {
- self.fixBrokenWorker(id);
+ self.handleWorkerExit(id);
});
};
@@ -95,7 +96,7 @@ geddy.mixin(Master.prototype, new (function () {
worker.stdin.write(output + '\n', 'ascii', this._fd);
}
catch (err) {
- this.fixBrokenWorker(workerId);
+ this.killWorker(workerId);
}
};
View
@@ -22,7 +22,7 @@ Master = function () {
// Clustering-only
cluster.addListener('death', function (worker) {
var id = worker.pid.toString();
- self.fixBrokenWorker(id);
+ self.handleWorkerExit(id);
});
};
@@ -44,6 +44,11 @@ Master.prototype = new (function () {
this.processMode = processModes.WATCH_FILES;
}
+ // Obvious, don't rotate with only one worker
+ if (this.config.workers < 2) {
+ this.config.rotateWorkers = false;
+ }
+
// App configs
appBaseConfig = require(dir + '/config/environment');
appEnvConfig = require(dir + '/config/' + this.config.environment);
@@ -135,9 +140,19 @@ Master.prototype = new (function () {
, heartbeatWindow = this.config.heartbeatWindow
, killWorker;
- workers.each(function (data) {
+ workers.each(function (data, id) {
var worker = data.worker;
+ // If this worker is past its freshness date, retire it
+ if (self.config.rotateWorkers && (now > data.retireAt)) {
+ self.stdoutLog.info("Rotating " +
+ worker.pid + ", killing process.");
+ data.retired = true;
+ self.sendMessage(id, {
+ method: 'retire'
+ });
+ }
+
// Kill if process hasn't called back in a while
if ((now - worker.heartbeatAt) > heartbeatWindow) {
killWorker = true;
@@ -228,24 +243,29 @@ Master.prototype = new (function () {
var configCount = this.config.workers
, currentCount = this.workers.count
, needed = configCount - currentCount
+ , rotationWindow = this.config.rotationWindow
+ , staggerInterval = rotationWindow / needed
+ , retirement = (new Date()).getTime() + rotationWindow
, msg;
if (needed) {
msg = 'Creating ' + needed + ' worker process';
msg += needed > 1 ? 'es.' : '.';
this.stdoutLog.info(msg);
while (currentCount < configCount) {
currentCount++;
- this.createWorker();
+ this.createWorker(retirement);
+ retirement -= staggerInterval;
}
}
};
- this.createWorker = function () {
+ this.createWorker = function (dt) {
var self = this
+ , retireAt = dt || (new Date()).getTime() + this.config.rotationWindow
, w = cluster.fork()
, id = w.pid.toString()
, data = new WorkerData(id, w);
- this.workers.addItem(id, data);
+ this.workers.addItem(id, data, retireAt);
this.addWorkerListeners(w);
this.sendMessage(id, {
method: 'config'
@@ -267,7 +287,7 @@ Master.prototype = new (function () {
worker.send(msg);
}
catch (err) {
- this.fixBrokenWorker(workerId);
+ this.killWorker(workerId);
}
};
@@ -287,12 +307,15 @@ Master.prototype = new (function () {
process.kill(pid);
}
catch(e) {}
- this.workers.removeItem(pid.toString());
};
- this.fixBrokenWorker = function (id) {
- this.stderrLog.error("Worker " + id + " died.");
- this.killWorker(id);
+ this.handleWorkerExit = function (pid) {
+ var id = pid.toString()
+ , retired = this.workers.getItem(id).retired;
+ if (!retired) {
+ this.stderrLog.error("Worker " + id + " died.");
+ }
+ this.workers.removeItem(id);
};
this.startShutdown = function () {
@@ -318,12 +341,13 @@ Master.prototype = new (function () {
})();
-WorkerData = function (id, worker) {
+WorkerData = function (id, worker, retireAt) {
this.id = id;
this.heartbeatAt = (new Date()).getTime();
this.process = null;
this.pid = null;
this.worker = worker;
+ this.retireAt = retireAt;
};
module.exports.Master = Master;
@@ -1,6 +1,10 @@
var dispatch = {
- readyForShutdown: function (msg) {
+ readyForRetirement: function (msg) {
+ this.killWorker(msg.workerId);
+ }
+
+, readyForShutdown: function (msg) {
this.killWorker(msg.workerId);
this.checkShutdown();
}
View
@@ -63,7 +63,15 @@ Worker.prototype = new (function () {
}
};
+ this.retire = function (msg) {
+ this.shutDownCleanly('Retirement');
+ };
+
this.shutdown = function (msg) {
+ this.shutDownCleanly('Shutdown');
+ };
+
+ this.shutDownCleanly = function (type) {
var self = this
, startTime = (new Date()).getTime()
, ready = false;
@@ -91,7 +99,7 @@ Worker.prototype = new (function () {
if (ready) {
self.sendMessage({
workerId: process.pid
- , method: 'readyForShutdown'
+ , method: 'readyFor' + type
});
}
}, 2000);
@@ -5,6 +5,9 @@ var dispatch = {
, shutdown: function (msg) {
this.shutdown(msg.data);
}
+, retire: function (msg) {
+ this.retire(msg.data);
+ }
};
module.exports = dispatch;

0 comments on commit 47d1d09

Please sign in to comment.