Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/app/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ io.sockets.on('connection', function(socket) {
var cronTabManager = require('_pr/cronjobs');
cronTabManager.start();
catalystSync.executeScheduledInstances();
catalystSync.executeScheduledTasks();
catalystSync.executeSerialScheduledTasks();
catalystSync.executeParallelScheduledTasks();
server.listen(app.get('port'), function() {
logger.debug('Express server listening on port ' + app.get('port'));
});
89 changes: 56 additions & 33 deletions server/app/cronjobs/catalyst-scheduler/catalystScheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ var taskDao = require('_pr/model/classes/tasks/tasks.js');
var schedulerService = require('_pr/services/schedulerService');
var async = require('async');
var cronTab = require('node-crontab');

var catalystSync = module.exports = {};

catalystSync.executeScheduledInstances = function executeScheduledInstances() {
Expand Down Expand Up @@ -40,14 +39,14 @@ catalystSync.executeScheduledInstances = function executeScheduledInstances() {
});
}

catalystSync.executeScheduledTasks = function executeScheduledTasks() {
catalystSync.executeParallelScheduledTasks = function executeParallelScheduledTasks() {
taskDao.getScheduledTasks(function(err, tasks) {
if (err) {
logger.error("Failed to fetch tasks: ", err);
return;
}
if (tasks && tasks.length) {
var resultList =[],parallelTaskList=[],serialTaskList=[];
var resultList =[],parallelTaskList=[];
for (var i = 0; i < tasks.length; i++) {
(function(task) {
if(task.cronJobId && task.cronJobId !== null){
Expand All @@ -58,53 +57,77 @@ catalystSync.executeScheduledTasks = function executeScheduledTasks() {
parallelTaskList.push(function(callback){schedulerService.executeSchedulerForTasks(task,callback);});
}else{
resultList.push(function(callback){schedulerService.executeSchedulerForTasks(task,callback);});
}
if(resultList.length === tasks.length){
async.parallel(parallelTaskList,function(err,results){
if(err){
logger.error(err);
return;
}
logger.debug("Task Scheduler Completed for Parallel");
return;
})
}
})(tasks[i]);
}
}else{
logger.debug("There is no Parallel scheduled Task right now.");
return;
}
});
}

catalystSync.executeSerialScheduledTasks = function executeSerialScheduledTasks() {
taskDao.getScheduledTasks(function(err, tasks) {
if (err) {
logger.error("Failed to fetch tasks: ", err);
return;
}
if (tasks && tasks.length) {
var resultList =[],serialTaskList=[];
for (var i = 0; i < tasks.length; i++) {
(function(task) {
if(task.cronJobId && task.cronJobId !== null){
cronTab.cancelJob(task.cronJobId);
}
if(task.executionOrder === 'SERIAL'){
resultList.push(function(callback){schedulerService.executeSchedulerForTasks(task,callback);});
if(serialTaskList.length ===0) {
serialTaskList.push(function (next) {
schedulerService.executeSchedulerForTasks(task, next);
});
}else{
serialTaskList.push(function (execution,next) {
serialTaskList.push(function (cronJobId,next) {
cronTab.cancelJob(cronJobId);
schedulerService.executeSchedulerForTasks(task, next);
});
}
resultList.push(function(callback){schedulerService.executeSchedulerForTasks(task,callback);});
}else{
resultList.push(function(callback){schedulerService.executeSchedulerForTasks(task,callback);});
}
if(resultList.length === tasks.length){
async.parallel({
parallelTask: function(callback){
async.parallel(parallelTaskList,function(err,data){
if(err){
callback(err,null);
return;
}
logger.debug("Parallel Task Scheduler Completed");
callback(null,data);
return;
})
},
serialTask: function(callback){
async.waterfall(serialTaskList,function(err,data){
if(err){
callback(err,null);
return;
}
logger.debug("Serial Task Scheduler Completed");
callback(null,data);
if(serialTaskList.length > 0) {
async.waterfall(serialTaskList, function (err, data) {
if (err) {
logger.error(err);
return;
})
}
},function(err,results){
if(err){
logger.error(err);
}
cronTab.cancelJob(data);
logger.debug("Serial Task Scheduler Completed");
var catalystScheduler = require('_pr/cronjobs/catalyst-scheduler/catalystScheduler.js');
catalystScheduler.executeSerialScheduledTasks();
return;
}
logger.debug("Task Scheduler Completed");
})
}else{
logger.debug("There is no Serial scheduled Task right now.");
return;
})
}
}
})(tasks[i]);
}
}else{
logger.debug("There is no scheduled Task right now.");
logger.debug("There is no Serial scheduled Task right now.");
return;
}
});
Expand Down
2 changes: 1 addition & 1 deletion server/app/routes/v1.0/routes_audit_trails.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ module.exports.setRoutes = function(app, sessionVerificationFunc) {
[

function(next) {
apiUtil.paginationRequest(req[0].query, 'taskLogs', next);
apiUtil.paginationRequest(req.query, 'taskLogs', next);
},
function(paginationReq, next) {
reqData = paginationReq;
Expand Down
13 changes: 11 additions & 2 deletions server/app/routes/v1.0/routes_organizations.js
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,11 @@ module.exports.setRoutes = function(app, sessionVerification) {
return;
}
if(task.isTaskScheduled === true){
catalystSync.executeScheduledTasks();
if(task.executionOrder === 'PARALLEL'){
catalystSync.executeParallelScheduledTasks();
}else{
catalystSync.executeSerialScheduledTasks();
}
};
res.send(task);
logger.debug("Exit post() for /organizations/%s/businessGroups/%s/projects/%s/environments/%s/tasks", req.params.orgId, req.params.bgId, req.params.projectId, req.params.environments);
Expand All @@ -1015,7 +1019,12 @@ module.exports.setRoutes = function(app, sessionVerification) {
return;
}
if(task.isTaskScheduled === true){
catalystSync.executeScheduledTasks();
if(task.executionOrder === 'PARALLEL'){
catalystSync.executeParallelScheduledTasks();
}else{
catalystSync.executeSerialScheduledTasks();
}

};
res.send(task);
logger.debug("Exit post() for /organizations/%s/businessGroups/%s/projects/%s/environments/%s/tasks", req.params.orgId, req.params.bgId, req.params.projectId, req.params.environments);
Expand Down
38 changes: 23 additions & 15 deletions server/app/routes/v1.0/routes_tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ var fileIo = require('_pr/lib/utils/fileio');
module.exports.setRoutes = function(app, sessionVerification) {
app.all('/tasks/*', sessionVerification);

app.delete('/tasks/serviceDelivery/:taskId', function(req, res) {
Tasks.removeServiceDeliveryTask(req.params.taskId, function(err, data) {
if (err) {
logger.error("Failed to delete service delivery Task", err);
res.send(500, errorResponses.db.error);
return;
}
res.send(200, {
message: "deleted"
});
});
});

app.get('/tasks/history/list/all', function(req, res) {
TaskHistory.listHistory(function(err, tHistories) {
if (err) {
Expand Down Expand Up @@ -96,19 +109,6 @@ module.exports.setRoutes = function(app, sessionVerification) {
});
});

app.delete('/tasks/serviceDelivery/:taskId', function(req, res) {
Tasks.removeServiceDeliveryTask(req.params.taskId, function(err, data) {
if (err) {
logger.error("Failed to delete service delivery Task", err);
res.send(500, errorResponses.db.error);
return;
}
res.send(200, {
message: "deleted"
});
});
});


app.post('/tasks/:taskId/run', function(req, res) {

Expand Down Expand Up @@ -543,7 +543,11 @@ module.exports.setRoutes = function(app, sessionVerification) {
}
if (updateCount) {
if(taskData.isTaskScheduled === true){
catalystSync.executeScheduledTasks();
if(task.executionOrder === 'PARALLEL'){
catalystSync.executeParallelScheduledTasks();
}else{
catalystSync.executeSerialScheduledTasks();
}
};
res.send({
updateCount: updateCount
Expand All @@ -564,7 +568,11 @@ module.exports.setRoutes = function(app, sessionVerification) {
}
if (updateCount) {
if(taskData.isTaskScheduled === true){
catalystSync.executeScheduledTasks();
if(task.executionOrder === 'PARALLEL'){
catalystSync.executeParallelScheduledTasks();
}else{
catalystSync.executeSerialScheduledTasks();
}
};
res.send({
updateCount: updateCount
Expand Down
3 changes: 1 addition & 2 deletions server/app/services/schedulerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ var EC2 = require('_pr/lib/ec2.js');
var AWSKeyPair = require('_pr/model/classes/masters/cloudprovider/keyPair.js');
var appConfig = require('_pr/config');
var Cryptography = require('../lib/utils/cryptography');
var catalystSync = null;
var vmWareProvider = require('_pr/model/classes/masters/cloudprovider/vmwareCloudProvider.js');
var vmWare = require('_pr/lib/vmware');
var azureProvider = require('_pr/model/classes/masters/cloudprovider/azureCloudProvider.js');
Expand Down Expand Up @@ -133,7 +132,7 @@ schedulerService.executeSchedulerForTasks = function executeSchedulerForTasks(ta
return;
}
logger.debug("Task Execution Success: ", task.name);
callback(null,historyData);
callback(null,cronJobId);
return;
});
});
Expand Down