Skip to content

Commit

Permalink
clean up lib/daemon.js
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewrk committed Apr 25, 2013
1 parent 8f9c06d commit 859bdab
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 215 deletions.
175 changes: 90 additions & 85 deletions lib/daemon.js
@@ -1,145 +1,153 @@
var fs, mkdirp, path, spawn, net, assert, async, json_socket, createLog, argv, worker_count, socket_path, log_naught_path, log_stderr_path, log_stdout_path, max_log_size, script, node_args_str, logs, socket, master, server;
fs = require('fs');
mkdirp = require('mkdirp');
path = require('path');
spawn = require('child_process').spawn;
net = require('net');
assert = require('assert');
async = require('async');
json_socket = require('./json_socket');
createLog = require('./log').create;
argv = process.argv.slice(2);
worker_count = parseInt(argv.shift());
socket_path = argv.shift();
log_naught_path = argv.shift();
log_stderr_path = argv.shift();
log_stdout_path = argv.shift();
max_log_size = parseInt(argv.shift());
script = argv.shift();
node_args_str = argv.shift();
logs = null;
socket = null;
master = null;
server = null;
function maybeCreateLog(log_path, cb){
if (log_path === '/dev/null') {
var fs = require('fs')
, mkdirp = require('mkdirp')
, path = require('path')
, spawn = require('child_process').spawn
, net = require('net')
, assert = require('assert')
, async = require('async')
, jsonSocket = require('./json_socket')
, createLog = require('./log').create

, argv = process.argv.slice(2)
, workerCount = parseInt(argv.shift(), 10)
, socketPath = argv.shift()
, logNaughtPath = argv.shift()
, logStderrPath = argv.shift()
, logStdoutPath = argv.shift()
, maxLogSize = parseInt(argv.shift(), 10)
, script = argv.shift()
, nodeArgsStr = argv.shift()

, logs = null
, socket = null
, master = null
, server = null

, own = {}.hasOwnProperty;

createLogsAndIpcServer(function(err) {
assert.ifError(err);
spawnMaster();
});

function maybeCreateLog(logPath, cb) {
// special case /dev/null - disable logging altogether
if (logPath === '/dev/null') {
cb(null, null);
} else {
createLog(log_path, max_log_size, cb);
createLog(logPath, maxLogSize, cb);
}
}

function createLogs(cb){
async.parallel({
naught: function(cb){
maybeCreateLog(log_naught_path, cb);
maybeCreateLog(logNaughtPath, cb);
},
stderr: function(cb){
maybeCreateLog(log_stderr_path, cb);
maybeCreateLog(logStderrPath, cb);
},
stdout: function(cb){
maybeCreateLog(log_stdout_path, cb);
maybeCreateLog(logStdoutPath, cb);
}
}, function(err, results){
var ref$;
if (err) {
return cb(err);
}
if (err) return cb(err);
logs = results;
if ((ref$ = logs.stderr) != null) {
ref$.on('error', function(err){
log("Error writing to " + log_stderr_path + ": " + err.stack + "\n");
if (logs.stderr) {
logs.stderr.on('error', function(err) {
log("Error writing to " + logStderrPath + ": " + err.stack + "\n");
});
}
if ((ref$ = logs.stdout) != null) {
ref$.on('error', function(err){
log("Error writing to " + log_stdout_path + ": " + err.stack + "\n");
if (logs.stdout) {
logs.stdout.on('error', function(err) {
log("Error writing to " + logStdoutPath + ": " + err.stack + "\n");
});
}
if ((ref$ = logs.naught) != null) {
ref$.on('error', function(err){
process.stderr.write("Error writing to " + log_naught_path + ": " + err.stack + "\n");
if (logs.naught) {
logs.naught.on('error', function(err){
process.stderr.write("Error writing to " + logNaughtPath + ": " + err.stack + "\n");
});
}
cb();
});
}
function log(str){
var ref$;
if ((ref$ = logs.naught) != null) {
ref$.write(str);
}

function log(str) {
if (logs.naught) logs.naught.write(str);
process.stderr.write(str);
}
function workerCountsFromMsg(msg){
var ref$;
return "booting: " + ((ref$ = msg.count) != null ? ref$.booting : void 8) + ", online: " + ((ref$ = msg.count) != null ? ref$.online : void 8) + ", dying: " + ((ref$ = msg.count) != null ? ref$.dying : void 8) + ", new_booting: " + ((ref$ = msg.count) != null ? ref$.new_booting : void 8) + ", new_online: " + ((ref$ = msg.count) != null ? ref$.new_online : void 8);

function workerCountsFromMsg(counts) {
return "booting: " + counts.booting +
", online: " + counts.online +
", dying: " + counts.dying +
", new_booting: " + counts.new_booting +
", new_online: " + counts.new_online;
}

function onMessage(message){
var ref$;
if ((ref$ = logs.naught) != null) {
ref$.write(message.event + ". " + workerCountsFromMsg(message) + "\n");
}
if (socket != null) {
json_socket.send(socket, message);
if (logs.naught) {
var str = message.event + ".";
if (message.count) str += " " + workerCountsFromMsg(message.count);
logs.naught.write(str + "\n");
}
if (socket) jsonSocket.send(socket, message);
}
function createLogsAndIpcServer(cb){

function createLogsAndIpcServer(cb) {
async.parallel([
createLogs, function(cb){
mkdirp(path.dirname(socket_path), cb);
mkdirp(path.dirname(socketPath), cb);
}
], function(err){
if (err) {
return cb(err);
}
server = net.createServer(function(new_socket){
if (err) return cb(err);
server = net.createServer(function(newSocket){
if (socket != null) {
log("Warning: Only one connection to daemon allowed. Terminating old connection.\n");
socket.destroy();
}
socket = new_socket;
socket = newSocket;
socket.on('error', function(err){
log("Error: ipc channel socket: " + err.stack + "\n");
});
socket.once('end', function(){
socket = null;
});
json_socket.listen(socket, function(msg){
jsonSocket.listen(socket, function(msg){
if (master != null) {
if (msg.action === 'NaughtDeploy') {
import$(process.env, msg.environment);
extend(process.env, msg.environment);
}
master.send(msg);
} else {
json_socket.send(socket, {
jsonSocket.send(socket, {
event: 'ErrorStillBooting'
});
}
});
});
server.listen(socket_path, function(){
server.listen(socketPath, function(){
process.send('IpcListening');
cb();
});
});
}
function spawnMaster(){
var node_args, stdout_behavior, stderr_behavior;
node_args = splitCmdLine(node_args_str);
console.error("node_args", node_args);
stdout_behavior = logs.stdout != null ? 'pipe' : 'ignore';
stderr_behavior = logs.stderr != null ? 'pipe' : 'ignore';
console.log("stdout beh", stdout_behavior);
master = spawn(process.execPath, node_args.concat([path.join(__dirname, "master.js"), worker_count, script]).concat(argv), {
var nodeArgs = splitCmdLine(nodeArgsStr);
console.error("node_args", nodeArgs);
var stdoutBehavior = logs.stdout ? 'pipe' : 'ignore';
var stderrBehavior = logs.stderr ? 'pipe' : 'ignore';
console.log("stdout beh", stdoutBehavior);
master = spawn(process.execPath, nodeArgs.concat([path.join(__dirname, "master.js"), workerCount, script]).concat(argv), {
env: process.env,
stdio: [process.stdin, stdout_behavior, stderr_behavior, 'ipc'],
stdio: [process.stdin, stdoutBehavior, stderrBehavior, 'ipc'],
cwd: process.cwd()
});
master.on('message', onMessage);
if (logs.stdout != null) {
if (logs.stdout) {
master.stdout.on('data', logs.stdout.write);
}
if (logs.stderr != null) {
if (logs.stderr) {
master.stderr.on('data', logs.stderr.write);
}
master.on('close', function(){
Expand All @@ -163,12 +171,9 @@ function splitCmdLine(str){
return str.split(/\s+/);
}
}
createLogsAndIpcServer(function(err){
assert.ifError(err);
spawnMaster();
});
function import$(obj, src){
var own = {}.hasOwnProperty;
for (var key in src) if (own.call(src, key)) obj[key] = src[key];
function extend(obj, src) {
for (var key in src) {
if (own.call(src, key)) obj[key] = src[key];
}
return obj;
}
}
130 changes: 0 additions & 130 deletions src/daemon.co

This file was deleted.

0 comments on commit 859bdab

Please sign in to comment.