Skip to content

Commit

Permalink
Merge b3e2400 into 06828f7
Browse files Browse the repository at this point in the history
  • Loading branch information
pengz1 committed Oct 26, 2016
2 parents 06828f7 + b3e2400 commit 2e2133c
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 21 deletions.
73 changes: 69 additions & 4 deletions lib/task-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ di.annotate(taskRunnerFactory,
'Rx',
'Task.Task',
'Task.Messenger',
'TaskGraph.Store'
'TaskGraph.Store',
'TaskGraph.TaskGraph'
)
);

Expand All @@ -30,7 +31,8 @@ function taskRunnerFactory(
Rx,
Task,
taskMessenger,
store
store,
TaskGraph
) {
var logger = Logger.initialize(taskRunnerFactory);

Expand Down Expand Up @@ -337,6 +339,7 @@ function taskRunnerFactory(
})
.tap(function(task) {
self.activeTasks[task.instanceId] = task;
self.publishTaskStarted(task);
logger.info("Running task ", {
taskRunnerId: self.taskRunnerId,
taskId: task.instanceId,
Expand All @@ -363,11 +366,48 @@ function taskRunnerFactory(
.tap(function(task) {
delete self.activeTasks[task.instanceId];
})
.tap(self.publishTaskFinished.bind(self))
.tap(function(task) { return self.publishTaskFinished(task); })
.map(function(task) { return _.pick(task, ['instanceId', 'state']); })
.catch(self.handleStreamError.bind(self, 'error while running task'));
};

/**
* Publishes a task finished event over AMQP
*
* @param {Object} task
* @returns {Promise}
* @memberOf TaskRunner
*/
TaskRunner.prototype.publishTaskStarted = function(task) {
var progressData, taskFriendlyName;
taskFriendlyName = task.definition.friendlyName;
progressData = {
graphId: task.context.graphId,
progress: {
percentage: null,
description: 'Task "' + taskFriendlyName + '" started',
},
taskProgress: {
graphId: task.context.graphId,
taskId: task.instanceId,
taskName: taskFriendlyName,
progress: {
percentage: "0%",
description: "Task started"
}
}
};

return TaskGraph.updateGraphProgress(progressData)
.catch(function(error) {
logger.error('Error publishing progress event when task started', {
taskId: task.instanceId,
graphId: task.context.graphId,
error: error
});
});
};

/**
* Publishes a task finished event over AMQP
*
Expand All @@ -376,12 +416,34 @@ function taskRunnerFactory(
* @memberOf TaskRunner
*/
TaskRunner.prototype.publishTaskFinished = function(task) {
var errorMsg;
var errorMsg, progressData, taskFriendlyName;
taskFriendlyName = task.definition.friendlyName;
progressData = {
graphId: task.context.graphId,
progress: {
percentage: null,
description: 'Task "' + taskFriendlyName + '" finished',
},
taskProgress: {
graphId: task.context.graphId,
taskId: task.instanceId,
taskName: taskFriendlyName,
progress: {
percentage: "100%",
description: "Task finished"
}
}
};
if (task.error && task.error.stack) {
errorMsg = task.error.stack;
} else if (task.error) {
errorMsg = task.error.toString();
}

if (errorMsg) {
progressData.progress.description += " with error";
}

return taskMessenger.publishTaskFinished(
this.domain,
task.instanceId,
Expand All @@ -391,6 +453,9 @@ function taskRunnerFactory(
task.context,
task.definition.terminalOnStates
)
.tap(function(){
return TaskGraph.updateGraphProgress(progressData);
})
.catch(function(error) {
logger.error('Error publishing task finished event', {
taskId: task.instanceId,
Expand Down
94 changes: 77 additions & 17 deletions spec/lib/task-runner-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ describe("Task Runner", function() {
},
TaskRunner,
taskMessenger = {},
mockTaskGraph = {
updateGraphProgress: function () {}
},
TaskGraph,
store = {
checkoutTask: function(){},
getTaskById: function(){},
Expand Down Expand Up @@ -61,13 +65,15 @@ describe("Task Runner", function() {
require('../../lib/task-runner.js'),
helper.di.simpleWrapper(taskMessenger, 'Task.Messengers.AMQP'),
helper.di.simpleWrapper(Task, 'Task.Task'),
helper.di.simpleWrapper(store, 'TaskGraph.Store')
helper.di.simpleWrapper(store, 'TaskGraph.Store'),
helper.di.simpleWrapper(mockTaskGraph, 'TaskGraph.TaskGraph')
]);
Rx = helper.injector.get('Rx');
Promise = helper.injector.get('Promise');
Constants = helper.injector.get('Constants');
assert = helper.injector.get('Assert');
TaskRunner = helper.injector.get('TaskGraph.TaskRunner');
TaskGraph = helper.injector.get('TaskGraph.TaskGraph');
this.sandbox = sinon.sandbox.create();
});

Expand Down Expand Up @@ -376,20 +382,36 @@ describe("Task Runner", function() {
});

describe('publishTaskFinished', function() {
var progress,finishedTask ;
before(function() {
this.sandbox.restore();
runner = TaskRunner.create();
});

it("should wrap the taskMessenger's publishTaskFinished", function() {
taskMessenger.publishTaskFinished = this.sandbox.stub().resolves();
var finishedTask = {
finishedTask = {
instanceId: 'aTaskId',
context: { graphId: 'aGraphId'},
state: 'finished',
definition: { terminalOnStates: ['succeeded'] }
definition: { terminalOnStates: ['succeeded'], friendlyName: 'Test Task' }};
progress = {
graphId: finishedTask.context.graphId,
progress: {
percentage: null,
description: 'Task "' + finishedTask.definition.friendlyName + '" finished'
},
taskProgress: {
graphId: finishedTask.context.graphId,
taskId: finishedTask.instanceId,
taskName: finishedTask.definition.friendlyName,
progress: {
percentage: "100%",
description: "Task finished"
}
}
};
runner = TaskRunner.create();
});

it("should wrap the taskMessenger's publishTaskFinished", function() {
taskMessenger.publishTaskFinished = this.sandbox.stub().resolves();
this.sandbox.stub(TaskGraph, 'updateGraphProgress').resolves();
return runner.publishTaskFinished(finishedTask)
.then(function() {
expect(taskMessenger.publishTaskFinished).to.have.been.calledOnce;
Expand All @@ -402,25 +424,23 @@ describe("Task Runner", function() {
finishedTask.context,
finishedTask.definition.terminalOnStates
);
expect(TaskGraph.updateGraphProgress).to.have.been.calledOnce;
expect(TaskGraph.updateGraphProgress).to.have.been.calledWith(progress);
});
});

it("should call publishTaskFinished with an error message string", function() {
taskMessenger.publishTaskFinished = this.sandbox.stub().resolves();
var error = new Error('test error');
var finishedTask = {
taskId: 'aTaskId',
context: { graphId: 'aGraphId'},
state: 'finished',
error: error,
definition: { terminalOnStates: ['succeeded'] }
};

this.sandbox.stub(TaskGraph, 'updateGraphProgress').resolves();
finishedTask.error = new Error('test error');
progress.progress.description += " with error";
return runner.publishTaskFinished(finishedTask)
.then(function() {
expect(taskMessenger.publishTaskFinished).to.have.been.calledOnce;
expect(taskMessenger.publishTaskFinished.firstCall.args[4])
.to.contain('test error');
expect(TaskGraph.updateGraphProgress).to.have.been.calledOnce;
expect(TaskGraph.updateGraphProgress).to.have.been.calledWith(progress);
});
});

Expand All @@ -444,6 +464,45 @@ describe("Task Runner", function() {
});
});

describe('publishTaskStarted', function() {
var startedTask, progress;
before(function() {
startedTask = {
instanceId: 'aTaskId',
context: { graphId: 'aGraphId'},
state: 'Running',
definition: { terminalOnStates: ['succeeded'], friendlyName: 'Test Task' }};
progress = {
graphId: startedTask.context.graphId,
progress: {
percentage: null,
description: 'Task "' + startedTask.definition.friendlyName + '" started',
},
taskProgress: {
graphId: startedTask.context.graphId,
taskId: startedTask.instanceId,
taskName: startedTask.definition.friendlyName,
progress: {
percentage: "0%",
description: "Task started"
}
}
};
this.sandbox.restore();
runner = TaskRunner.create();
});

it("should wrap the taskMessenger's publishTaskStarted", function() {
this.sandbox.stub(TaskGraph, 'updateGraphProgress').resolves();
return runner.publishTaskStarted(startedTask)
.then(function(){
expect(TaskGraph.updateGraphProgress).to.be.calledOnce;
expect(TaskGraph.updateGraphProgress).to.be.calledWith(progress);
});
});

});

describe('task cancellation', function() {

beforeEach(function() {
Expand Down Expand Up @@ -509,6 +568,7 @@ describe("Task Runner", function() {
this.sandbox.stub(Task, 'create').resolves(stubbedTask);
store.setTaskState = this.sandbox.stub().resolves();
this.sandbox.stub(runner, 'publishTaskFinished');
this.sandbox.stub(runner, 'publishTaskStarted');
});

it('should return an Observable', function() {
Expand Down

0 comments on commit 2e2133c

Please sign in to comment.