diff --git a/README.md b/README.md index 915ae28..c773f1f 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ Queue workers can take an optional options object to specify: - `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec. - `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker. - `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`. + - `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error. ```js ... @@ -80,7 +81,8 @@ Queue workers can take an optional options object to specify: var options = { 'specId': 'spec_1', 'numWorkers': 5, - 'sanitize': false + 'sanitize': false, + 'suppressStack': true }; var queue = new Queue(ref, options, function(data, progress, resolve, reject) { ... @@ -131,7 +133,7 @@ The reserved keys are: - `_state_changed` - The timestamp that the task changed into its current state. This will always be the server time when the processing function was called. - `_owner` - A unique ID for the worker and task number combination to ensure only one worker is responsible for the task at any time. - `_progress` - A number between 0 and 100, reset at the start of each task to 0. - - `_error_details` - An object containing the error details from a previous task execution. If present, it may contain a `previous_state` string (or `null` if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an `error` string from the `reject()` callback of the previous task, and an `attempts` field containing the number of retries attempted before failing a task. + - `_error_details` - An object containing the error details from a previous task execution. If present, it may contain a `previous_state` string (or `null` if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an `error` string from the `reject()` callback of the previous task, and an `attempts` field containing the number of retries attempted before failing a task. If the `suppressStack` queue option is not set to `true`, there may also be a `error_stack` field containg a stack dump of any error passed into the `reject()` function. By default the data is sanitized of these keys, but you can disable this behavior by setting `'sanitize': false` in the [queue options](#queue-worker-options-optional). @@ -167,7 +169,7 @@ A callback function for reporting that the current task has been completed and t #### `reject()` -A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the `error_state` for the job with an additional `_error_details` object containing a `previous_state` key referencing this task's `in_progress_state`. If a string is passed into the `reject()` function, the `_error_details` will also contain an `error` key containing that string. Note that if retries are enabled and there are remaining attempts, the task will be restarted in it's spec's `start_state`. +A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the `error_state` for the job with an additional `_error_details` object containing a `previous_state` key referencing this task's `in_progress_state`. If a string is passed into the `reject()` function, the `_error_details` will also contain an `error` key containing that string. If an Error is passed into the `reject()` function, the `error` key will contain the `error.message`, and if `suppressStack` option has not been specified the `error_stack` key will contain the `error.stack`. Note that if retries are enabled and there are remaining attempts, the task will be restarted in it's spec's `start_state`. ## Queue Security @@ -211,6 +213,9 @@ These don't have to use a custom token, for instance you could use `auth != null "error": { ".validate": "newData.isString()" }, + "error_stack": { + ".validate": "newData.isString()" + }, "previous_state": { ".validate": "newData.isString()" }, diff --git a/changelog.txt b/changelog.txt index e69de29..2f0842f 100644 --- a/changelog.txt +++ b/changelog.txt @@ -0,0 +1,2 @@ +feature - Better error handling, including the ability to handle Error objects in the `reject()` callback function +feature - Reporting Error stacktraces in the `_error_details` of a task, unless the `suppressStack` queue option is specified diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index 921f38e..ef31e8e 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -17,7 +17,7 @@ var MAX_TRANSACTION_ATTEMPTS = 10, * task is claimed. * @return {Object} */ -function QueueWorker(tasksRef, processId, sanitize, processingFunction) { +function QueueWorker(tasksRef, processId, sanitize, suppressStack, processingFunction) { var self = this, error; if (_.isUndefined(tasksRef)) { @@ -35,6 +35,11 @@ function QueueWorker(tasksRef, processId, sanitize, processingFunction) { logger.debug('QueueWorker(): ' + error); throw new Error(error); } + if (!_.isBoolean(suppressStack)) { + error = 'Invalid suppressStack option.'; + logger.debug('QueueWorker(): ' + error); + throw new Error(error); + } if (!_.isFunction(processingFunction)) { error = 'No processing function provided.'; logger.debug('QueueWorker(): ' + error); @@ -62,6 +67,7 @@ function QueueWorker(tasksRef, processId, sanitize, processingFunction) { self.taskNumber = 0; self.errorState = DEFAULT_ERROR_STATE; self.sanitize = sanitize; + self.suppressStack = suppressStack; return self; } @@ -113,7 +119,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) { } else { var errorMsg = 'reset task errored too many times, no longer retrying'; logger.debug(self._getLogEntry(errorMsg), error); - deferred.reject(errorMsg); + deferred.reject(new Error(errorMsg)); } } else { if (committed && snapshot.exists()) { @@ -191,7 +197,7 @@ QueueWorker.prototype._resolve = function(taskNumber) { var errorMsg = 'resolve task errored too many times, no longer ' + 'retrying'; logger.debug(self._getLogEntry(errorMsg), error); - deferred.reject(errorMsg); + deferred.reject(new Error(errorMsg)); } } else { if (committed && existedBefore) { @@ -222,6 +228,7 @@ QueueWorker.prototype._reject = function(taskNumber) { var self = this, retries = 0, errorString = null, + errorStack = null, deferred = RSVP.defer(); /** @@ -244,9 +251,18 @@ QueueWorker.prototype._reject = function(taskNumber) { self.busy = false; self._tryToProcess(self.nextTaskRef); } else { - if (!_.isUndefined(error)) { - errorString = '' + error; + if (_.isError(error)) { + errorString = error.message; + } else if (_.isString(error)) { + errorString = error; + } else if (!_.isUndefined(error) && !_.isNull(error)) { + errorString = error.toString(); + } + + if (!self.suppressStack) { + errorStack = _.get(error, 'stack', null); } + var existedBefore; self.currentTaskRef.transaction(function(task) { existedBefore = true; @@ -268,6 +284,7 @@ QueueWorker.prototype._reject = function(taskNumber) { task._error_details = { previous_state: self.inProgressState, error: errorString, + error_stack: errorStack, attempts: attempts + 1 }; return task; @@ -285,7 +302,7 @@ QueueWorker.prototype._reject = function(taskNumber) { var errorMsg = 'reject task errored too many times, no longer ' + 'retrying'; logger.debug(self._getLogEntry(errorMsg), error); - deferred.reject(errorMsg); + deferred.reject(new Error(errorMsg)); } } else { if (committed && existedBefore) { @@ -326,12 +343,12 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { _.isNaN(progress) || progress < 0 || progress > 100) { - return RSVP.reject('Invalid progress'); + return RSVP.reject(new Error('Invalid progress')); } if ((taskNumber !== self.taskNumber) || _.isNull(self.currentTaskRef)) { errorMsg = 'Can\'t update progress - no task currently being processed'; logger.debug(self._getLogEntry(errorMsg)); - return RSVP.reject(errorMsg); + return RSVP.reject(new Error(errorMsg)); } return new RSVP.Promise(function(resolve, reject) { self.currentTaskRef.transaction(function(task) { @@ -352,7 +369,7 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { if (error) { errorMsg = 'errored while attempting to update progress'; logger.debug(self._getLogEntry(errorMsg), error); - return reject(errorMsg); + return reject(new Error(errorMsg)); } if (committed && snapshot.exists()) { resolve(); @@ -360,7 +377,7 @@ QueueWorker.prototype._updateProgress = function(taskNumber) { errorMsg = 'Can\'t update progress - current task no longer owned ' + 'by this process'; logger.debug(self._getLogEntry(errorMsg)); - return reject(errorMsg); + return reject(new Error(errorMsg)); } }, false); }); @@ -386,7 +403,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { if (!self.busy) { if (!_.isNull(self.shutdownDeffered)) { - deferred.reject('Shutting down - can no longer process new tasks'); + deferred.reject(new Error('Shutting down - can no longer process new tasks')); self.setTaskSpec(null); logger.debug(self._getLogEntry('finished shutdown')); self.shutdownDeffered.resolve(); @@ -398,12 +415,18 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { } if (!_.isPlainObject(task)) { malformed = true; + var error = new Error('Task was malformed'); + var errorStack = null; + if (!self.suppressStack) { + errorStack = error.stack; + } return { _state: self.errorState, _state_changed: Firebase.ServerValue.TIMESTAMP, _error_details: { - error: 'Task was malformed', - original_task: task + error: error.message, + original_task: task, + error_stack: errorStack } }; } @@ -431,7 +454,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { var errorMsg = 'errored while attempting to claim a new task too ' + 'many times, no longer retrying'; logger.debug(self._getLogEntry(errorMsg), error); - return deferred.reject(errorMsg); + return deferred.reject(new Error(errorMsg)); } } else if (committed && snapshot.exists()) { if (malformed) { @@ -484,7 +507,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) { self.processingFunction.call(null, data, progress, resolve, reject); } catch (error) { - reject(error.message); + reject(error); } }); } @@ -713,3 +736,4 @@ QueueWorker.prototype.shutdown = function() { }; module.exports = QueueWorker; + diff --git a/src/queue.js b/src/queue.js index 29a1f2c..367aa96 100644 --- a/src/queue.js +++ b/src/queue.js @@ -15,6 +15,7 @@ var _ = require('lodash'), var DEFAULT_NUM_WORKERS = 1, DEFAULT_SANITIZE = true, + DEFAULT_SUPPRESS_STACK = false, DEFAULT_TASK_SPEC = { inProgressState: 'in_progress', timeout: 300000 // 5 minutes @@ -55,6 +56,7 @@ function Queue() { var error; self.numWorkers = DEFAULT_NUM_WORKERS; self.sanitize = DEFAULT_SANITIZE; + self.suppressStack = DEFAULT_SUPPRESS_STACK; self.initialized = false; self.specChangeListener = null; @@ -104,6 +106,15 @@ function Queue() { throw new Error(error); } } + if (!_.isUndefined(options.suppressStack)) { + if (_.isBoolean(options.suppressStack)) { + self.suppressStack = options.suppressStack; + } else { + error = 'options.suppressStack must be a boolean.'; + logger.debug('Queue(): Error during initialization', error); + throw new Error(error); + } + } self.processingFunction = constructorArguments[2]; } else { error = 'Queue can only take at most three arguments - queueRef, ' + @@ -119,6 +130,7 @@ function Queue() { self.ref.child('tasks'), processId, self.sanitize, + self.suppressStack, self.processingFunction )); } diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index 39e93d0..3eec35c 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -32,43 +32,51 @@ describe('QueueWorker', function() { }).to.throw('Invalid process ID provided.'); }); - it('should not create a QueueWorker with only a tasksRef, process ID and sanitize option', function() { + it('should not create a QueueWorker with only a tasksRef, process ID, sanitize and suppressStack option', function() { expect(function() { - new th.QueueWorker(tasksRef, '0', true); + new th.QueueWorker(tasksRef, '0', true, false); }).to.throw('No processing function provided.'); }); it('should not create a QueueWorker with a tasksRef, processId, sanitize option and an invalid processing function', function() { ['', 'foo', NaN, Infinity, true, false, 0, 1, ['foo', 'bar'], { foo: 'bar' }, null, { foo: 'bar' }, { foo: { bar: { baz: true } } }].forEach(function(nonFunctionObject) { expect(function() { - new th.QueueWorker(tasksRef, '0', true, nonFunctionObject); + new th.QueueWorker(tasksRef, '0', true, false, nonFunctionObject); }).to.throw('No processing function provided.'); }); }); it('should create a QueueWorker with a tasksRef, processId, sanitize option and a processing function', function() { - new th.QueueWorker(tasksRef, '0', true, _.noop); + new th.QueueWorker(tasksRef, '0', true, false, _.noop); }); it('should not create a QueueWorker with a non-string processId specified', function() { [NaN, Infinity, true, false, 0, 1, ['foo', 'bar'], { foo: 'bar' }, null, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(nonStringObject) { expect(function() { - new th.QueueWorker(tasksRef, nonStringObject, true, _.noop); + new th.QueueWorker(tasksRef, nonStringObject, true, false, _.noop); }).to.throw('Invalid process ID provided.'); }); }); - it('should not create a QueueWorker with a non-string processId specified', function() { + it('should not create a QueueWorker with a non-boolean sanitize option specified', function() { [NaN, Infinity, '', 'foo', 0, 1, ['foo', 'bar'], { foo: 'bar' }, null, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(nonBooleanObject) { expect(function() { - new th.QueueWorker(tasksRef, '0', nonBooleanObject, _.noop); + new th.QueueWorker(tasksRef, '0', nonBooleanObject, false, _.noop); }).to.throw('Invalid sanitize option.'); }); }); + + it('should not create a QueueWorker with a non-boolean suppressStack option specified', function() { + [NaN, Infinity, '', 'foo', 0, 1, ['foo', 'bar'], { foo: 'bar' }, null, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(nonBooleanObject) { + expect(function() { + new th.QueueWorker(tasksRef, '0', true, nonBooleanObject, _.noop); + }).to.throw('Invalid suppressStack option.'); + }); + }); }); describe('#_getLogEntry', function() { - var qw = new th.QueueWorker(tasksRef, '0', true, _.noop); + var qw = new th.QueueWorker(tasksRef, '0', true, false, _.noop); it('should construct a log entry given a string', function() { expect(qw._getLogEntry('informative message')).to.equal('QueueWorker ' + qw.processId + ' informative message'); @@ -91,7 +99,7 @@ describe('QueueWorker', function() { }); it('should reset a task that is currently in progress', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, @@ -123,7 +131,7 @@ describe('QueueWorker', function() { }); it('should not reset a task that no longer exists', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push(); @@ -141,7 +149,7 @@ describe('QueueWorker', function() { }); it('should not reset a task if it is has already changed state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.finishedState, '_state_changed': new Date().getTime(), @@ -168,7 +176,7 @@ describe('QueueWorker', function() { }); it('should not reset a task if it is has no state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state_changed': new Date().getTime(), '_owner': qw.processId + ':' + qw.taskNumber, @@ -204,7 +212,7 @@ describe('QueueWorker', function() { }); it('should resolve a task owned by the current worker and remove it when no finishedState is specified', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, @@ -234,7 +242,7 @@ describe('QueueWorker', function() { }); it('should resolve a task owned by the current worker and change the state when a finishedState is specified and no object passed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithFinishedState); testRef = tasksRef.push({ '_state': th.validTaskSpecWithFinishedState.inProgressState, @@ -269,7 +277,7 @@ describe('QueueWorker', function() { ['', 'foo', NaN, Infinity, true, false, 0, 1, ['foo', 'bar'], null, _.noop].forEach(function(nonPlainObject) { it('should resolve an task owned by the current worker and change the state when a finishedState is specified and an invalid object ' + nonPlainObject + ' passed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithFinishedState); testRef = tasksRef.push({ '_state': th.validTaskSpecWithFinishedState.inProgressState, @@ -304,7 +312,7 @@ describe('QueueWorker', function() { }); it('should resolve a task owned by the current worker and change the state when a finishedState is specified and a plain object passed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithFinishedState); testRef = tasksRef.push({ '_state': th.validTaskSpecWithFinishedState.inProgressState, @@ -339,7 +347,7 @@ describe('QueueWorker', function() { }); it('should not resolve a task that no longer exists', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithFinishedState); testRef = tasksRef.push(); @@ -357,7 +365,7 @@ describe('QueueWorker', function() { }); it('should not resolve a task if it is no longer owned by the current worker', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.inProgressState, '_state_changed': new Date().getTime(), @@ -384,7 +392,7 @@ describe('QueueWorker', function() { }); it('should not resolve a task if it is has already changed state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.finishedState, '_state_changed': new Date().getTime(), @@ -411,7 +419,7 @@ describe('QueueWorker', function() { }); it('should not resolve a task if it is has no state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state_changed': new Date().getTime(), '_owner': qw.processId + ':' + qw.taskNumber, @@ -437,7 +445,7 @@ describe('QueueWorker', function() { }); it('should not resolve a task if it is no longer being processed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.inProgressState, '_state_changed': new Date().getTime(), @@ -463,7 +471,7 @@ describe('QueueWorker', function() { }); it('should not resolve a task if a new task is being processed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.inProgressState, '_state_changed': new Date().getTime(), @@ -502,7 +510,7 @@ describe('QueueWorker', function() { }); it('should reject a task owned by the current worker', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, @@ -538,8 +546,8 @@ describe('QueueWorker', function() { }); }); - it('should reject a task owned by the current worker and reset more retries are specified', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + it('should reject a task owned by the current worker and reset if more retries are specified', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithRetries); testRef = tasksRef.push({ '_state': th.validTaskSpecWithRetries.inProgressState, @@ -579,7 +587,7 @@ describe('QueueWorker', function() { }); it('should reject a task owned by the current worker and a non-standard error state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithErrorState); testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, @@ -617,7 +625,7 @@ describe('QueueWorker', function() { [NaN, Infinity, true, false, 0, 1, ['foo', 'bar'], { foo: 'bar' }, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(nonStringObject) { it('should reject a task owned by the current worker and convert the error to a string if not a string: ' + nonStringObject, function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push({ '_state': th.validBasicTaskSpec.inProgressState, @@ -656,7 +664,7 @@ describe('QueueWorker', function() { }); it('should reject a task owned by the current worker and append the error string to the _error_details', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var error = 'My error message'; qw.setTaskSpec(th.validBasicTaskSpec); testRef = tasksRef.push({ @@ -694,8 +702,88 @@ describe('QueueWorker', function() { }); }); + it('should reject a task owned by the current worker and append the error string and stack to the _error_details', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + var error = new Error('My error message'); + qw.setTaskSpec(th.validBasicTaskSpec); + testRef = tasksRef.push({ + '_state': th.validBasicTaskSpec.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 0 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._reject(qw.taskNumber)(error); + } else { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_state', '_progress', '_state_changed', '_error_details']); + expect(task['_state']).to.equal('error'); + expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); + expect(task['_progress']).to.equal(0); + expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts', 'error_stack']); + expect(task['_error_details'].previous_state).to.equal(th.validBasicTaskSpec.inProgressState); + expect(task['_error_details'].attempts).to.equal(1); + expect(task['_error_details'].error).to.equal(error.message); + expect(task['_error_details'].error_stack).to.be.a.string; + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + + it('should reject a task owned by the current worker and append the error string to the _error_details', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.suppressStack = true; + var error = new Error('My error message'); + qw.setTaskSpec(th.validBasicTaskSpec); + testRef = tasksRef.push({ + '_state': th.validBasicTaskSpec.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 0 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._reject(qw.taskNumber)(error); + } else { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_state', '_progress', '_state_changed', '_error_details']); + expect(task['_state']).to.equal('error'); + expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); + expect(task['_progress']).to.equal(0); + expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts']); + expect(task['_error_details'].previous_state).to.equal(th.validBasicTaskSpec.inProgressState); + expect(task['_error_details'].attempts).to.equal(1); + expect(task['_error_details'].error).to.equal(error.message); + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + it('should not reject a task that no longer exists', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithFinishedState); testRef = tasksRef.push(); qw.currentTaskRef = testRef; @@ -712,7 +800,7 @@ describe('QueueWorker', function() { }); it('should not reject a task if it is no longer owned by the current worker', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.inProgressState, '_state_changed': new Date().getTime(), @@ -739,7 +827,7 @@ describe('QueueWorker', function() { }); it('should not reject a task if it is has already changed state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.finishedState, '_state_changed': new Date().getTime(), @@ -766,7 +854,7 @@ describe('QueueWorker', function() { }); it('should not reject a task if it is has no state', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state_changed': new Date().getTime(), '_owner': qw.processId + ':' + qw.taskNumber, @@ -792,7 +880,7 @@ describe('QueueWorker', function() { }); it('should not reject a task if it is no longer being processed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.inProgressState, '_state_changed': new Date().getTime(), @@ -818,7 +906,7 @@ describe('QueueWorker', function() { }); it('should not reject a task if a new task is being processed', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var originalTask = { '_state': th.validTaskSpecWithFinishedState.inProgressState, '_state_changed': new Date().getTime(), @@ -851,7 +939,7 @@ describe('QueueWorker', function() { var qw; beforeEach(function() { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw._tryToProcess = _.noop; }); @@ -934,7 +1022,7 @@ describe('QueueWorker', function() { var qw; beforeEach(function() { - qw = new th.QueueWorker(tasksRef, '0', true, _.noop); + qw = new th.QueueWorker(tasksRef, '0', true, false, _.noop); }); afterEach(function() { @@ -984,7 +1072,7 @@ describe('QueueWorker', function() { }); it('should try and process a task if not busy, rejecting it if it throws', function(done) { - qw = new th.QueueWorker(tasksRef, '0', true, function(data, progress, resolve, reject) { + qw = new th.QueueWorker(tasksRef, '0', true, false, function(data, progress, resolve, reject) { throw new Error('Error thrown in processingFunction'); }); qw.startState = th.validTaskSpecWithStartState.startState; @@ -1014,10 +1102,11 @@ describe('QueueWorker', function() { expect(task['_state']).to.equal('error'); expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); expect(task['_progress']).to.equal(0); - expect(task['_error_details']).to.have.all.keys(['previous_state', 'attempts', 'error']); + expect(task['_error_details']).to.have.all.keys(['previous_state', 'attempts', 'error', 'error_stack']); expect(task['_error_details'].previous_state).to.equal(th.validTaskSpecWithStartState.inProgressState); expect(task['_error_details'].attempts).to.equal(1); expect(task['_error_details'].error).to.equal('Error thrown in processingFunction'); + expect(task['_error_details'].error_stack).to.be.a.string; done(); } catch (errorC) { done(errorC); @@ -1052,8 +1141,9 @@ describe('QueueWorker', function() { }); }); - it('should not try and process a task if not a plain object', function(done) { + it('should not try and process a task if not a plain object [1]', function(done) { qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; + qw.suppressStack = true; var testRef = tasksRef.push('invalid', function(errorA) { if (errorA) { return done(errorA); @@ -1083,6 +1173,38 @@ describe('QueueWorker', function() { }); }); + it('should not try and process a task if not a plain object [2]', function(done) { + qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; + var testRef = tasksRef.push('invalid', function(errorA) { + if (errorA) { + return done(errorA); + } + qw._tryToProcess(testRef).then(function() { + try { + expect(qw.currentTaskRef).to.be.null; + expect(qw.busy).to.be.false; + testRef.once('value', function(snapshot) { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_error_details', '_state', '_state_changed']); + expect(task['_error_details']).to.have.all.keys(['error', 'original_task', 'error_stack']); + expect(task['_error_details'].error).to.equal('Task was malformed'); + expect(task['_error_details'].original_task).to.equal('invalid'); + expect(task['_error_details'].error_stack).to.be.a.string; + expect(task['_state']).to.equal('error'); + expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); + done(); + } catch (errorB) { + done(errorB); + } + }); + } catch (errorC) { + done(errorC); + } + }).catch(done); + }); + }); + it('should not try and process a task if no longer in correct startState', function(done) { qw.startState = th.validTaskSpecWithStartState.startState; qw.inProgressState = th.validTaskSpecWithStartState.inProgressState; @@ -1137,7 +1259,7 @@ describe('QueueWorker', function() { }); it('should sanitize data passed to the processing function when specified', function(done) { - qw = new th.QueueWorker(tasksRef, '0', true, function(data, progress, resolve, reject) { + qw = new th.QueueWorker(tasksRef, '0', true, false, function(data, progress, resolve, reject) { try { expect(data).to.have.all.keys(['foo']); done(); @@ -1150,7 +1272,7 @@ describe('QueueWorker', function() { }) it('should not sanitize data passed to the processing function when specified', function(done) { - qw = new th.QueueWorker(tasksRef, '0', false, function(data, progress, resolve, reject) { + qw = new th.QueueWorker(tasksRef, '0', false, false, function(data, progress, resolve, reject) { try { expect(data).to.have.all.keys(['foo', '_owner', '_progress', '_state', '_state_changed']); done(); @@ -1169,7 +1291,7 @@ describe('QueueWorker', function() { beforeEach(function() { clock = sinon.useFakeTimers(new Date().getTime()); - qw = new th.QueueWorkerWithoutProcessing(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessing(tasksRef, '0', true, false, _.noop); }); afterEach(function(done) { @@ -1422,7 +1544,7 @@ describe('QueueWorker', function() { var qw; before(function() { - qw = new th.QueueWorker(tasksRef, '0', true, _.noop); + qw = new th.QueueWorker(tasksRef, '0', true, false, _.noop); }); it('should not accept a non-plain object as a valid task spec', function() { @@ -1581,7 +1703,7 @@ describe('QueueWorker', function() { it('should reset the worker when called with an invalid task spec', function() { ['', 'foo', NaN, Infinity, true, false, null, undefined, 0, -1, 10, ['foo', 'bar'], { foo: 'bar' }, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(invalidTaskSpec) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(invalidTaskSpec); expect(qw.taskNumber).to.not.equal(oldTaskNumber); @@ -1597,7 +1719,7 @@ describe('QueueWorker', function() { it('should reset the worker when called with an invalid task spec after a valid task spec', function() { ['', 'foo', NaN, Infinity, true, false, null, undefined, 0, -1, 10, ['foo', 'bar'], { foo: 'bar' }, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(invalidTaskSpec) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(invalidTaskSpec); @@ -1614,7 +1736,7 @@ describe('QueueWorker', function() { it('should reset the worker when called with an invalid task spec after a valid task spec with everythin', function() { ['', 'foo', NaN, Infinity, true, false, null, undefined, 0, -1, 10, ['foo', 'bar'], { foo: 'bar' }, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop].forEach(function(invalidTaskSpec) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithEverything); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(invalidTaskSpec); @@ -1630,7 +1752,7 @@ describe('QueueWorker', function() { }); it('should reset a worker when called with a basic valid task spec', function() { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(th.validBasicTaskSpec); expect(qw.taskNumber).to.not.equal(oldTaskNumber); @@ -1644,7 +1766,7 @@ describe('QueueWorker', function() { }); it('should reset a worker when called with a valid task spec with a startState', function() { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(th.validTaskSpecWithStartState); expect(qw.taskNumber).to.not.equal(oldTaskNumber); @@ -1658,7 +1780,7 @@ describe('QueueWorker', function() { }); it('should reset a worker when called with a valid task spec with a finishedState', function() { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(th.validTaskSpecWithFinishedState); expect(qw.taskNumber).to.not.equal(oldTaskNumber); @@ -1672,7 +1794,7 @@ describe('QueueWorker', function() { }); it('should reset a worker when called with a valid task spec with a timeout', function() { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(th.validTaskSpecWithTimeout); expect(qw.taskNumber).to.not.equal(oldTaskNumber); @@ -1686,7 +1808,7 @@ describe('QueueWorker', function() { }); it('should reset a worker when called with a valid task spec with everything', function() { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); var oldTaskNumber = qw.taskNumber; qw.setTaskSpec(th.validTaskSpecWithEverything); expect(qw.taskNumber).to.not.equal(oldTaskNumber); @@ -1700,7 +1822,7 @@ describe('QueueWorker', function() { }); it('should not pick up tasks on the queue not for the current task', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); var spy = sinon.spy(qw, '_tryToProcess'); tasksRef.once('child_added', function() { @@ -1721,7 +1843,7 @@ describe('QueueWorker', function() { }); it('should pick up tasks on the queue with no "_state" when a task is specified without a startState', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validBasicTaskSpec); var spy = sinon.spy(qw, '_tryToProcess'); var ref = tasksRef.push(); @@ -1739,7 +1861,7 @@ describe('QueueWorker', function() { }); it('should pick up tasks on the queue with the corresponding "_state" when a task is specifies a startState', function(done) { - qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop); + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithStartState); var spy = sinon.spy(qw, '_tryToProcess'); var ref = tasksRef.push(); @@ -1765,7 +1887,7 @@ describe('QueueWorker', function() { beforeEach(function() { callbackStarted = false; callbackComplete = false; - qw = new th.QueueWorker(tasksRef, '0', true, function(data, progress, resolve, reject) { + qw = new th.QueueWorker(tasksRef, '0', true, false, function(data, progress, resolve, reject) { callbackStarted = true; setTimeout(function() { callbackComplete = true; diff --git a/test/queue.spec.js b/test/queue.spec.js index 6a15d21..98ed222 100644 --- a/test/queue.spec.js +++ b/test/queue.spec.js @@ -76,6 +76,14 @@ describe('Queue', function() { }); }); + _.forEach([NaN, Infinity, '', 'foo', 0, 1, ['foo', 'bar'], { foo: 'bar' }, null, { foo: 'bar' }, { foo: { bar: { baz: true } } }, _.noop], function(nonBooleanObject) { + it('should not create a Queue with a non-boolean suppressStack option specified', function() { + expect(function() { + new th.Queue(th.testRef, { suppressStack: nonBooleanObject }, _.noop); + }).to.throw('options.suppressStack must be a boolean.'); + }); + }); + _.forEach(_.range(1, 20), function(numWorkers) { it('should create a Queue with ' + numWorkers + ' workers when specified in options.numWorkers', function() { var q = new th.Queue(th.testRef, { numWorkers: numWorkers }, _.noop); @@ -108,6 +116,13 @@ describe('Queue', function() { }); }); + [true, false].forEach(function(bool) { + it('should create a Queue with a ' + bool + ' suppressStack option when specified', function() { + var q = new th.Queue(th.testRef, { suppressStack: bool }, _.noop) + expect(q.suppressStack).to.equal(bool); + }); + }); + it('should not create a Queue when initialized with 4 parameters', function() { expect(function() { new th.Queue(th.testRef, {}, _.noop, null);