Skip to content

Commit

Permalink
remove clustering as it was rather unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Dec 24, 2011
1 parent b2d2cd8 commit 99dda30
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 146 deletions.
3 changes: 1 addition & 2 deletions actionHero
Expand Up @@ -14,8 +14,7 @@ params.configChanges = {
"port" : "3306",
"consoleLogging" : false
},
"flatFileDirectory":"./public/",
"cluster": true
"flatFileDirectory":"./public/"
}

// any additional functions you might wish to define to be globally accessable can be added as part of params.initFunction. The api object will be availalbe.
Expand Down
3 changes: 2 additions & 1 deletion actions/status.js
Expand Up @@ -19,7 +19,8 @@ action.outputExample = {
action.run = function(api, connection, next){
connection.response.status = "OK";
var now = new Date().getTime();
api.stats.uptime = now - api.stats.startTime;
api.stats.uptimeSeconds = (now - api.stats.startTime) / 1000;
api.stats.pid = process.pid;
connection.response.stats = api.stats;
next(connection, true);
};
Expand Down
193 changes: 52 additions & 141 deletions api.js
Expand Up @@ -12,19 +12,15 @@ actionHero.initRequires = function(api, next)
api.utils = require(__dirname + '/utils.js').utils;
api.cache = require(__dirname + '/cache.js').cache;

if (api.cluster.isMaster) {
var taskFile = process.cwd() + "/tasks.js";
api.path.exists(taskFile, function (exists) {
if(!exists){
taskFile = __dirname + "/tasks.js";
api.log("no ./tasks.js file in project, loading defaults tasks from "+taskFile, "yellow");
}
api.tasks = require(taskFile).tasks;
next();
});
}else{
next();
}
var taskFile = process.cwd() + "/tasks.js";
api.path.exists(taskFile, function (exists) {
if(!exists){
taskFile = __dirname + "/tasks.js";
api.log("no ./tasks.js file in project, loading defaults tasks from "+taskFile, "yellow");
}
api.tasks = require(taskFile).tasks;
next();
});
}

////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -66,30 +62,30 @@ actionHero.initDB = function(api, next)

var modelsPath = process.cwd() + "/models/";
api.path.exists(modelsPath, function (exists) {
if(!exists){
var defaultModelsPath = __dirname + "/models/";
if (api.cluster.isMaster) { api.log("no ./modles path in project, loading defaults from "+defaultModelsPath, "yellow"); }
modelsPath = defaultModelsPath;
if(!exists){
var defaultModelsPath = __dirname + "/models/";
api.log("no ./modles path in project, loading defaults from "+defaultModelsPath, "yellow");
modelsPath = defaultModelsPath;
}

api.fs.readdirSync(modelsPath).forEach( function(file) {
var modelName = file.split(".")[0];
api.models[modelName] = require(modelsPath + file)['defineModel'](api);
api.seeds[modelName] = require(modelsPath + file)['defineSeeds'](api);
api.modelsArray.push(modelName);
if (api.cluster.isMaster) { api.log("model loaded: " + modelName, "blue"); }
api.log("model loaded: " + modelName, "blue");
});
api.dbObj.sync().on('success', function() {
for(var i in api.seeds){
var seeds = api.seeds[i];
var model = api.models[i];
if (seeds != null){
api.utils.DBSeed(api, model, seeds, function(seeded, modelResp){
if (api.cluster.isMaster) { if(seeded){ api.log("Seeded data for: "+modelResp.name, "cyan"); } }
if(seeded){ api.log("Seeded data for: "+modelResp.name, "cyan"); }
});
}
}
if (api.cluster.isMaster) { api.log("DB conneciton sucessfull and Objects mapped to DB tables", "green"); }
api.log("DB conneciton sucessfull and Objects mapped to DB tables", "green");
next();
}).on('failure', function(error) {
api.log("trouble synchronizing models and DB. Correct DB credentials?", "red");
Expand Down Expand Up @@ -152,15 +148,15 @@ actionHero.initActions = function(api, next)
api.path.exists(actionsPath, function (exists) {
if(!exists){
var defaultActionsPath = __dirname + "/actions/";
if (api.cluster.isMaster) { api.log("no ./actions path in project, loading defaults from "+defaultActionsPath, "yellow"); }
api.log("no ./actions path in project, loading defaults from "+defaultActionsPath, "yellow");
actionsPath = defaultActionsPath;
}
api.fs.readdirSync(actionsPath).forEach( function(file) {
if (file != ".DS_Store"){
var actionName = file.split(".")[0];
var thisAction = require(actionsPath + file)["action"];
api.actions[thisAction.name] = require(actionsPath + file).action;
if (api.cluster.isMaster) { api.log("action loaded: " + actionName, "blue"); }
api.log("action loaded: " + actionName, "blue");
}
});
next();
Expand Down Expand Up @@ -357,11 +353,12 @@ actionHero.initSocketServerListen = function(api, next){
api.connections = [];
api.socketServer = api.net.createServer(function (connection) {
api.stats.numberOfSocketRequests = api.stats.numberOfSocketRequests + 1;

connection.setEncoding("utf8");
connection.type = "socket";
connection.params = {};
connection.remoteIP = connection.remoteAddress;
connection.id = new Buffer(connection.remoteAddress + "@" + connection.remotePort).toString('base64');
connection.id = new Buffer(connection.remoteAddress + connection.remotePort + Math.random()).toString('base64');
connection.room = api.configData.defaultSocketRoom;
connection.public = {};
connection.public.id = connection.id;
Expand All @@ -378,67 +375,60 @@ actionHero.initSocketServerListen = function(api, next){
if(words[0] == "quit" || words[0] == "exit" || words[0] == "close" || data.indexOf("\u0004") > -1 ){
api.sendSocketMessage(connection, {status: "Bye!"});
connection.end();
api.log("socket connection "+connection.remoteIP+" | requesting disconnect");
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | requesting disconnect", "white");}
}else if(words[0] == "paramAdd"){
var parts = words[1].split("=");
connection.params[parts[0]] = parts[1];
api.sendSocketMessage(connection, {status: "OK"});
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "paramDelete"){
connection.data.params[words[1]] = null;
api.sendSocketMessage(connection, {status: "OK"});
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "paramView"){
var q = words[1];
api.sendSocketMessage(connection, {q: connection.params[q]});
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "paramsView"){
api.sendSocketMessage(connection, connection.params);
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "paramsDelete"){
connection.params = {};
api.sendSocketMessage(connection, {status: "OK"});
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "roomChange"){
connection.room = words[1];
api.sendSocketMessage(connection, {status: "OK", room: connection.room});
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "roomView"){
var stats = api.socketRoomStatus(api, connection.room);
api.sendSocketMessage(connection, {status: "OK", room: connection.room, stats: stats});
api.log("socket connection "+connection.remoteIP+" | "+data);
var roomStatus = api.socketRoomStatus(api, connection.room);
api.sendSocketMessage(connection, {status: "OK", room: connection.room, roomStatus: roomStatus});
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else if(words[0] == "say"){
var message = data.substr(4);
api.socketRoomBroadcast(api, connection, message);
api.sendSocketMessage(connection, {status: "OK"});
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}else{
connection.error = false;
connection.response = {};
// if(connection.params["action"] == null || words.length == 1){connection.params["action"] = words[0];}
connection.params["action"] = words[0];
process.nextTick(function() { actionHero.processAction(api, connection, api.respondToSocketClient); });
api.log("socket connection "+connection.remoteIP+" | "+data);
if(api.configData.logRequests){api.log(" > socket request from " + connection.remoteIP + " | "+data, "grey");}
}
});
connection.on("end", function () {
connection.end();
try{ connection.end(); }catch(e){}
for(var i in api.connections){
var thisConnection = api.connections[i];
if(thisConnection.id == connection.id){ api.connections.splice(i,1); }
}
api.log("socket connection "+connection.remoteIP, connection.id+" | disconnected");
if(api.configData.logRequests){api.log(" > socket connection " + connection.remoteIP + " disconnected", "white");}
});
});

// broadcast a message to all connections in a room
api.socketRoomBroadcast = function(api, connection, message, broadcastToMaster){
// tell the master if worker
if(broadcastToMaster == null){broadcastToMaster = true;}
if (api.cluster.isWorker && broadcastToMaster) {
process.send({ cmd: 'socketMessage', data: {connection: {id: connection.id, room: connection.room}, message: message} });
}
// tell other local connections
api.socketRoomBroadcast = function(api, connection, message){
for(var i in api.connections){
var thisConnection = api.connections[i];
if(thisConnection.room == connection.room){
Expand All @@ -453,7 +443,7 @@ actionHero.initSocketServerListen = function(api, next){
}
}

// stats for a room
// status for a room
api.socketRoomStatus = function(api, room){
results = {};
results.rooms = {};
Expand Down Expand Up @@ -551,51 +541,19 @@ actionHero.log = function(original_message, styles){

////////////////////////////////////////////////////////////////////////////
// process init and message passing
actionHero.initMasterComplete = function(api, next){
api.log("");
api.log("*** Master Started @ " + api.utils.sqlDateTime() + " @ web port " + api.configData.webServerPort + " & socket port " + api.configData.socketServerPort + " ***", ["green", "bold"]);
api.log("Starting workers:");
api.log("");

api.masterProcessMessage = function(api, m){

}

api.workers = [];
for (var i = 0; i < api.os.cpus().length; i++) {
var worker = api.cluster.fork();
worker.on('message', function(m) {
for (var i in api.workers){
if(api.workers[i].pid != worker.pid && m.cmd != "queryServer"){ api.workers[i].send(m); }
}
api.masterProcessMessage(api, m)
});
api.workers.push(worker);
}
next();
}

actionHero.singleThreadComplete = function(api, next){
api.log("");
api.log("*** Server Started @ " + api.utils.sqlDateTime() + " @ web port " + api.configData.webServerPort + " & socket port " + api.configData.socketServerPort + " ***", ["green", "bold"]);
api.log("");
next();
}

actionHero.initWorkerComplete = function(api){

process.on('message', function(m){
if (m.cmd == "socketMessage"){ api.socketRoomBroadcast(api, m.data.connection, m.data.message, false); }
});

api.log("worker pid "+process.pid+" started", "green");
}

////////////////////////////////////////////////////////////////////////////
// GO!

actionHero.start = function(params, callback){
if (params == null){params = {};}

// the api namespace. Everything uses this.
if(params.api == null){
var api = {};
Expand All @@ -610,7 +568,6 @@ actionHero.start = function(params, callback){
api.url = require("url");
api.path = require("path");
api.fs = require("fs");
api.cluster = require("cluster");
api.os = require('os');
api.mysql = require('mysql');
api.SequelizeBase = require("sequelize");
Expand All @@ -633,84 +590,39 @@ actionHero.start = function(params, callback){
}else{
var defualtConfigFile = "./node_modules/actionHero/config.json";
if(params.configChanges == null){
if (api.cluster.isMaster) { api.log('no local config.json found nor no provided configChanges; using default from '+defualtConfigFile, "red"); }
api.log('no local config.json found nor no provided configChanges; using default from '+defualtConfigFile, "red");
}else{
if (api.cluster.isMaster) {
api.log("configChanges found to default template in "+defualtConfigFile+":");
api.log(JSON.stringify(params.configChanges));
}
api.log("configChanges found to default template in "+defualtConfigFile+":");
api.log(JSON.stringify(params.configChanges));
}
api.configData = JSON.parse(api.fs.readFileSync(defualtConfigFile,'utf8'));
}

for (var i in params.configChanges){ api.configData[i] = params.configChanges[i];}

api.stats = {};
api.stats.numberOfWebRequests = 0;
api.stats.numberOfSocketRequests = 0;
api.stats.startTime = new Date().getTime();

if (api.cluster.isMaster) {
actionHero.initLogFolder(api, function(){
actionHero.initRequires(api, function(){
actionHero.initDB(api, function(){
actionHero.initCron(api, function(){
actionHero.initActions(api, function(){
actionHero.initPostVariables(api, function(){
if(api.configData.cluster){
if(typeof params.initFunction == "function"){
params.initFunction(api, function(){
actionHero.initMasterComplete(api, function(){
if(callback != null){ process.nextTick(function() { callback(api); }); }
});
})
}else{
actionHero.initMasterComplete(api, function(){
if(callback != null){ process.nextTick(function() { callback(api); }); }
});
}
}else{
actionHero.initWebListen(api, function(){
actionHero.initSocketServerListen(api, function(){
if(typeof params.initFunction == "function"){
params.initFunction(api, function(){
actionHero.singleThreadComplete(api, function(){
if(callback != null){ process.nextTick(function() { callback(api); }); }
});
})
}else{
actionHero.singleThreadComplete(api, function(){
if(callback != null){ process.nextTick(function() { callback(api); }); }
});
}
});
});
}
});
});
});
});
});
});

api.cluster.on('death', function(worker) {
api.log('worker ' + worker.pid + ' died', ["red", "bold"]);
});

}else{
actionHero.initLogFolder(api, function(){
actionHero.initRequires(api, function(){
actionHero.initDB(api, function(){
actionHero.initLogFolder(api, function(){
actionHero.initRequires(api, function(){
actionHero.initDB(api, function(){
actionHero.initCron(api, function(){
actionHero.initActions(api, function(){
actionHero.initPostVariables(api, function(){
actionHero.initWebListen(api, function(){
actionHero.initSocketServerListen(api, function(){
if(typeof params.initFunction == "function"){
params.initFunction(api, function(){
actionHero.initWorkerComplete(api);
actionHero.singleThreadComplete(api, function(){
if(callback != null){ process.nextTick(function() { callback(api); }); }
});
})
}else{
actionHero.initWorkerComplete(api);
actionHero.singleThreadComplete(api, function(){
if(callback != null){ process.nextTick(function() { callback(api); }); }
});
}
});
});
Expand All @@ -719,8 +631,7 @@ actionHero.start = function(params, callback){
});
});
});
}

});
});
}

Expand Down
1 change: 0 additions & 1 deletion config.json
Expand Up @@ -5,7 +5,6 @@
"serverName" : "actionHero API",
"socketServerWelcomeMessage" : "Hello! Welcome to the actionHero api",
"apiBaseDir" : "./node_modules/actionHero/",
"cluster" : true ,

"logging" : true,
"logFolder" : "./log/",
Expand Down
1 change: 0 additions & 1 deletion specHelper.js
Expand Up @@ -18,7 +18,6 @@ specHelper.params = {
"consoleLogging" : false,
},
"flatFileDirectory":"./public/",
"cluster" : false,
"webServerPort" : 8081,
"socketServerPort" : 5001,
"logging":false,
Expand Down

0 comments on commit 99dda30

Please sign in to comment.