Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,16 @@ Queue workers can take an optional options object to specify:
- `specId` - specifies the spec type for this worker. This is important when creating multiple specs. Defaults to `null` which uses the default spec.
- `numWorkers` - specifies the number of workers to run simultaneously for this node.js thread. Defaults to 1 worker.
- `sanitize` - specifies whether the `data` object passed to the processing function is sanitized of internal keys reserved for use by the queue. Defaults to `true`.
- `suppressStack` - specifies whether the queue will suppress error stack traces from being placed in the `_error_details` of the task if it's rejected with an Error.

```js
...

var options = {
'specId': 'spec_1',
'numWorkers': 5,
'sanitize': false
'sanitize': false,
'suppressStack': true
};
var queue = new Queue(ref, options, function(data, progress, resolve, reject) {
...
Expand Down Expand Up @@ -131,7 +133,7 @@ The reserved keys are:
- `_state_changed` - The timestamp that the task changed into its current state. This will always be the server time when the processing function was called.
- `_owner` - A unique ID for the worker and task number combination to ensure only one worker is responsible for the task at any time.
- `_progress` - A number between 0 and 100, reset at the start of each task to 0.
- `_error_details` - An object containing the error details from a previous task execution. If present, it may contain a `previous_state` string (or `null` if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an `error` string from the `reject()` callback of the previous task, and an `attempts` field containing the number of retries attempted before failing a task.
- `_error_details` - An object containing the error details from a previous task execution. If present, it may contain a `previous_state` string (or `null` if there was no previous state, in the case of malformed input) capturing the state the task was in when it errored, an `error` string from the `reject()` callback of the previous task, and an `attempts` field containing the number of retries attempted before failing a task. If the `suppressStack` queue option is not set to `true`, there may also be a `error_stack` field containg a stack dump of any error passed into the `reject()` function.

By default the data is sanitized of these keys, but you can disable this behavior by setting `'sanitize': false` in the [queue options](#queue-worker-options-optional).

Expand Down Expand Up @@ -167,7 +169,7 @@ A callback function for reporting that the current task has been completed and t

#### `reject()`

A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the `error_state` for the job with an additional `_error_details` object containing a `previous_state` key referencing this task's `in_progress_state`. If a string is passed into the `reject()` function, the `_error_details` will also contain an `error` key containing that string. Note that if retries are enabled and there are remaining attempts, the task will be restarted in it's spec's `start_state`.
A callback function for reporting that the current task failed and the worker is ready to process another task. Once this is called, the task will go into the `error_state` for the job with an additional `_error_details` object containing a `previous_state` key referencing this task's `in_progress_state`. If a string is passed into the `reject()` function, the `_error_details` will also contain an `error` key containing that string. If an Error is passed into the `reject()` function, the `error` key will contain the `error.message`, and if `suppressStack` option has not been specified the `error_stack` key will contain the `error.stack`. Note that if retries are enabled and there are remaining attempts, the task will be restarted in it's spec's `start_state`.

## Queue Security

Expand Down Expand Up @@ -211,6 +213,9 @@ These don't have to use a custom token, for instance you could use `auth != null
"error": {
".validate": "newData.isString()"
},
"error_stack": {
".validate": "newData.isString()"
},
"previous_state": {
".validate": "newData.isString()"
},
Expand Down
2 changes: 2 additions & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feature - Better error handling, including the ability to handle Error objects in the `reject()` callback function
feature - Reporting Error stacktraces in the `_error_details` of a task, unless the `suppressStack` queue option is specified
54 changes: 39 additions & 15 deletions src/lib/queue_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var MAX_TRANSACTION_ATTEMPTS = 10,
* task is claimed.
* @return {Object}
*/
function QueueWorker(tasksRef, processId, sanitize, processingFunction) {
function QueueWorker(tasksRef, processId, sanitize, suppressStack, processingFunction) {
var self = this,
error;
if (_.isUndefined(tasksRef)) {
Expand All @@ -35,6 +35,11 @@ function QueueWorker(tasksRef, processId, sanitize, processingFunction) {
logger.debug('QueueWorker(): ' + error);
throw new Error(error);
}
if (!_.isBoolean(suppressStack)) {
error = 'Invalid suppressStack option.';
logger.debug('QueueWorker(): ' + error);
throw new Error(error);
}
if (!_.isFunction(processingFunction)) {
error = 'No processing function provided.';
logger.debug('QueueWorker(): ' + error);
Expand Down Expand Up @@ -62,6 +67,7 @@ function QueueWorker(tasksRef, processId, sanitize, processingFunction) {
self.taskNumber = 0;
self.errorState = DEFAULT_ERROR_STATE;
self.sanitize = sanitize;
self.suppressStack = suppressStack;

return self;
}
Expand Down Expand Up @@ -113,7 +119,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
} else {
var errorMsg = 'reset task errored too many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
deferred.reject(errorMsg);
deferred.reject(new Error(errorMsg));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for this? Passing a plain string into reject() should work as well correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a fan of using Error because it allows you to attach other
properties, etc. You could put the number of errors, etc. Also could create
custom errors for these scenarios. But yes, a string works too.

On Tue, Jun 16, 2015 at 10:20 AM, Matthew Tse notifications@github.com
wrote:

In src/lib/queue_worker.js
#24 (comment):

@@ -113,7 +119,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
} else {
var errorMsg = 'reset task errored too many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);

  •    deferred.reject(errorMsg);
    
  •    deferred.reject(new Error(errorMsg));
    

What's the reason for this? Passing a plain string into reject() should
work as well correct?


Reply to this email directly or view it on GitHub
https://github.com/firebase/firebase-queue/pull/24/files#r32546129.

}
} else {
if (committed && snapshot.exists()) {
Expand Down Expand Up @@ -191,7 +197,7 @@ QueueWorker.prototype._resolve = function(taskNumber) {
var errorMsg = 'resolve task errored too many times, no longer ' +
'retrying';
logger.debug(self._getLogEntry(errorMsg), error);
deferred.reject(errorMsg);
deferred.reject(new Error(errorMsg));
}
} else {
if (committed && existedBefore) {
Expand Down Expand Up @@ -222,6 +228,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
var self = this,
retries = 0,
errorString = null,
errorStack = null,
deferred = RSVP.defer();

/**
Expand All @@ -244,9 +251,18 @@ QueueWorker.prototype._reject = function(taskNumber) {
self.busy = false;
self._tryToProcess(self.nextTaskRef);
} else {
if (!_.isUndefined(error)) {
errorString = '' + error;
if (_.isError(error)) {
errorString = error.message;
} else if (_.isString(error)) {
errorString = error;
} else if (!_.isUndefined(error) && !_.isNull(error)) {
errorString = error.toString();
}

if (!self.suppressStack) {
errorStack = _.get(error, 'stack', null);
}

var existedBefore;
self.currentTaskRef.transaction(function(task) {
existedBefore = true;
Expand All @@ -268,6 +284,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
task._error_details = {
previous_state: self.inProgressState,
error: errorString,
error_stack: errorStack,
attempts: attempts + 1
};
return task;
Expand All @@ -285,7 +302,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
var errorMsg = 'reject task errored too many times, no longer ' +
'retrying';
logger.debug(self._getLogEntry(errorMsg), error);
deferred.reject(errorMsg);
deferred.reject(new Error(errorMsg));
}
} else {
if (committed && existedBefore) {
Expand Down Expand Up @@ -326,12 +343,12 @@ QueueWorker.prototype._updateProgress = function(taskNumber) {
_.isNaN(progress) ||
progress < 0 ||
progress > 100) {
return RSVP.reject('Invalid progress');
return RSVP.reject(new Error('Invalid progress'));
}
if ((taskNumber !== self.taskNumber) || _.isNull(self.currentTaskRef)) {
errorMsg = 'Can\'t update progress - no task currently being processed';
logger.debug(self._getLogEntry(errorMsg));
return RSVP.reject(errorMsg);
return RSVP.reject(new Error(errorMsg));
}
return new RSVP.Promise(function(resolve, reject) {
self.currentTaskRef.transaction(function(task) {
Expand All @@ -352,15 +369,15 @@ QueueWorker.prototype._updateProgress = function(taskNumber) {
if (error) {
errorMsg = 'errored while attempting to update progress';
logger.debug(self._getLogEntry(errorMsg), error);
return reject(errorMsg);
return reject(new Error(errorMsg));
}
if (committed && snapshot.exists()) {
resolve();
} else {
errorMsg = 'Can\'t update progress - current task no longer owned ' +
'by this process';
logger.debug(self._getLogEntry(errorMsg));
return reject(errorMsg);
return reject(new Error(errorMsg));
}
}, false);
});
Expand All @@ -386,7 +403,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {

if (!self.busy) {
if (!_.isNull(self.shutdownDeffered)) {
deferred.reject('Shutting down - can no longer process new tasks');
deferred.reject(new Error('Shutting down - can no longer process new tasks'));
self.setTaskSpec(null);
logger.debug(self._getLogEntry('finished shutdown'));
self.shutdownDeffered.resolve();
Expand All @@ -398,12 +415,18 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
}
if (!_.isPlainObject(task)) {
malformed = true;
var error = new Error('Task was malformed');
var errorStack = null;
if (!self.suppressStack) {
errorStack = error.stack;
}
return {
_state: self.errorState,
_state_changed: Firebase.ServerValue.TIMESTAMP,
_error_details: {
error: 'Task was malformed',
original_task: task
error: error.message,
original_task: task,
error_stack: errorStack
}
};
}
Expand Down Expand Up @@ -431,7 +454,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
var errorMsg = 'errored while attempting to claim a new task too ' +
'many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
return deferred.reject(errorMsg);
return deferred.reject(new Error(errorMsg));
}
} else if (committed && snapshot.exists()) {
if (malformed) {
Expand Down Expand Up @@ -484,7 +507,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
self.processingFunction.call(null, data, progress, resolve,
reject);
} catch (error) {
reject(error.message);
reject(error);
}
});
}
Expand Down Expand Up @@ -713,3 +736,4 @@ QueueWorker.prototype.shutdown = function() {
};

module.exports = QueueWorker;

12 changes: 12 additions & 0 deletions src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var _ = require('lodash'),

var DEFAULT_NUM_WORKERS = 1,
DEFAULT_SANITIZE = true,
DEFAULT_SUPPRESS_STACK = false,
DEFAULT_TASK_SPEC = {
inProgressState: 'in_progress',
timeout: 300000 // 5 minutes
Expand Down Expand Up @@ -55,6 +56,7 @@ function Queue() {
var error;
self.numWorkers = DEFAULT_NUM_WORKERS;
self.sanitize = DEFAULT_SANITIZE;
self.suppressStack = DEFAULT_SUPPRESS_STACK;
self.initialized = false;

self.specChangeListener = null;
Expand Down Expand Up @@ -104,6 +106,15 @@ function Queue() {
throw new Error(error);
}
}
if (!_.isUndefined(options.suppressStack)) {
if (_.isBoolean(options.suppressStack)) {
self.suppressStack = options.suppressStack;
} else {
error = 'options.suppressStack must be a boolean.';
logger.debug('Queue(): Error during initialization', error);
throw new Error(error);
}
}
self.processingFunction = constructorArguments[2];
} else {
error = 'Queue can only take at most three arguments - queueRef, ' +
Expand All @@ -119,6 +130,7 @@ function Queue() {
self.ref.child('tasks'),
processId,
self.sanitize,
self.suppressStack,
self.processingFunction
));
}
Expand Down
Loading