Skip to content

Commit

Permalink
Merge d07e213 into 83fba00
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Raynor committed Dec 20, 2016
2 parents 83fba00 + d07e213 commit 3bd38d9
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 10 deletions.
17 changes: 11 additions & 6 deletions src/lib/queue_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ QueueWorker.prototype._getLogEntry = function(message) {
* reference to the Firebase location of the task that's timed out.
* @returns {RSVP.Promise} Whether the task was able to be reset.
*/
QueueWorker.prototype._resetTask = function(taskRef, deferred) {
QueueWorker.prototype._resetTask = function(taskRef, immediate, deferred) {
var self = this;
var retries = 0;

Expand All @@ -111,7 +111,12 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
if (_.isNull(task)) {
return task;
}
if (task._state === self.inProgressState) {
var id = self.processId + ':' + self.taskNumber;
var correctState = (task._state === self.inProgressState);
var correctOwner = (task._owner === id || !immediate);
var timeSinceUpdate = Date.now() - _.get(task, '_state_changed', 0);
var timedOut = ((self.taskTimeout && timeSinceUpdate > self.taskTimeout) || immediate);
if (correctState && correctOwner && timedOut) {
task._state = self.startState;
task._state_changed = SERVER_TIMESTAMP;
task._owner = null;
Expand All @@ -125,7 +130,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
if (error) {
if (++retries < MAX_TRANSACTION_ATTEMPTS) {
logger.debug(self._getLogEntry('reset task errored, retrying'), error);
setImmediate(self._resetTask.bind(self), taskRef, deferred);
setImmediate(self._resetTask.bind(self), taskRef, immediate, deferred);
} else {
var errorMsg = 'reset task errored too many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
Expand Down Expand Up @@ -491,7 +496,7 @@ QueueWorker.prototype._tryToProcess = function(deferred) {
if (self.busy) {
// Worker has become busy while the transaction was processing
// so give up the task for now so another worker can claim it
self._resetTask(nextTaskRef);
self._resetTask(nextTaskRef, true);
} else {
self.busy = true;
self.taskNumber += 1;
Expand Down Expand Up @@ -592,7 +597,7 @@ QueueWorker.prototype._setUpTimeouts = function() {
self.expiryTimeouts[taskName] = setTimeout(
self._resetTask.bind(self),
expires,
ref);
ref, false);
};

self.processingTaskAddedListener = self.processingTasksRef.on('child_added',
Expand Down Expand Up @@ -704,7 +709,7 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) {
self.currentTaskRef.child('_owner').off(
'value',
self.currentTaskListener);
self._resetTask(self.currentTaskRef);
self._resetTask(self.currentTaskRef, true);
self.currentTaskRef = null;
self.currentTaskListener = null;
}
Expand Down
94 changes: 90 additions & 4 deletions test/lib/queue_worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,92 @@ describe('QueueWorker', function() {
testRef = tasksRef.push({
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 10
}, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
var initial = true;
return testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resetTask(testRef, true);
} else {
try {
var task = snapshot.val();
expect(task).to.have.all.keys(['_state_changed']);
expect(task._state_changed).to.be.closeTo(new Date().getTime() + th.offset, 250);
done();
} catch (errorB) {
done(errorB);
}
}
});
});
});

it('should not reset a task if immediate set but no longer owned by current worker', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validBasicTaskSpec);
var originalTask = {
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': 'someone-else',
'_progress': 0
};
testRef = tasksRef.push(originalTask, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
done();
} catch (errorB) {
done(errorB);
}
});
}).catch(done);
});
});

it('should not reset a task if immediate not set and it is has changed state recently', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validBasicTaskSpec);
var originalTask = {
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': 'someone',
'_progress': 0
};
testRef = tasksRef.push(originalTask, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef, false).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
done();
} catch (errorB) {
done(errorB);
}
});
}).catch(done);
});
});

it('should reset a task that is currently in progress that has timed out', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithTimeout);
testRef = tasksRef.push({
'_state': th.validBasicTaskSpec.inProgressState,
'_state_changed': new Date().getTime() - th.validTaskSpecWithTimeout.timeout,
'_owner': 'someone',
'_progress': 10
}, function(errorA) {
Expand All @@ -116,7 +202,7 @@ describe('QueueWorker', function() {
return testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resetTask(testRef);
qw._resetTask(testRef, false);
} else {
try {
var task = snapshot.val();
Expand All @@ -137,7 +223,7 @@ describe('QueueWorker', function() {

testRef = tasksRef.push();
qw.currentTaskRef = testRef;
qw._resetTask(testRef).then(function() {
qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.be.null;
Expand All @@ -163,7 +249,7 @@ describe('QueueWorker', function() {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef).then(function() {
return qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
Expand All @@ -189,7 +275,7 @@ describe('QueueWorker', function() {
return done(errorA);
}
qw.currentTaskRef = testRef;
return qw._resetTask(testRef).then(function() {
return qw._resetTask(testRef, true).then(function() {
testRef.once('value', function(snapshot) {
try {
expect(snapshot.val()).to.deep.equal(originalTask);
Expand Down

0 comments on commit 3bd38d9

Please sign in to comment.