Skip to content

Commit

Permalink
Fixing a bug where all workers would reset a task on timeout, regardl…
Browse files Browse the repository at this point in the history
…ess of whether another worker had
  • Loading branch information
Chris Raynor committed Dec 20, 2016
1 parent 83fba00 commit eb9ff4b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 10 deletions.
14 changes: 8 additions & 6 deletions src/lib/queue_worker.js
Expand Up @@ -97,7 +97,7 @@ QueueWorker.prototype._getLogEntry = function(message) {
* reference to the Firebase location of the task that's timed out.
* @returns {RSVP.Promise} Whether the task was able to be reset.
*/
QueueWorker.prototype._resetTask = function(taskRef, deferred) {
QueueWorker.prototype._resetTask = function(taskRef, immediate, deferred) {
var self = this;
var retries = 0;

Expand All @@ -111,7 +111,9 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
if (_.isNull(task)) {
return task;
}
if (task._state === self.inProgressState) {
var correctState = (task._state === self.inProgressState);
var timedOut = self.taskTimeout && (Date.now() - task._state_changed > self.taskTimeout);
if (correctState && (immediate || timedOut)) {
task._state = self.startState;
task._state_changed = SERVER_TIMESTAMP;
task._owner = null;
Expand All @@ -125,7 +127,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
if (error) {
if (++retries < MAX_TRANSACTION_ATTEMPTS) {
logger.debug(self._getLogEntry('reset task errored, retrying'), error);
setImmediate(self._resetTask.bind(self), taskRef, deferred);
setImmediate(self._resetTask.bind(self), taskRef, immediate, deferred);
} else {
var errorMsg = 'reset task errored too many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
Expand Down Expand Up @@ -491,7 +493,7 @@ QueueWorker.prototype._tryToProcess = function(deferred) {
if (self.busy) {
// Worker has become busy while the transaction was processing
// so give up the task for now so another worker can claim it
self._resetTask(nextTaskRef);
self._resetTask(nextTaskRef, true);
} else {
self.busy = true;
self.taskNumber += 1;
Expand Down Expand Up @@ -592,7 +594,7 @@ QueueWorker.prototype._setUpTimeouts = function() {
self.expiryTimeouts[taskName] = setTimeout(
self._resetTask.bind(self),
expires,
ref);
ref, false);
};

self.processingTaskAddedListener = self.processingTasksRef.on('child_added',
Expand Down Expand Up @@ -704,7 +706,7 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) {
self.currentTaskRef.child('_owner').off(
'value',
self.currentTaskListener);
self._resetTask(self.currentTaskRef);
self._resetTask(self.currentTaskRef, true);
self.currentTaskRef = null;
self.currentTaskListener = null;
}
Expand Down
67 changes: 63 additions & 4 deletions test/lib/queue_worker.spec.js
Expand Up @@ -116,7 +116,66 @@ describe('QueueWorker', function() {
return testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resetTask(testRef);
qw._resetTask(testRef, true);
} else {
try {
var task = snapshot.val();
expect(task).to.have.all.keys(['_state_changed']);
expect(task._state_changed).to.be.closeTo(new Date().getTime() + th.offset, 250);
done();
} catch (errorB) {
done(errorB);
}
}
});
});
});

it('should not reset a task if immediate not set and it is has changed state recently', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validBasicTaskSpec);
var originalTask = {
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 0
};
testRef = tasksRef.push(originalTask, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef, false).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
done();
} catch (errorB) {
done(errorB);
}
});
}).catch(done);
});
});

it('should reset a task that is currently in progress that has timed out', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithTimeout);
testRef = tasksRef.push({
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime() - th.validTaskSpecWithTimeout.timeout,
'_owner': 'someone',
'_progress': 10
}, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
var initial = true;
return testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resetTask(testRef, false);
} else {
try {
var task = snapshot.val();
Expand All @@ -137,7 +196,7 @@ describe('QueueWorker', function() {

testRef = tasksRef.push();
qw.currentTaskRef = testRef;
qw._resetTask(testRef).then(function() {
qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.be.null;
Expand All @@ -163,7 +222,7 @@ describe('QueueWorker', function() {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef).then(function() {
return qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
Expand All @@ -189,7 +248,7 @@ describe('QueueWorker', function() {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef).then(function() {
return qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
Expand Down

0 comments on commit eb9ff4b

Please sign in to comment.