Skip to content

Commit

Permalink
Fix worker/master communication with namespacing
Browse files Browse the repository at this point in the history
  • Loading branch information
baudehlo committed Jul 2, 2013
1 parent 17a4dbe commit edcf086
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
15 changes: 13 additions & 2 deletions outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ exports.scan_queue_pids = function (cb) {
}

process.on('message', function (msg) {
if (msg.event && msg.event === 'load_pid_queue') {
if (msg.event && msg.event === 'outbound.load_pid_queue') {
exports.load_pid_queue(msg.data);
}
else if (msg.event && msg.event === 'flush_queue') {
else if (msg.event && msg.event === 'outbound.flush_queue') {
exports.flush_queue();
}
// otherwise ignore the message
Expand Down Expand Up @@ -193,7 +193,18 @@ exports.load_queue_files = function (pid, cb_name, files) {
cb();
});
}
else if (/^\./.test(file)) {
// dot-file...
self.logwarn("Removing left over dot-file: " + file);
return fs.unlink(queue_dir + "/" + file, function (err) {
if (err) {
self.logerror("Error removing dot-file: " + file + ": " + err);
}
cb();
});
}
else {
// Do this because otherwise we blow the stack
async.setImmediate(cb);
}
}, function (err) {
Expand Down
20 changes: 7 additions & 13 deletions plugins/process_title.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ exports.hook_init_master = function (next, server) {
var cluster = server.cluster;
var recvMsg = function (msg) {
switch (msg) {
case 'connect':
case 'process_title.connect':
server.notes.pt_connections++;
server.notes.pt_concurrent++;
break;
case 'disconnect':
case 'process_title.disconnect':
server.notes.pt_concurrent--;
break;
case 'message':
case 'process_title.message':
server.notes.pt_messages++;
break;
default:
Expand Down Expand Up @@ -57,11 +57,9 @@ exports.hook_init_child = function (next, server) {
exports.hook_lookup_rdns = function (next, connection) {
var server = connection.server;
connection.notes.pt_connect_run = true;
var title = 'Haraka';
if (server.cluster) {
title = 'Haraka (worker)';
var worker = server.cluster.worker;
worker.send('connect');
worker.send('process_title.connect');
}
server.notes.pt_connections++;
server.notes.pt_concurrent++;
Expand All @@ -76,28 +74,24 @@ exports.hook_disconnect = function (next, connection) {
// will exhibit this behaviour.
if (!connection.notes.pt_connect_run) {
if (server.cluster) {
server.cluster.worker.send('connect');
server.cluster.worker.send('process_title.connect');
}
server.notes.pt_connections++;
server.notes.pt_concurrent++;
}
var title = 'Haraka';
if (server.cluster) {
title = 'Haraka (worker)';
var worker = server.cluster.worker;
worker.send('disconnect');
worker.send('process_title.disconnect');
}
server.notes.pt_concurrent--;
return next();
}

exports.hook_data = function (next, connection) {
var server = connection.server;
var title = 'Haraka';
if (server.cluster) {
title = 'Haraka (worker)';
var worker = server.cluster.worker;
worker.send('message');
worker.send('process_title.message');
}
server.notes.pt_messages++;
return next();
Expand Down
6 changes: 3 additions & 3 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Server.daemonize = function (config_data) {
Server.flushQueue = function () {
if (Server.cluster) {
for (var id in cluster.workers) {
cluster.workers[id].send({event: 'flush_queue'});
cluster.workers[id].send({event: 'outbound.flush_queue'});
}
}
else {
Expand Down Expand Up @@ -128,7 +128,7 @@ Server.createServer = function (params) {
new_workers.push(cluster.fork({ CLUSTER_MASTER_PID: process.pid }));
}
for (var i=0; i<pids.length; i++) {
new_workers[i % new_workers.length].send({event: 'load_pid_queue', data: pids[i]});
new_workers[i % new_workers.length].send({event: 'outbound.load_pid_queue', data: pids[i]});
}
cluster.on('online', function (worker) {
logger.lognotice('worker ' + worker.id + ' started pid=' + worker.process.pid);
Expand All @@ -146,7 +146,7 @@ Server.createServer = function (params) {
if (signal || code !== 0) {
// Restart worker
var new_worker = cluster.fork({ CLUSTER_MASTER_PID: process.pid });
new_worker.send({event: 'load_pid_queue', data: worker.process.pid});
new_worker.send({event: 'outbound.load_pid_queue', data: worker.process.pid});
}
});
plugins.run_hooks('init_master', Server);
Expand Down

0 comments on commit edcf086

Please sign in to comment.