From dc3e1e70a500c626111472303bfbbd22a0494e52 Mon Sep 17 00:00:00 2001 From: Chris Raynor Date: Sat, 19 Mar 2016 19:20:50 -0700 Subject: [PATCH 1/3] Fixing bug where a task is attempted twice if no new tasks are added while processing --- src/lib/queue_worker.js | 232 ++++++++++++++++++---------------- test/lib/queue_worker.spec.js | 54 +++++--- 2 files changed, 158 insertions(+), 128 deletions(-) diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index 3a03d41..5f40cb8 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -159,7 +159,7 @@ QueueWorker.prototype._resolve = function(taskNumber) { } deferred.resolve(); self.busy = false; - self._tryToProcess(self.nextTaskRef); + self._tryToProcess(); } else { var existedBefore; self.currentTaskRef.transaction(function(task) { @@ -208,7 +208,7 @@ QueueWorker.prototype._resolve = function(taskNumber) { } deferred.resolve(); self.busy = false; - self._tryToProcess(self.nextTaskRef); + self._tryToProcess(); } }, false); } @@ -249,7 +249,7 @@ QueueWorker.prototype._reject = function(taskNumber) { } deferred.resolve(); self.busy = false; - self._tryToProcess(self.nextTaskRef); + self._tryToProcess(); } else { if (_.isError(error)) { errorString = error.message; @@ -320,7 +320,7 @@ QueueWorker.prototype._reject = function(taskNumber) { } deferred.resolve(); self.busy = false; - self._tryToProcess(self.nextTaskRef); + self._tryToProcess(); } }, false); } @@ -394,10 +394,8 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { /** * Attempts to claim the next task in the queue. - * @param {Firebase} nextTaskRef Reference to the Firebase location of the next - * task. */ -QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { +QueueWorker.prototype._tryToProcess = function(deferred) { var self = this, retries = 0, malformed = false; @@ -414,113 +412,125 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { logger.debug(self._getLogEntry('finished shutdown')); self.shutdownDeferred.resolve(); } else { - nextTaskRef.transaction(function(task) { - /* istanbul ignore if */ - if (_.isNull(task)) { - return task; - } - if (!_.isPlainObject(task)) { - malformed = true; - var error = new Error('Task was malformed'); - var errorStack = null; - if (!self.suppressStack) { - errorStack = error.stack; - } - return { - _state: self.errorState, - _state_changed: Firebase.ServerValue.TIMESTAMP, - _error_details: { - error: error.message, - original_task: task, - error_stack: errorStack - } - }; - } - if (_.isUndefined(task._state)) { - task._state = null; - } - if (task._state === self.startState) { - task._state = self.inProgressState; - task._state_changed = Firebase.ServerValue.TIMESTAMP; - task._owner = self.processId + ':' + (self.taskNumber + 1); - task._progress = 0; - return task; - } else { - return; - } - }, function(error, committed, snapshot) { - /* istanbul ignore if */ - if (error) { - if (++retries < MAX_TRANSACTION_ATTEMPTS) { - logger.debug(self._getLogEntry('errored while attempting to claim' + - ' a new task, retrying'), error); - return setImmediate(self._tryToProcess.bind(self), nextTaskRef, - deferred); - } else { - var errorMsg = 'errored while attempting to claim a new task too ' + - 'many times, no longer retrying'; - logger.debug(self._getLogEntry(errorMsg), error); - return deferred.reject(new Error(errorMsg)); + if (!self.newTaskRef) { + deferred.resolve(); + } else { + self.newTaskRef.once('value', function(snapshot) { + if (!snapshot.exists()) { + return deferred.resolve(); } - } else if (committed && snapshot.exists()) { - if (malformed) { - logger.debug(self._getLogEntry('found malformed entry ' + - snapshot.key())); - } else { + var nextTaskRef; + snapshot.forEach(function(childSnap) { + nextTaskRef = childSnap.ref(); + }); + nextTaskRef.transaction(function(task) { /* istanbul ignore if */ - if (self.busy) { - // Worker has become busy while the transaction was processing - - // so give up the task for now so another worker can claim it - self._resetTask(nextTaskRef); - } else { - self.busy = true; - self.taskNumber += 1; - logger.debug(self._getLogEntry('claimed ' + snapshot.key())); - self.currentTaskRef = snapshot.ref(); - self.currentTaskListener = self.currentTaskRef - .child('_owner').on('value', function(ownerSnapshot) { - var id = self.processId + ':' + self.taskNumber; - /* istanbul ignore else */ - if (ownerSnapshot.val() !== id && - !_.isNull(self.currentTaskRef) && - !_.isNull(self.currentTaskListener)) { - self.currentTaskRef.child('_owner').off( - 'value', - self.currentTaskListener); - self.currentTaskRef = null; - self.currentTaskListener = null; + if (_.isNull(task)) { + return task; + } + if (!_.isPlainObject(task)) { + malformed = true; + var error = new Error('Task was malformed'); + var errorStack = null; + if (!self.suppressStack) { + errorStack = error.stack; + } + return { + _state: self.errorState, + _state_changed: Firebase.ServerValue.TIMESTAMP, + _error_details: { + error: error.message, + original_task: task, + error_stack: errorStack } - }); - var data = snapshot.val(); - if (self.sanitize) { - [ - '_state', - '_state_changed', - '_owner', - '_progress', - '_error_details' - ].forEach(function(reserved) { - if (snapshot.hasChild(reserved)) { - delete data[reserved]; - } - }); + }; + } + if (_.isUndefined(task._state)) { + task._state = null; + } + if (task._state === self.startState) { + task._state = self.inProgressState; + task._state_changed = Firebase.ServerValue.TIMESTAMP; + task._owner = self.processId + ':' + (self.taskNumber + 1); + task._progress = 0; + return task; + } else { + return; + } + }, function(error, committed, snapshot) { + /* istanbul ignore if */ + if (error) { + if (++retries < MAX_TRANSACTION_ATTEMPTS) { + logger.debug(self._getLogEntry('errored while attempting to claim' + + ' a new task, retrying'), error); + return setImmediate(self._tryToProcess.bind(self), deferred); + } else { + var errorMsg = 'errored while attempting to claim a new task too ' + + 'many times, no longer retrying'; + logger.debug(self._getLogEntry(errorMsg), error); + return deferred.reject(new Error(errorMsg)); } - var progress = self._updateProgress(self.taskNumber); - var resolve = self._resolve(self.taskNumber); - var reject = self._reject(self.taskNumber); - setImmediate(function() { - try { - self.processingFunction.call(null, data, progress, resolve, - reject); - } catch (error) { - reject(error); + } else if (committed && snapshot.exists()) { + if (malformed) { + logger.debug(self._getLogEntry('found malformed entry ' + + snapshot.key())); + } else { + /* istanbul ignore if */ + if (self.busy) { + // Worker has become busy while the transaction was processing - + // so give up the task for now so another worker can claim it + self._resetTask(nextTaskRef); + } else { + self.busy = true; + self.taskNumber += 1; + logger.debug(self._getLogEntry('claimed ' + snapshot.key())); + self.currentTaskRef = snapshot.ref(); + self.currentTaskListener = self.currentTaskRef + .child('_owner').on('value', function(ownerSnapshot) { + var id = self.processId + ':' + self.taskNumber; + /* istanbul ignore else */ + if (ownerSnapshot.val() !== id && + !_.isNull(self.currentTaskRef) && + !_.isNull(self.currentTaskListener)) { + self.currentTaskRef.child('_owner').off( + 'value', + self.currentTaskListener); + self.currentTaskRef = null; + self.currentTaskListener = null; + } + }); + var data = snapshot.val(); + if (self.sanitize) { + [ + '_state', + '_state_changed', + '_owner', + '_progress', + '_error_details' + ].forEach(function(reserved) { + if (snapshot.hasChild(reserved)) { + delete data[reserved]; + } + }); + } + var progress = self._updateProgress(self.taskNumber); + var resolve = self._resolve(self.taskNumber); + var reject = self._reject(self.taskNumber); + setImmediate(function() { + try { + self.processingFunction.call(null, data, progress, resolve, + reject); + } catch (error) { + reject(error); + } + }); } - }); + } } - } - } - deferred.resolve(); - }, false); + deferred.resolve(); + }, false); + }); + } } } else { deferred.resolve(); @@ -700,9 +710,8 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) { logger.debug(self._getLogEntry('listening')); self.newTaskListener = self.newTaskRef.on( 'child_added', - function(snapshot) { - self.nextTaskRef = snapshot.ref(); - self._tryToProcess(self.nextTaskRef); + function() { + self._tryToProcess(); }, /* istanbul ignore next */ function(error) { logger.debug(self._getLogEntry('errored listening to Firebase'), error); }); @@ -742,4 +751,3 @@ QueueWorker.prototype.shutdown = function() { }; module.exports = QueueWorker; - diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index bb78175..34eb688 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -1065,21 +1065,23 @@ describe('QueueWorker', function() { qw = new th.QueueWorker(tasksRef, '0', true, false, _.noop); }); - afterEach(function() { + afterEach(function(done) { qw.setTaskSpec(); + tasksRef.set(null, done); }); it('should not try and process a task if busy', function(done) { qw.startState = th.validTaskSpecWithStartState.startState; qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; qw.busy = true; - var testRef = tasksRef.push({ + qw.newTaskRef = tasksRef; + tasksRef.push({ '_state': th.validTaskSpecWithStartState.startState }, function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.be.null; done(); @@ -1093,13 +1095,14 @@ describe('QueueWorker', function() { it('should try and process a task if not busy', function(done) { qw.startState = th.validTaskSpecWithStartState.startState; qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; - var testRef = tasksRef.push({ + qw.newTaskRef = tasksRef; + tasksRef.push({ '_state': th.validTaskSpecWithStartState.startState }, function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.not.be.null; expect(qw.busy).to.be.true; @@ -1119,14 +1122,14 @@ describe('QueueWorker', function() { qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; qw.finishedState = th.validTaskSpecWithFinishedState.finishedState; qw.taskRetries = 0; + qw.newTaskRef = tasksRef; var testRef = tasksRef.push({ '_state': th.validTaskSpecWithStartState.startState }, function(errorA) { if (errorA) { return done(errorA); } - qw.nextTaskRef = testRef; - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.not.be.null; expect(qw.busy).to.be.true; @@ -1163,13 +1166,14 @@ describe('QueueWorker', function() { it('should try and process a task without a _state if not busy', function(done) { qw.startState = null; qw.inProgressState = th.validBasicTaskSpec.inProgressState; - var testRef = tasksRef.push({ + qw.newTaskRef = tasksRef; + tasksRef.push({ foo: 'bar' }, function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.not.be.null; expect(qw.busy).to.be.true; @@ -1184,11 +1188,12 @@ describe('QueueWorker', function() { it('should not try and process a task if not a plain object [1]', function(done) { qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; qw.suppressStack = true; + qw.newTaskRef = tasksRef; var testRef = tasksRef.push('invalid', function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.be.null; expect(qw.busy).to.be.false; @@ -1215,11 +1220,12 @@ describe('QueueWorker', function() { it('should not try and process a task if not a plain object [2]', function(done) { qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; + qw.newTaskRef = tasksRef; var testRef = tasksRef.push('invalid', function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.be.null; expect(qw.busy).to.be.false; @@ -1248,13 +1254,14 @@ describe('QueueWorker', function() { it('should not try and process a task if no longer in correct startState', function(done) { qw.startState = th.validTaskSpecWithStartState.startState; qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; - var testRef = tasksRef.push({ + qw.newTaskRef = tasksRef; + tasksRef.push({ '_state': th.validTaskSpecWithStartState.inProgressState }, function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.be.null; done(); @@ -1265,16 +1272,31 @@ describe('QueueWorker', function() { }); }); + it('should not try and process a task if no task to process', function(done) { + qw.startState = th.validTaskSpecWithStartState.startState; + qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; + qw.newTaskRef = tasksRef; + qw._tryToProcess().then(function() { + try { + expect(qw.currentTaskRef).to.be.null; + done(); + } catch (errorB) { + done(errorB); + } + }).catch(done); + }); + it('should invalidate callbacks if another process times the task out', function(done) { qw.startState = th.validTaskSpecWithStartState.startState; qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; + qw.newTaskRef = tasksRef; var testRef = tasksRef.push({ '_state': th.validTaskSpecWithStartState.startState }, function(errorA) { if (errorA) { return done(errorA); } - qw._tryToProcess(testRef).then(function() { + qw._tryToProcess().then(function() { try { expect(qw.currentTaskRef).to.not.be.null; expect(qw.busy).to.be.true; @@ -1889,7 +1911,7 @@ describe('QueueWorker', function() { var ref = tasksRef.push(); tasksRef.once('child_added', function() { try { - expect(qw._tryToProcess).to.have.been.calledOnce.and.calledWith(ref); + expect(qw._tryToProcess).to.have.been.calledOnce; spy.restore(); done(); } catch (error) { @@ -1907,7 +1929,7 @@ describe('QueueWorker', function() { var ref = tasksRef.push(); tasksRef.once('child_added', function() { try { - expect(qw._tryToProcess).to.have.been.calledOnce.and.calledWith(ref); + expect(qw._tryToProcess).to.have.been.calledOnce; spy.restore(); done(); } catch (error) { From 44203b21225853e5db1113636f20d7c1a5c4db78 Mon Sep 17 00:00:00 2001 From: Chris Raynor Date: Sat, 19 Mar 2016 19:30:40 -0700 Subject: [PATCH 2/3] Adding extra debug log --- src/lib/queue_worker.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index 5f40cb8..b716c32 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -455,6 +455,7 @@ QueueWorker.prototype._tryToProcess = function(deferred) { task._progress = 0; return task; } else { + logger.debug(self._getLogEntry('task no longer in correct state: expected ' + self.startState + ', got ' + task._state)) return; } }, function(error, committed, snapshot) { From cc28447b354f4dcba122ca710fbe15e2aacf31a4 Mon Sep 17 00:00:00 2001 From: Chris Raynor Date: Sat, 19 Mar 2016 19:40:01 -0700 Subject: [PATCH 3/3] Fixing syntax --- src/lib/queue_worker.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index b716c32..b49c8ac 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -455,7 +455,7 @@ QueueWorker.prototype._tryToProcess = function(deferred) { task._progress = 0; return task; } else { - logger.debug(self._getLogEntry('task no longer in correct state: expected ' + self.startState + ', got ' + task._state)) + logger.debug(self._getLogEntry('task no longer in correct state: expected ' + self.startState + ', got ' + task._state)); return; } }, function(error, committed, snapshot) {