From 6c67d636afa3d32e95f09eb6f957e1355b826dfe Mon Sep 17 00:00:00 2001 From: Doug Read Date: Wed, 10 Jun 2015 18:35:36 -0700 Subject: [PATCH] add error_stack on reject also reject error not string --- src/lib/queue_worker.js | 27 ++++++++++++++++----------- test/lib/queue_worker.spec.js | 11 ++++++----- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index 921f38e..1dd0745 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -113,7 +113,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) { } else { var errorMsg = 'reset task errored too many times, no longer retrying'; logger.debug(self._getLogEntry(errorMsg), error); - deferred.reject(errorMsg); + deferred.reject(new Error(errorMsg)); } } else { if (committed && snapshot.exists()) { @@ -191,7 +191,7 @@ QueueWorker.prototype._resolve = function(taskNumber) { var errorMsg = 'resolve task errored too many times, no longer ' + 'retrying'; logger.debug(self._getLogEntry(errorMsg), error); - deferred.reject(errorMsg); + deferred.reject(new Error(errorMsg)); } } else { if (committed && existedBefore) { @@ -244,8 +244,11 @@ QueueWorker.prototype._reject = function(taskNumber) { self.busy = false; self._tryToProcess(self.nextTaskRef); } else { + if (!_.isUndefined(error) && !_.isError(error)) { + error = new Error(error); + } if (!_.isUndefined(error)) { - errorString = '' + error; + errorString = '' + error.message; } var existedBefore; self.currentTaskRef.transaction(function(task) { @@ -268,6 +271,7 @@ QueueWorker.prototype._reject = function(taskNumber) { task._error_details = { previous_state: self.inProgressState, error: errorString, + error_stack: (error||{}).stack||null, attempts: attempts + 1 }; return task; @@ -285,7 +289,7 @@ QueueWorker.prototype._reject = function(taskNumber) { var errorMsg = 'reject task errored too many times, no longer ' + 'retrying'; logger.debug(self._getLogEntry(errorMsg), error); - deferred.reject(errorMsg); + deferred.reject(new Error(errorMsg)); } } else { if (committed && existedBefore) { @@ -326,12 +330,12 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { _.isNaN(progress) || progress < 0 || progress > 100) { - return RSVP.reject('Invalid progress'); + return RSVP.reject(new Error('Invalid progress')); } if ((taskNumber !== self.taskNumber) || _.isNull(self.currentTaskRef)) { errorMsg = 'Can\'t update progress - no task currently being processed'; logger.debug(self._getLogEntry(errorMsg)); - return RSVP.reject(errorMsg); + return RSVP.reject(new Error(errorMsg)); } return new RSVP.Promise(function(resolve, reject) { self.currentTaskRef.transaction(function(task) { @@ -352,7 +356,7 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { if (error) { errorMsg = 'errored while attempting to update progress'; logger.debug(self._getLogEntry(errorMsg), error); - return reject(errorMsg); + return reject(new Error(errorMsg)); } if (committed && snapshot.exists()) { resolve(); @@ -360,7 +364,7 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { errorMsg = 'Can\'t update progress - current task no longer owned ' + 'by this process'; logger.debug(self._getLogEntry(errorMsg)); - return reject(errorMsg); + return reject(new Error(errorMsg)); } }, false); }); @@ -386,7 +390,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { if (!self.busy) { if (!_.isNull(self.shutdownDeffered)) { - deferred.reject('Shutting down - can no longer process new tasks'); + deferred.reject(new Error('Shutting down - can no longer process new tasks')); self.setTaskSpec(null); logger.debug(self._getLogEntry('finished shutdown')); self.shutdownDeffered.resolve(); @@ -431,7 +435,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { var errorMsg = 'errored while attempting to claim a new task too ' + 'many times, no longer retrying'; logger.debug(self._getLogEntry(errorMsg), error); - return deferred.reject(errorMsg); + return deferred.reject(new Error(errorMsg)); } } else if (committed && snapshot.exists()) { if (malformed) { @@ -484,7 +488,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { self.processingFunction.call(null, data, progress, resolve, reject); } catch (error) { - reject(error.message); + reject(error); } }); } @@ -713,3 +717,4 @@ QueueWorker.prototype.shutdown = function() { }; module.exports = QueueWorker; + diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index 39e93d0..4c304ed 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -563,6 +563,7 @@ describe('QueueWorker', function() { } else { try { var task = snapshot.val(); + console.log('$$', task); expect(task).to.have.all.keys(['_progress', '_state_changed', '_error_details']); expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); expect(task['_progress']).to.equal(0); @@ -641,7 +642,7 @@ describe('QueueWorker', function() { expect(task['_state']).to.equal('error'); expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); expect(task['_progress']).to.equal(0); - expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts']); + expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts', 'error_stack']); expect(task['_error_details'].previous_state).to.equal(th.validBasicTaskSpec.inProgressState); expect(task['_error_details'].error).to.equal(nonStringObject.toString()); expect(task['_error_details'].attempts).to.equal(1); @@ -657,7 +658,7 @@ describe('QueueWorker', function() { it('should reject a task owned by the current worker and append the error string to the _error_details', function(done) { qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); - var error = 'My error message'; + var error = new Error('My error message'); qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, @@ -681,10 +682,10 @@ describe('QueueWorker', function() { expect(task['_state']).to.equal('error'); expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); expect(task['_progress']).to.equal(0); - expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts']); + expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts', 'error_stack']); expect(task['_error_details'].previous_state).to.equal(th.validBasicTaskSpec.inProgressState); expect(task['_error_details'].attempts).to.equal(1); - expect(task['_error_details'].error).to.equal(error); + expect(task['_error_details'].error).to.equal(error.message); done(); } catch (errorB) { done(errorB); @@ -1014,7 +1015,7 @@ describe('QueueWorker', function() { expect(task['_state']).to.equal('error'); expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); expect(task['_progress']).to.equal(0); - expect(task['_error_details']).to.have.all.keys(['previous_state', 'attempts', 'error']); + expect(task['_error_details']).to.have.all.keys(['previous_state', 'attempts', 'error', 'error_stack']); expect(task['_error_details'].previous_state).to.equal(th.validTaskSpecWithStartState.inProgressState); expect(task['_error_details'].attempts).to.equal(1); expect(task['_error_details'].error).to.equal('Error thrown in processingFunction');