Skip to content

Commit

Permalink
config back to normal; stats and tasks work without redis
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Tahler committed Jan 6, 2013
1 parent 8845d67 commit 9b8074f
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 87 deletions.
4 changes: 2 additions & 2 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ configData.general = {
"defaultOffset": 0,
// The number of internal "workers" this node will have.
// Remember these are logical timers (not threads) so they will block if they are computationally intense
"workers" : 1,
"workers" : 5,
// watch for changes in actions and tasks, and reload them on the fly
// This will not work in all operating systems [ http://nodejs.org/docs/latest/api/fs.html#fs_fs_watchfile_filename_options_listener ]
"developmentMode": false,
Expand All @@ -57,7 +57,7 @@ configData.log = {
///////////

configData.redis = {
"enable": true,
"enable": false,
"host": "127.0.0.1",
"port": 6379,
"password": null,
Expand Down
1 change: 1 addition & 0 deletions initializers/initStats.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var initStats = function(api, next){
});
});
}else{
if(api.stats.data[key] == null){ api.stats.data[key] = 0; }
api.stats.data[key] = api.stats.data[key] + count;
if(typeof next == "function"){ process.nextTick(function() { next(null, true); }); }
}
Expand Down
230 changes: 145 additions & 85 deletions initializers/initTasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ var initTasks = function(api, next){
api.tasks.queueLength(api, api.tasks.queues.globalQueue, function(err, globalQueueCount){
api.tasks.queueLength(api, api.tasks.queues.localQueue, function(err, localQueueCount){
api.tasks.queueLength(api, api.tasks.queues.delayedQueue, function(err, delayedQueueCount){
// console.log({
// delayedQueueCount: delayedQueueCount,
// globalQueueCount: globalQueueCount,
// localQueueCount: localQueueCount,
// });
if(localQueueCount > 0){

// work something from the local queue to processing, and work it off
Expand Down Expand Up @@ -310,29 +315,41 @@ var initTasks = function(api, next){
api.tasks.taskProcessors = [];
api.tasks.cycleTimeMS = 200;

api.tasks.queues = {
globalQueue: 'actionHero:tasks:global',
delayedQueue: 'actionHero:tasks:delayed',
localQueue: 'actionHero:tasks:' + api.id.replace(/:/g,'-'),
processingQueue: 'actionHero:tasks:processing',
data: 'actionHero:tasks:data', // actually a hash
}

if(api.redis.enable === true){
api.tasks.queues = {
globalQueue: 'actionHero:tasks:global',
delayedQueue: 'actionHero:tasks:delayed',
localQueue: 'actionHero:tasks:' + api.id.replace(/:/g,'-'),
processingQueue: 'actionHero:tasks:processing',
data: 'actionHero:tasks:data', // actually a hash
}
//
}else{
api.tasks.queue = [];
api.tasks.queueData = {};
api.tasks.queueData[api.tasks.queues.globalQueue] = [];
api.tasks.queueData[api.tasks.queues.delayedQueue] = [];
api.tasks.queueData[api.tasks.queues.localQueue] = [];
api.tasks.queueData[api.tasks.queues.processingQueue] = [];
api.tasks.queueData[api.tasks.queues.data] = {};
}

api.tasks._start = function(api, next){
var i = 0;
api.log("starting "+api.configData.general.workers+" task timers", "yellow")
while(i < api.configData.general.workers){
var taskProcessor = new api.taskProcessor({id: i});
taskProcessor.start();
api.tasks.taskProcessors[i] = taskProcessor;
i++;
}
api.tasks.seedPeriodicTasks(function(){
next();
api.tasks.savePreviouslyCrashedTasks(function(){
api.tasks.seedPeriodicTasks(function(){
var i = 0;
api.log("starting "+api.configData.general.workers+" task timers", "yellow")
while(i < api.configData.general.workers){
(function(){
var taskProcessor = new api.taskProcessor({id: i});
api.tasks.taskProcessors[i] = taskProcessor;
var timer = i * 50;
setTimeout(function(){ taskProcessor.start(); }, timer)
})()
i++;
}
next();
});
});
}

Expand All @@ -359,8 +376,6 @@ var initTasks = function(api, next){
}
if(typeof callback == "function"){ callback(null, allLocalQueues); }
});
}else{

}
}

Expand Down Expand Up @@ -396,7 +411,9 @@ var initTasks = function(api, next){
}
})
}else{

process.nextTick(function(){
if(typeof callback == "function"){ callback(); }
});
}
}

Expand Down Expand Up @@ -424,7 +441,9 @@ var initTasks = function(api, next){
}
});
}else{

process.nextTick(function(){
if(typeof callback == "function"){ callback(null, api.tasks.queueData[api.tasks.queues.data]); }
});
}
}

Expand All @@ -442,7 +461,15 @@ var initTasks = function(api, next){
});
});
}else{

var muxedData = api.tasks.queueData[api.tasks.queues.data][taskId];
if(muxedData == null){ muxedData = {}; }
for(var i in data){
muxedData[i] = data[i];
}
api.tasks.queueData[api.tasks.queues.data][taskId] = muxedData;
process.nextTick(function(){
if(typeof callback == "function"){ callback(null, muxedData); }
});
}
}

Expand All @@ -457,7 +484,9 @@ var initTasks = function(api, next){
if(typeof callback == "function"){ callback(err, data); }
});
}else{

process.nextTick(function(){
if(typeof callback == "function"){ callback(null, api.tasks.queueData[api.tasks.queues.data][taskId]); }
});
}
}

Expand All @@ -467,7 +496,10 @@ var initTasks = function(api, next){
if(typeof callback == "function"){ callback(err); }
});
}else{

delete api.tasks.queueData[api.tasks.queues.data][taskId];
process.nextTick(function(){
if(typeof callback == "function"){ callback(null); }
});
}
}

Expand All @@ -478,7 +510,10 @@ var initTasks = function(api, next){
if(typeof callback == "function"){ callback(err); }
});
}else{

process.nextTick(function(){
api.tasks.queueData[queue].push(taskId);
if(typeof callback == "function"){ callback(null); }
});
}
});
}
Expand All @@ -489,84 +524,96 @@ var initTasks = function(api, next){
if(typeof callback == "function"){ callback(err, length); }
});
}else{

process.nextTick(function(){
if(typeof callback == "function"){ callback(null, api.tasks.queueData[queue].length); }
});
}
}

api.tasks.removeFromQueue = function(api, taskId, queue, callback){
if(api.redis.enable === true){
api.redis.client.lrem(queue, 1, taskId, function(err, count){
api.tasks.clearTaskData(api, taskId, function(err){
api.tasks.clearTaskData(api, taskId, function(err){
if(api.redis.enable === true){
api.redis.client.lrem(queue, 1, taskId, function(err, count){
if(typeof callback == "function"){ callback(err, count); }
});
});
}else{

}
}else{
var queueData = api.tasks.queueData[queue];
for(var i in queueData){
if(queueData[i].id == taskId){
queueData.splice(i,1);
break;
}
}
if(typeof callback == "function"){ callback(null, 0); }
}
});
}

api.tasks.changeQueue = function(api, startQueue, endQueue, callback){
api.tasks.popFromQueue = function(api, queue, callback){
if(api.redis.enable === true){
// TODO: never have an instant where there is not a taskID within a queue
api.redis.client.lpop(startQueue, function(err, taskIdReturned){
if(taskIdReturned == null){
callback(err, null);
}else{
api.tasks.setTaskData(api, taskIdReturned, {queue: endQueue}, function(err){
api.redis.client.rpush(endQueue, taskIdReturned, function(err){
api.tasks.getTaskData(api, taskIdReturned, function(err, data){
try{
var task = new api.task(data)
callback(err, task);
}catch(e){
api.log(e, 'red');
api.tasks.removeFromQueue(api, data.id, endQueue, function(){
callback(err, null);
});
}
});
});
});
}
api.redis.client.lpop(queue, function(err, taskIdReturned){
callback(err, taskIdReturned);
});
}else{

process.nextTick(function(){
var queueData = api.tasks.queueData[queue];
taskIdReturned = queueData.splice(0,1)[0];
if(taskIdReturned == null){ taskIdReturned = null; }
callback(null, taskIdReturned);
})
}
}

api.tasks.promoteFromDelayedQueue = function(api, callback){
if(api.redis.enable === true){
// TODO: never have an instant where there is not a taskID within a queue
api.redis.client.lpop(api.tasks.queues.delayedQueue, function(err, taskIdReturned){
if(taskIdReturned == null){
callback(err, null);
}else{
api.tasks.changeQueue = function(api, startQueue, endQueue, callback){
api.tasks.popFromQueue(api, startQueue, function(err, taskIdReturned){
if(taskIdReturned == null){
callback(err, null);
}else{
api.tasks.placeInQueue(api, taskIdReturned, endQueue, function(err){
api.tasks.getTaskData(api, taskIdReturned, function(err, data){
try{
var task = new api.task(data);
if(task.runAt < new Date().getTime()){
api.tasks.setTaskData(api, taskIdReturned, {queue: api.tasks.queues.globalQueue, state: 'pending'}, function(err){
api.redis.client.rpush(api.tasks.queues.globalQueue, taskIdReturned, function(err){
callback(err, task);
});
});
}else{
api.redis.client.rpush(api.tasks.queues.delayedQueue, taskIdReturned, function(err){
callback(err, null);
});
}
var task = new api.task(data)
callback(err, task);
}catch(e){
api.log(e, 'red');
api.tasks.removeFromQueue(api, data.id, api.tasks.queues.delayedQueue, function(){
api.tasks.removeFromQueue(api, data.id, endQueue, function(){
callback(err, null);
});
}
});
}
});
}else{
});
}
});
}

}
api.tasks.promoteFromDelayedQueue = function(api, callback){
api.tasks.popFromQueue(api, api.tasks.queues.delayedQueue, function(err, taskIdReturned){
if(taskIdReturned == null){
callback(err, null);
}else{
api.tasks.getTaskData(api, taskIdReturned, function(err, data){
try{
var task = new api.task(data);
if(task.runAt < new Date().getTime()){
api.tasks.setTaskData(api, taskIdReturned, {state: 'pending'}, function(err){
api.tasks.placeInQueue(api, taskIdReturned, api.tasks.queues.globalQueue, function(err){
callback(err, task);
});
});
}else{
api.tasks.placeInQueue(api, taskIdReturned, api.tasks.queues.delayedQueue, function(err){
callback(err, null);
});
}
}catch(e){
api.log(e, 'red');
api.tasks.removeFromQueue(api, data.id, api.tasks.queues.delayedQueue, function(){
callback(err, null);
});
}
});
}
});
}

api.tasks.seedPeriodicTasks = function(callback){
Expand All @@ -580,9 +627,15 @@ var initTasks = function(api, next){
if(taskTemplate.frequency > 0){
var task = new api.task({name: taskTemplate.name});
task.enqueue(function(err, resp){
if(err != null){ api.log(String(err).replace('Error: ', ""), 'yellow'); }
started--;
if(started == 0){ callback(); }
if(err != null){
api.log(String(err).replace('Error: ', ""), 'yellow');
}else{
api.log("seeded preiodoc task " + task.name, "yellow");
}
process.nextTick(function(){
started--;
if(started == 0){ callback(); }
})
});
}else{
process.nextTick(function(){
Expand All @@ -595,7 +648,14 @@ var initTasks = function(api, next){
}

api.tasks.savePreviouslyCrashedTasks = function(callback){

api.tasks.changeQueue(api, api.tasks.queues.processingQueue, api.tasks.queues.globalQueue, function(err, task){
if(task != null){
api.log('restarting a previously interupted/crashed task ' + task.name, 'yellow');
api.tasks.savePreviouslyCrashedTasks(callback);
}else{
callback();
}
});
}

api.tasks.load = function(api){
Expand Down

0 comments on commit 9b8074f

Please sign in to comment.