diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index f93da58..e365f7f 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -111,9 +111,12 @@ QueueWorker.prototype._resetTask = function(taskRef, immediate, deferred) { if (_.isNull(task)) { return task; } + var id = self.processId + ':' + self.taskNumber; var correctState = (task._state === self.inProgressState); - var timedOut = self.taskTimeout && (Date.now() - task._state_changed > self.taskTimeout); - if (correctState && (immediate || timedOut)) { + var correctOwner = (task._owner === id || !immediate); + var timeSinceUpdate = Date.now() - _.get(task, '_state_changed', 0); + var timedOut = ((self.taskTimeout && timeSinceUpdate > self.taskTimeout) || immediate); + if (correctState && correctOwner && timedOut) { task._state = self.startState; task._state_changed = SERVER_TIMESTAMP; task._owner = null; diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index e102d13..540f0f9 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -105,7 +105,7 @@ describe('QueueWorker', function() { testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, '_state_changed': new Date().getTime(), - '_owner': 'someone', + '_owner': qw.processId + ':' + qw.taskNumber, '_progress': 10 }, function(errorA) { if (errorA) { @@ -131,13 +131,40 @@ describe('QueueWorker', function() { }); }); + it('should not reset a task if immediate set but no longer owned by current worker', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validBasicTaskSpec); + var originalTask = { + '_state': th.validBasicTaskSpec.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': 'someone-else', + '_progress': 0 + }; + testRef = tasksRef.push(originalTask, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + return qw._resetTask(testRef, true).then(function() { + testRef.once('value', function(snapshot) { + try { + expect(snapshot.val()).to.deep.equal(originalTask); + done(); + } catch (errorB) { + done(errorB); + } + }); + }).catch(done); + }); + }); + it('should not reset a task if immediate not set and it is has changed state recently', function(done) { qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); var originalTask = { '_state': th.validBasicTaskSpec.inProgressState, '_state_changed': new Date().getTime(), - '_owner': qw.processId + ':' + qw.taskNumber, + '_owner': 'someone', '_progress': 0 }; testRef = tasksRef.push(originalTask, function(errorA) {