Skip to content

Commit

Permalink
Adding new guard against the owner having changed
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Raynor committed Dec 20, 2016
1 parent eb9ff4b commit d07e213
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
7 changes: 5 additions & 2 deletions src/lib/queue_worker.js
Expand Up @@ -111,9 +111,12 @@ QueueWorker.prototype._resetTask = function(taskRef, immediate, deferred) {
if (_.isNull(task)) {
return task;
}
var id = self.processId + ':' + self.taskNumber;
var correctState = (task._state === self.inProgressState);
var timedOut = self.taskTimeout && (Date.now() - task._state_changed > self.taskTimeout);
if (correctState && (immediate || timedOut)) {
var correctOwner = (task._owner === id || !immediate);
var timeSinceUpdate = Date.now() - _.get(task, '_state_changed', 0);
var timedOut = ((self.taskTimeout && timeSinceUpdate > self.taskTimeout) || immediate);
if (correctState && correctOwner && timedOut) {
task._state = self.startState;
task._state_changed = SERVER_TIMESTAMP;
task._owner = null;
Expand Down
31 changes: 29 additions & 2 deletions test/lib/queue_worker.spec.js
Expand Up @@ -105,7 +105,7 @@ describe('QueueWorker', function() {
testRef = tasksRef.push({
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': 'someone',
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 10
}, function(errorA) {
if (errorA) {
Expand All @@ -131,13 +131,40 @@ describe('QueueWorker', function() {
});
});

it('should not reset a task if immediate set but no longer owned by current worker', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validBasicTaskSpec);
var originalTask = {
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': 'someone-else',
'_progress': 0
};
testRef = tasksRef.push(originalTask, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
done();
} catch (errorB) {
done(errorB);
}
});
}).catch(done);
});
});

it('should not reset a task if immediate not set and it is has changed state recently', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validBasicTaskSpec);
var originalTask = {
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_owner': 'someone',
'_progress': 0
};
testRef = tasksRef.push(originalTask, function(errorA) {
Expand Down

0 comments on commit d07e213

Please sign in to comment.