Skip to content

Commit

Permalink
testing only certain branches; tasks use redis but not yet stable for…
Browse files Browse the repository at this point in the history
… 'all' tasks
  • Loading branch information
evantahler committed May 14, 2012
1 parent 2f83faf commit a704d66
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 174 deletions.
4 changes: 4 additions & 0 deletions .travis.yml
Expand Up @@ -3,6 +3,10 @@ node_js:
# - 0.5
- 0.6
- 0.7
branches:
only:
- master
- develop
before_script:
# redis is already there on localhost
45 changes: 0 additions & 45 deletions actions/actionClusterCacheTest.js

This file was deleted.

226 changes: 97 additions & 129 deletions initializers/initTasks.js
@@ -1,172 +1,140 @@
////////////////////////////////////////////////////////////////////////////
// Periodic Tasks (fixed timer events)
// tasks!

var initTasks = function(api, next)
{
api.tasks = {};
api.tasks.tasks = {};
api.tasks.queue = [];
api.tasks.timers = {};
api.tasks.cycleTimeMS = 50;
api.tasks.processing = {};
api.tasks.cycleTimeMS = 10;
api.tasks.reloadPeriodicsTime = 5000;
api.tasks.processTimer = null;

if(api.redis.enable === true){
api.tasks.redisQueue = "actionHero::tasks";
api.tasks.redisProcessingQueue = "actionHero::tasksClaimed";
}else{
api.tasks.queue = [];
}

api.tasks.enqueue = function(api, taskName, params){
if(typeof api.tasks.tasks[taskName] == "object"){
var t = api.tasks.tasks[taskName];

// I can run locally
if(t.scope == "all" || api.actionCluster.master === true){
api.tasks.queue.push({
taskName: taskName,
params: params
});

// The master server should run it
}else{
api.actionCluster.requestID++;
var requestID = api.actionCluster.requestID;
api.actionCluster.cache.results[requestID] = {
requestID: requestID,
complete: false,
peerResponses: []
};
api.actionCluster.sendToAllPeers({action: "taskEnqueue", taskName: taskName, params: params, requestID: requestID});
api.actionCluster.cache.checkForComplete(api, requestID, api.actionCluster.connectionsToPeers.length, function(resp){
if(resp == false){
setTimeout(api.tasks.enqueue, api.configData.actionCluster.ReConnectToLostPeersMS , api, taskName, params) // try again, wait for reconnections
api.log("Cannot enque task with master, trying again...", "red");
}else{
var found = false;
for (var i in resp){
if(resp[i].value == true){
found = true;
break;
if(typeof api.tasks.tasks[taskName] === "object"){
var msg = {
taskName: taskName,
params: params
};
msg = JSON.stringify(msg);
if(api.redis.enable === true){
var toEnqueue = true;
api.redis.client.llen(api.tasks.redisQueue, function(err, length){
api.redis.client.lrange(api.tasks.redisQueue, 0, (length * 2), function(err, enquedTasks){
for(var i in enquedTasks){
var t = JSON.parse(enquedTasks);
if(t.taskName == taskName){
toEnqueue = false;
api.log("not enqueing "+taskName+" (periodic) as it is already in the queue", "yellow")
}
}
if(found == false){
setTimeout(api.tasks.enqueue, api.configData.actionCluster.ReConnectToLostPeersMS , api, taskName, params) // try again, wait for reconnections
api.log("Cannot enque task with master, trying again...", "red");
if(toEnqueue){
api.redis.client.hget(api.tasks.redisProcessingQueue, taskName, function (err, taskProcessing){
if(taskProcessing != null){
var data = JSON.parse(taskProcessing);
var rawTask = api.tasks.tasks[data.taskName];
if(rawTask.frequency > 0){
toEnqueue = false;
api.log("not enqueing "+taskName+" (periodic) as it is already being worked on", "yellow")
}else{
api.redis.client.rpush(api.tasks.redisQueue, msg, function(){ });
}
}else{
api.redis.client.rpush(api.tasks.redisQueue, msg, function(){ });
}
});
}
}
});
});
}else{
api.tasks.queue.push(msg);
}
}else{
api.log(taskName + " is not a known task", "red");
}
};

api.tasks.run = function(api, taskName, params, next){
clearTimeout(api.tasks.timers[taskName]);
if(api.utils.hashLength(api.actionCluster.peers) == 0 || api.tasks.tasks[taskName].scope == "all"){
api.tasks.runLocaly(api, taskName, params, next);
}else{
for(var i in api.actionCluster.peers){
var sent = false;
if(api.actionCluster.peers[i] == "connected" && api.tasks.processing[i] != "processing"){
api.tasks.runRemote(api, taskName, params, i);
sent = true;
break;
}
}
if(sent == false){
api.tasks.enqueue(api, taskName, params);
next(false);
}else{
next(false);
}
}
};

api.tasks.runRemote = function(api, taskName, params, peer){
api.tasks.processing[peer] = "processing";
api.log("enqueing task: "+taskName+" on peer "+peer, "yellow");
api.actionCluster.requestID++;
var requestID = api.actionCluster.requestID;
api.actionCluster.cache.results[requestID] = {
requestID: requestID,
complete: false,
peerResponses: []
};
var msgObj = {
action: "taskRun",
taskName: taskName,
params: params,
requestID: requestID
};
var parts = peer.split(":");
api.actionCluster.sendToPeer(msgObj, parts[0], parts[1]);

var clusterCallback = function(resp){
if(resp == false || resp.length == 0){
// peer still there?
if(api.actionCluster.peers[peer] == "connected"){
api.log("waiting for "+peer +" to comple task "+taskName, "yellow");
clearTimeout(api.actionCluster.cache.results[requestID].timeoutTimer);
clearTimeout(api.actionCluster.cache.results[requestID].dataClearTimer);
api.actionCluster.cache.results[requestID] = {
requestID: requestID,
complete: false,
peerResponses: []
};
checkForTaskComplete(api, requestID, taskName, peer);

api.tasks.getNextTask = function(api, next){
if(api.redis.enable === true){
api.redis.client.lpop(api.tasks.redisQueue, function(err, task){
if(task != null){
task = JSON.parse(task);
var data = {
taskName: task.taskName,
params: task.params,
server: api.id
}
api.redis.client.hset(api.tasks.redisProcessingQueue, task.taskName, JSON.stringify(data), function(){
next(task);
});
}else{
api.tasks.processing[peer] = false;
api.tasks.enqueue(api, taskName, params);
next(null);
}
});
}else{
if(api.tasks.queue.length > 0){
var task = api.tasks.queue.pop();
next(JSON.parse(task));
}else{
var content = resp.taskResp;
api.log("task "+taskName+" complete on peer "+peer, "yellow");
api.tasks.processing[peer] = false;
if(api.tasks.tasks[taskName].frequency > 0){
clearTimeout(api.tasks.timers[taskName]);
api.tasks.timers[taskName] = setTimeout(api.tasks.enqueue, api.tasks.tasks[taskName].frequency, api, taskName);
}
next(null);
}
}

var checkForTaskComplete = function(api, requestID, taskName, peer){
api.actionCluster.cache.checkForComplete(api, requestID, 1, clusterCallback);
}

checkForTaskComplete(api, requestID, taskName, peer);
}

api.tasks.runLocaly = function(api, taskName, params, next){
api.tasks.processing
api.tasks.run = function(api, taskName, params, next){
clearTimeout(api.tasks.timers[taskName]);
api.log("running task: "+taskName, "yellow");
api.tasks.tasks[taskName].run(api, params, function(resp){
if(typeof next == "function"){ next(true); }
})
}
};

api.tasks.process = function(api){
clearTimeout(api.tasks.processTimer);
if(api.tasks.queue.length > 0){
var thisTask = api.tasks.queue[0];
api.tasks.queue = api.tasks.queue.splice(1);
api.tasks.run(api, thisTask.taskName, thisTask.params, function(run){
if(run){
if(api.tasks.tasks[thisTask.taskName].frequency > 0){
api.tasks.timers[thisTask.taskName] = setTimeout(api.tasks.enqueue, api.tasks.tasks[thisTask.taskName].frequency, api, thisTask.taskName);
api.tasks.getNextTask(api, function(task){
if(task == null){
api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
}else{
api.tasks.run(api, task.taskName, task.params, function(run){
if(run){
//
}else{
api.log("task failed to run: "+JSON.stringify(task), "red")
}
api.tasks.enqueuePeriodicTask(api, api.tasks.tasks[task.taskName])
api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
});
}
})
};

api.tasks.enqueuePeriodicTask = function(api, task){
if(task.frequency > 0){
api.tasks.timers[task.taskName] = setTimeout(function(api, task){
// remove the task from the processing queue
if(api.redis.enable === true){
api.redis.client.hdel(api.tasks.redisProcessingQueue, task.name, function(){ });
}
api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
});
}else{
api.tasks.processTimer = setTimeout(api.tasks.process, api.tasks.cycleTimeMS, api);
// enqueue
api.tasks.enqueue(api, task.name, null);
}, task.frequency, api, task);
}
};
}

api.tasks.startPeriodicTasks = function(api, next){
for(var i in api.tasks.tasks){
var task = api.tasks.tasks[i];
if(task.frequency > 0 && ( task.scope == "all" || api.actionCluster.master === true)){
if(api.tasks.timers[task.name] == null){
api.tasks.timers[task.name] = setTimeout(api.tasks.enqueue, task.frequency, api, task.name);
}
if(task.frequency > 0){
api.tasks.enqueuePeriodicTask(api, task)
}
}
next();
if(typeof next == "function"){ next(); }
}

// init
Expand Down

0 comments on commit a704d66

Please sign in to comment.