Skip to content

Commit

Permalink
Merge pull request #214 from lanchongyizu/feature/workflow_progress
Browse files Browse the repository at this point in the history
workflow level progress
  • Loading branch information
yyscamper committed Feb 21, 2017
2 parents 6d0d7ab + 6d11cab commit 04627b6
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 96 deletions.
74 changes: 25 additions & 49 deletions lib/task-runner.js
@@ -1,4 +1,4 @@
// Copyright 2016, EMC, Inc.
// Copyright © 2016-2017 Dell Inc. or its subsidiaries. All Rights Reserved.

'use strict';

Expand All @@ -9,6 +9,8 @@ di.annotate(taskRunnerFactory,
new di.Inject(
'Logger',
'Promise',
'GraphProgress',
'Protocol.Events',
'Constants',
'Assert',
'uuid',
Expand All @@ -23,6 +25,8 @@ di.annotate(taskRunnerFactory,
function taskRunnerFactory(
Logger,
Promise,
GraphProgress,
eventsProtocol,
Constants,
assert,
uuid,
Expand Down Expand Up @@ -58,7 +62,6 @@ function taskRunnerFactory(
this.activeTasks = {};
this.lostBeatLimit = options.lostBeatLimit || 3;
this.domain = options.domain || Constants.Task.DefaultDomain;
this.maxTaskProgress = options.totalSteps || 100;
}

/**
Expand Down Expand Up @@ -338,7 +341,7 @@ function taskRunnerFactory(
})
.tap(function(task) {
self.activeTasks[task.instanceId] = task;
self.publishTaskStarted(task);
self.publishTaskStartedProgressEvent(task);
logger.info("Running task ", {
taskRunnerId: self.taskRunnerId,
taskId: task.instanceId,
Expand Down Expand Up @@ -371,32 +374,30 @@ function taskRunnerFactory(
};

/**
* Publishes a task finished event over AMQP
* Publishes a task started event over AMQP
*
* @param {Object} task
* @returns {Promise}
* @memberOf TaskRunner
*/
TaskRunner.prototype.publishTaskStarted = function(task) {
var taskFriendlyName = task.definition.friendlyName;
var progressData = {
progress: {
value: null,
maximum: null,
description: 'Task "' + taskFriendlyName + '" started',
},
taskProgress: {
taskId: task.instanceId,
taskName: taskFriendlyName,
progress: {
value: 0,
maximum: this.maxTaskProgress,
description: "Task started"
}
}
};

return taskMessenger.publishProgressEvent(task.context.graphId, progressData)
TaskRunner.prototype.publishTaskStartedProgressEvent = function(task) {
return store.getGraphById(task.context.graphId)
.then(function(graph) {
var taskFriendlyName = task.definition.friendlyName;
var graphDescription = 'Task "' + taskFriendlyName + '" started';
var taskId = task.instanceId;
var options = task.options || {};
var taskProgress = {
value: 0,
maximum: options.totalSteps || Constants.Progress.DefaultTaskProgressMaximum,
description: "Task started"
};

var progress = GraphProgress.create(graph, graphDescription);
progress.updateTaskProgress(taskId, taskProgress, true);
return eventsProtocol.publishProgressEvent(graph.instanceId,
progress.getProgressEventData());
})
.catch(function(error) {
logger.error('Error publishing progress event when task started', {
taskId: task.instanceId,
Expand All @@ -415,33 +416,11 @@ function taskRunnerFactory(
*/
TaskRunner.prototype.publishTaskFinished = function(task) {
var errorMsg;
var taskFriendlyName = task.definition.friendlyName;
var progressData = {
progress: {
value: null,
maximum: null,
description: 'Task "' + taskFriendlyName + '" finished',
},
taskProgress: {
taskId: task.instanceId,
taskName: taskFriendlyName,
progress: {
value: this.maxTaskProgress,
maximum: this.maxTaskProgress,
description: "Task finished"
}
}
};
if (task.error && task.error.stack) {
errorMsg = task.error.stack;
} else if (task.error) {
errorMsg = task.error.toString();
}

if (errorMsg) {
progressData.progress.description += " with error";
}

return taskMessenger.publishTaskFinished(
this.domain,
task.instanceId,
Expand All @@ -451,9 +430,6 @@ function taskRunnerFactory(
task.context,
task.definition.terminalOnStates
)
.tap(function(){
return taskMessenger.publishProgressEvent(task.context.graphId, progressData);
})
.catch(function(error) {
logger.error('Error publishing task finished event', {
taskId: task.instanceId,
Expand Down
87 changes: 86 additions & 1 deletion lib/task-scheduler.js
@@ -1,4 +1,4 @@
// Copyright 2016, EMC, Inc.
// Copyright © 2016-2017 Dell Inc. or its subsidiaries. All Rights Reserved.

'use strict';

Expand All @@ -19,6 +19,7 @@ di.annotate(taskSchedulerFactory,
'Assert',
'_',
'Rx.Mixins',
'GraphProgress',
'Task.Messenger'
)
);
Expand All @@ -35,6 +36,7 @@ function taskSchedulerFactory(
assert,
_,
Rx,
GraphProgress,
taskMessenger
) {
var logger = Logger.initialize(taskSchedulerFactory);
Expand Down Expand Up @@ -210,6 +212,9 @@ function taskSchedulerFactory(
.tap(self.handleStreamDebug.bind(self, 'Received evaluate task event'))
.map(self.updateTaskDependencies.bind(self))
.mergeLossy(self.concurrencyMaximums.updateTaskDependencies)
.tap(function(task){
return self.publishTaskFinishedProgressEvent(task);
})
.tap(function(task) {
var _task = _.pick(task, ['domain', 'graphId', 'taskId']);
self.handleStreamDebug('Updated dependencies for task', _task);
Expand All @@ -218,6 +223,53 @@ function taskSchedulerFactory(
.map(self.handleEvaluatedTask.bind(self, checkGraphFinishedStream, evaluateGraphStream));
};

/**
* Publish a task finished progress event with the messenger.
*
* @param {Object} task
* @param {String} task.graphId - the unique ID of the graph to which the task belongs
* @param {String} task.taskId - the unique ID of the task
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.publishTaskFinishedProgressEvent = function(task) {
return Promise.try(function() {
assert.object(task, 'task');
assert.uuid(task.graphId, 'task.graphId');
assert.uuid(task.taskId, 'task.taskId');
})
.then(function() {
return store.getGraphById(task.graphId);
})
.then(function(graph) {
var _task = graph.tasks[task.taskId];
var taskFriendlyName = _task.friendlyName;
var taskOptions = _task.options || {};
var taskProgress = {
value: taskOptions.totalStep || Constants.Progress.DefaultTaskProgressMaximum,
maximum: taskOptions.totalStep || Constants.Progress.DefaultTaskProgressMaximum,
description: "Task finished"
};

var graphDescription = 'Task "' + taskFriendlyName + '" finished';
if (_task.error) {
graphDescription += " with error";
}

var progress = GraphProgress.create(graph, graphDescription);
progress.updateTaskProgress(task.taskId, taskProgress, true);
return eventsProtocol.publishProgressEvent(graph.instanceId,
progress.getProgressEventData());
})
.catch(function(error) {
logger.error('Error publishing task finished progress event', {
taskId: task.taskId,
graphId: task.graphId,
error: error
});
});
};

/**
* Once a task has finished and been evaluated (dependendent tasks updated)
* then check if the task is terminal to determine whether the graph is potentially
Expand Down Expand Up @@ -330,6 +382,9 @@ function taskSchedulerFactory(
.filter(function(_data) { return _data.done; })
.flatMap(store.setGraphDone.bind(store, Constants.Task.States.Succeeded))
.filter(function(graph) { return !_.isEmpty(graph); })
.tap(function(graph) {
return self.publishGraphFinishedProgressEvent(graph, Constants.Task.States.Succeeded);
})
.map(function(graph) { return _.pick(graph, ['instanceId', '_status', 'node']); })
.tap(self.publishGraphFinished.bind(self))
.catch(self.handleStreamError.bind(self, 'Error handling graph done event'));
Expand All @@ -345,9 +400,13 @@ function taskSchedulerFactory(
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.failGraph = function(data, graphState) {
var self = this;
return Rx.Observable.just(data.graphId)
.flatMap(store.getActiveGraphById)
.filter(function(graph) {return !_.isEmpty(graph);})
.tap(function(graph) {
return self.publishGraphFinishedProgressEvent(graph, graphState);
})
.map(function(doneGraph) {
return _.map(doneGraph.tasks, function(taskObj) {
if(taskObj.state === Constants.Task.States.Pending) {
Expand Down Expand Up @@ -549,6 +608,32 @@ function taskSchedulerFactory(
});
};

/**
* Publish a graph finished progress event with the messenger.
*
* @param {Object} graph
* @param {String} graphState
* @returns {Promise}
* @memberOf TaskScheduler
*/
TaskScheduler.prototype.publishGraphFinishedProgressEvent = function(graph, graphState) {
return Promise.try(function() {
assert.object(graph, 'graph');
assert.string(graph.name , 'graph.name');
var graphDescription = 'Graph "' + graph.name + '" ' + graphState;
var progress = GraphProgress.create(graph, graphDescription);
return eventsProtocol.publishProgressEvent(graph.instanceId,
progress.getProgressEventData());
})
.catch(function(error) {
logger.error('Error publishing graph finished progress event', {
graphId: graph.instanceId,
_status: graph._status,
error: error
});
});
};

/**
* Publish a run task event with the messenger, to be picked up by any task runners
* within the domain.
Expand Down

0 comments on commit 04627b6

Please sign in to comment.