working on clustering...
evantahler committed Mar 17, 2012
1 parent e82a744 commit d129620
Showing 8 changed files with 171 additions and 68 deletions.
10 changes: 4 additions & 6 deletions _specHelper.js
@@ -5,6 +5,7 @@ specHelper.vows = require('vows');
 specHelper.net = require('net');
 specHelper.assert = require('assert');
 specHelper.request = require('request');
+specHelper.utils = require('./utils.js').utils;
 specHelper.apis = [];
 specHelper.actionHeroes = [];
 specHelper.url = "127.0.0.1";
@@ -17,7 +18,6 @@ specHelper.params[0] = {
 "webServerPort" : 9000,
 "socketServerPort" : 6000,
 "logging":showLogs,
-"cronProcess":false,
 "cache" : {
 "cacheFile" : "test_cache_1.cache",
 "defaultExpireTimeSeconds" : 3600,
@@ -29,7 +29,7 @@ specHelper.params[0] = {
 "Key" : "4ijhaijhm43yjnawhja43jaj",
 "ReConnectToLostPeersMS" : 1000,
 "CycleCheckTimeMS" : 100,
-"remoteTimeoutWaitMS" : 10000,
+"remoteTimeoutWaitMS" : 1000,
 "nodeDuplication" : 2,
 "StartingPeer" : {
 "host": null,
@@ -43,7 +43,6 @@ specHelper.params[1] = {
 "webServerPort" : 9001,
 "socketServerPort" : 6001,
 "logging":showLogs,
-"cronProcess":false,
 "cache" : {
 "cacheFile" : "test_cache_2.cache",
 "defaultExpireTimeSeconds" : 3600,
@@ -55,7 +54,7 @@ specHelper.params[1] = {
 "Key" : "4ijhaijhm43yjnawhja43jaj",
 "ReConnectToLostPeersMS" : 1000,
 "CycleCheckTimeMS" : 100,
-"remoteTimeoutWaitMS" : 10000,
+"remoteTimeoutWaitMS" : 1000,
 "nodeDuplication" : 2,
 "StartingPeer" : {
 "host": specHelper.url,
@@ -69,7 +68,6 @@ specHelper.params[2] = {
 "webServerPort" : 9002,
 "socketServerPort" : 6002,
 "logging":showLogs,
-"cronProcess":false,
 "cache" : {
 "cacheFile" : "test_cache_3.cache",
 "defaultExpireTimeSeconds" : 3600,
@@ -81,7 +79,7 @@ specHelper.params[2] = {
 "Key" : "4ijhaijhm43yjnawhja43jaj",
 "ReConnectToLostPeersMS" : 1000,
 "CycleCheckTimeMS" : 100,
-"remoteTimeoutWaitMS" : 10000,
+"remoteTimeoutWaitMS" : 1000,
 "nodeDuplication" : 2,
 "StartingPeer" : {
 "host": specHelper.url,
4 changes: 2 additions & 2 deletions api.js
@@ -113,9 +113,9 @@ var createActionHero = function(){
 };
 
 actionHero.stop = function(next){
-if(actionHero.running == true)
-{
+if(actionHero.running == true){
 actionHero.api.log("Shutting down open servers (:"+actionHero.api.configData.webServerPort+", :"+actionHero.api.configData.socketServerPort+") and pausing tasks", "bold");
+clearTimeout(actionHero.api.tasks.processTimer);
 var closed = 0;
 var checkForDone = function(closed){
 if(closed == 2){
2 changes: 1 addition & 1 deletion config.json
@@ -19,7 +19,7 @@
 
 "actionCluster": {
 "Key" : "4ijhaijhm43yjnawhja43jaj",
-"ReConnectToLostPeersMS" : 1000,
+"ReConnectToLostPeersMS" : 5000,
 "CycleCheckTimeMS" : 10,
 "remoteTimeoutWaitMS" : 5000,
 "nodeDuplication" : 2,
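Putting this hunk together with the keys visible elsewhere in this commit, the actionCluster block of config.json after the change should look roughly like the sketch below. Only the keys shown in this page are included; the StartingPeer sub-object is inferred from _specHelper.js, where a null host presumably means "start as the first peer":

```
"actionCluster": {
  "Key" : "4ijhaijhm43yjnawhja43jaj",
  "ReConnectToLostPeersMS" : 5000,
  "CycleCheckTimeMS" : 10,
  "remoteTimeoutWaitMS" : 5000,
  "nodeDuplication" : 2,
  "StartingPeer" : {
    "host": null
  }
}
```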
18 changes: 11 additions & 7 deletions initializers/initActionCluster.js
@@ -198,12 +198,16 @@ var initActionCluster= function(api, next){
 if(i < api.utils.hashLength(api.actionCluster.connectionsToPeers)){
 saveObjectAtOnePeer(api, key, value, expireTimeSeconds, requestID, i);
 api.actionCluster.cache.checkForComplete(api, requestID, (i+1), function(resp){
-if(resp[i]["value"] == true){ instnaceCounter++; }
-if(instnaceCounter == api.configData.actionCluster.nodeDuplication){
-if(typeof next == "function"){ next(resp); }
+if(resp == false){
+// TODO: ?
 }else{
-i++;
-saveAtEnoughPeers(api, key, value, expireTimeSeconds, requestID, i, instnaceCounter, next);
+if(resp[i]["value"] == true){ instnaceCounter++; }
+if(instnaceCounter == api.configData.actionCluster.nodeDuplication){
+if(typeof next == "function"){ next(resp); }
+}else{
+i++;
+saveAtEnoughPeers(api, key, value, expireTimeSeconds, requestID, i, instnaceCounter, next);
+}
 }
 });
 }else{
@@ -247,7 +251,7 @@ var initActionCluster= function(api, next){
 peerResponses: []
 };
 
-if(api.utils.hashLength(api.actionCluster.connectionsToPeers)){
+if(api.utils.hashLength(api.actionCluster.connectionsToPeers) > 0){
 var msgObj = {action: "cacheDestroy", key: key, requestID: requestID};
 if(remotePeer == null){
 api.actionCluster.sendToAllPeers(msgObj);
@@ -297,7 +301,7 @@ var initActionCluster= function(api, next){
 var completeAndRestart = function(){
 if(started == 0){
 if(counter > 0){
-api.log(counter + " cache objects on this server do not have corresponding duplicates in peers; Attempting to re-duplicate", "red");
+api.log(counter + " cache objects on this server do not have corresponding duplicates in peers; Attempting to re-duplicate", "yellow");
 }
 setTimeout(api.actionCluster.cache.ensureObjectDuplication, api.configData.actionCluster.remoteTimeoutWaitMS, api);
 }else{
47 changes: 26 additions & 21 deletions initializers/initStats.js
@@ -28,27 +28,32 @@ var initStats = function(api, next){
 
 api.stats.calculate = function(api, next){
 api.cache.load(api, "_stats", function(stats){
-var now = new Date().getTime();
-stats.lastCalculation = now;
-stats.uptimeSeconds = (now - stats.startTime) / 1000;
-stats.cache = {
-numberOfObjects: api.utils.hashLength(api.cache.data)
-};
-stats.socketServer = {
-numberOfSocketRequests: api.socketServer.numberOfSocketRequests
-};
-stats.webServer = {
-numberOfWebRequests: api.webServer.numberOfWebRequests
-};
-stats.memoryConsumption = process.memoryUsage().heapUsed;
-stats.actionCluster = {
-peers: api.actionCluster.peers,
-clusterRequests: api.actionCluster.requestID
-};
-
-api.cache.save(api, "_stats", stats, cacheTime, function(){
-if(typeof next == "function"){ next(stats); }
-});
+if(stats != null){
+var now = new Date().getTime();
+stats.lastCalculation = now;
+stats.uptimeSeconds = (now - stats.startTime) / 1000;
+stats.cache = {
+numberOfObjects: api.utils.hashLength(api.cache.data)
+};
+stats.socketServer = {
+numberOfSocketRequests: api.socketServer.numberOfSocketRequests
+};
+stats.webServer = {
+numberOfWebRequests: api.webServer.numberOfWebRequests
+};
+stats.memoryConsumption = process.memoryUsage().heapUsed;
+stats.actionCluster = {
+peers: api.actionCluster.peers,
+clusterRequests: api.actionCluster.requestID
+};
+api.cache.save(api, "_stats", stats, cacheTime, function(){
+if(typeof next == "function"){ next(stats); }
+});
+}else{
+api.stats.init(api, function(){
+api.stats.calculate(api, next)
+});
+}
 });
 }
 
34 changes: 22 additions & 12 deletions initializers/initTasks.js
@@ -7,7 +7,7 @@ var initTasks = function(api, next)
 api.tasks.tasks = {};
 api.tasks.queue = [];
 api.tasks.timers = {};
-api.tasks.cycleTimeMS = 1000;
+api.tasks.cycleTimeMS = 500;
 
 api.tasks.enqueue = function(api, taskName, params){
 api.tasks.queue.push({
@@ -38,10 +38,12 @@ var initTasks = function(api, next)
 if(api.tasks.queue.length > 0){
 var thisTask = api.tasks.queue[0];
 api.tasks.queue = api.tasks.queue.splice(1);
+// no peers, so do all types of tasks
 if(api.actionCluster.connectionsToPeers.length < 2){
 api.tasks.run(api, thisTask.taskName, thisTask.params, function(){
 api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
 });
+// cluster: need to ensure that the "any" tasks aren't done more than once
 }else{
 api.actionCluster.cache.load(api, "_periodicTasks", function(clusterResp){
 var otherPeerTasks = {}
@@ -52,21 +54,25 @@ var initTasks = function(api, next)
 }
 var t = api.tasks.tasks[thisTask.taskName];
 api.cache.load(api, "_periodicTasks", function(_periodicTasks){
-if(t.scope == "all" || otherPeerTasks[thisTask.taskName] != true){
-api.tasks.run(api, thisTask.taskName, thisTask.params, function(){
-if(_periodicTasks.indexOf(t.name) < 0){
-_periodicTasks.push(t.name);
-}
+if(_periodicTasks != null){
+if(t.scope == "all" || otherPeerTasks[thisTask.taskName] != true){
+api.tasks.run(api, thisTask.taskName, thisTask.params, function(){
+if(_periodicTasks.indexOf(t.name) < 0){
+_periodicTasks.push(t.name);
+}
 api.cache.save(api, "_periodicTasks", _periodicTasks, null, function(resp){
 api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
 });
 });
 }else{
-_periodicTasks.splice(_periodicTasks.indexOf(t.name),1);
-api.cache.save(api, "_periodicTasks", _periodicTasks, null, function(resp){
-api.tasks.timers[t.name] = setTimeout(api.tasks.enqueue, t.frequency, api, t.name);
-api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
-});
-}
+_periodicTasks.splice(_periodicTasks.indexOf(t.name),1);
+api.cache.save(api, "_periodicTasks", _periodicTasks, null, function(resp){
+api.tasks.timers[t.name] = setTimeout(api.tasks.enqueue, t.frequency, api, t.name);
+api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
+});
+}
+}else{
+api.tasks.startPeriodicTasks();
+}
 });
 });
@@ -79,6 +85,10 @@ var initTasks = function(api, next)
 api.tasks.startPeriodicTasks = function(api, next){
 var _periodicTasks = [];
 for(var i in api.tasks.tasks){
+
+
+// I THINK THAT api.tasks.tasks IS NOT GETTING RELOADED AT SERVER RESTART PROPERLY BECAUSE OF MODULES NOT GETTING RELOADED!
+
 var task = api.tasks.tasks[i];
 if(task.frequency > 0){ // all scopes ok for single node
 if(api.tasks.timers[task.name] == null){
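The all-caps comment added above flags a real Node.js behavior: require() caches modules by their resolved path, so a task file re-required after a soft restart comes back from the module cache rather than from disk. A common workaround (not part of this commit; the helper name is made up) is to evict the entry from require.cache before reloading:

```js
// Hypothetical helper, not in actionHero: force a fresh require() of a module
// by deleting its entry from Node's module cache.
function freshRequire(modulePath){
  var resolved = require.resolve(modulePath); // the absolute path is the cache key
  delete require.cache[resolved];             // evict the cached module
  return require(resolved);                   // re-read and re-evaluate the file
}

// e.g. when rebuilding api.tasks.tasks on restart (illustrative only):
// api.tasks.tasks[task.name] = freshRequire("./tasks/" + file).task;
```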
2 changes: 1 addition & 1 deletion readme.markdown
@@ -341,7 +341,7 @@ You can create your own tasks by placing them in a `./tasks/` folder at the root

 * `task.name`: The unique name of your task
 * `task.description`: a description
-* `task.scope`: "any" or "all". Should a single actionCluster server (any) run this task, or should all of them?
+* `task.scope`: "any" or "all". Should a single actionCluster server (any) run this task, or should all of them? For example, `calculateStats` is run by all peers in the action cluster (because we want to know every peer's status), but if you had a task to clean old sessions from your database or send an email, you would want only a single node to do that.
 * `task.frequency`: In milliseconds, how often should I run? Setting me to 0 will cause me not to run automatically, but I can still be run with `api.tasks.run`
 
 As stated above, any task can also be called programmatically with `api.tasks.run(api, taskName, params, next)`.
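For illustration, a minimal task file might look like the sketch below. The file name, task body, and exact shape of the run function are hypothetical (they are not shown in this excerpt); only the four properties listed above come from the readme:

```js
// ./tasks/cleanOldSessions.js — hypothetical example
var task = {};
task.name = "cleanOldSessions";
task.description = "deletes stale sessions from the database";
task.scope = "any";      // one actionCluster node should run this, not all of them
task.frequency = 60000;  // every minute; 0 would mean "only when run manually"
task.run = function(api, params, next){
  // ... delete the stale rows here ...
  next(true); // assumed callback convention
};

exports.task = task;
```

It could then be fired on demand with `api.tasks.run(api, "cleanOldSessions", null, next)`.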