Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/lib/queue_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ QueueWorker.prototype._resetTask = function(taskRef, deferred) {
} else {
var errorMsg = 'reset task errored too many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
deferred.reject(errorMsg);
deferred.reject(new Error(errorMsg));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how useful this being an error would be - won't the stack trace always be an internal Firebase Queue node module trace? Won't they be able to get that information from the error message string itself?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry, was confused for a second between reject and _reject - this could be useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event though the stack trace might not be helpful in all cases, it's nice just to be consistent. Also might help later on if more of the internal api is exposed and can be built upon.

}
} else {
if (committed && snapshot.exists()) {
Expand Down Expand Up @@ -191,7 +191,7 @@ QueueWorker.prototype._resolve = function(taskNumber) {
var errorMsg = 'resolve task errored too many times, no longer ' +
'retrying';
logger.debug(self._getLogEntry(errorMsg), error);
deferred.reject(errorMsg);
deferred.reject(new Error(errorMsg));
}
} else {
if (committed && existedBefore) {
Expand Down Expand Up @@ -244,8 +244,11 @@ QueueWorker.prototype._reject = function(taskNumber) {
self.busy = false;
self._tryToProcess(self.nextTaskRef);
} else {
if (!_.isUndefined(error) && !_.isError(error)) {
error = new Error(error);
}
if (!_.isUndefined(error)) {
errorString = '' + error;
errorString = '' + error.message;
}
var existedBefore;
self.currentTaskRef.transaction(function(task) {
Expand All @@ -268,6 +271,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
task._error_details = {
previous_state: self.inProgressState,
error: errorString,
error_stack: (error||{}).stack||null,
attempts: attempts + 1
};
return task;
Expand All @@ -285,7 +289,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
var errorMsg = 'reject task errored too many times, no longer ' +
'retrying';
logger.debug(self._getLogEntry(errorMsg), error);
deferred.reject(errorMsg);
deferred.reject(new Error(errorMsg));
}
} else {
if (committed && existedBefore) {
Expand Down Expand Up @@ -326,12 +330,12 @@ QueueWorker.prototype._updateProgress = function(taskNumber) {
_.isNaN(progress) ||
progress < 0 ||
progress > 100) {
return RSVP.reject('Invalid progress');
return RSVP.reject(new Error('Invalid progress'));
}
if ((taskNumber !== self.taskNumber) || _.isNull(self.currentTaskRef)) {
errorMsg = 'Can\'t update progress - no task currently being processed';
logger.debug(self._getLogEntry(errorMsg));
return RSVP.reject(errorMsg);
return RSVP.reject(new Error(errorMsg));
}
return new RSVP.Promise(function(resolve, reject) {
self.currentTaskRef.transaction(function(task) {
Expand All @@ -352,15 +356,15 @@ QueueWorker.prototype._updateProgress = function(taskNumber) {
if (error) {
errorMsg = 'errored while attempting to update progress';
logger.debug(self._getLogEntry(errorMsg), error);
return reject(errorMsg);
return reject(new Error(errorMsg));
}
if (committed && snapshot.exists()) {
resolve();
} else {
errorMsg = 'Can\'t update progress - current task no longer owned ' +
'by this process';
logger.debug(self._getLogEntry(errorMsg));
return reject(errorMsg);
return reject(new Error(errorMsg));
}
}, false);
});
Expand All @@ -386,7 +390,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {

if (!self.busy) {
if (!_.isNull(self.shutdownDeffered)) {
deferred.reject('Shutting down - can no longer process new tasks');
deferred.reject(new Error('Shutting down - can no longer process new tasks'));
self.setTaskSpec(null);
logger.debug(self._getLogEntry('finished shutdown'));
self.shutdownDeffered.resolve();
Expand Down Expand Up @@ -431,7 +435,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
var errorMsg = 'errored while attempting to claim a new task too ' +
'many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
return deferred.reject(errorMsg);
return deferred.reject(new Error(errorMsg));
}
} else if (committed && snapshot.exists()) {
if (malformed) {
Expand Down Expand Up @@ -484,7 +488,7 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
self.processingFunction.call(null, data, progress, resolve,
reject);
} catch (error) {
reject(error.message);
reject(error);
}
});
}
Expand Down Expand Up @@ -713,3 +717,4 @@ QueueWorker.prototype.shutdown = function() {
};

module.exports = QueueWorker;

11 changes: 6 additions & 5 deletions test/lib/queue_worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ describe('QueueWorker', function() {
} else {
try {
var task = snapshot.val();
console.log('$$', task);
expect(task).to.have.all.keys(['_progress', '_state_changed', '_error_details']);
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task['_progress']).to.equal(0);
Expand Down Expand Up @@ -641,7 +642,7 @@ describe('QueueWorker', function() {
expect(task['_state']).to.equal('error');
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task['_progress']).to.equal(0);
expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts']);
expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts', 'error_stack']);
expect(task['_error_details'].previous_state).to.equal(th.validBasicTaskSpec.inProgressState);
expect(task['_error_details'].error).to.equal(nonStringObject.toString());
expect(task['_error_details'].attempts).to.equal(1);
Expand All @@ -657,7 +658,7 @@ describe('QueueWorker', function() {

it('should reject a task owned by the current worker and append the error string to the _error_details', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, _.noop);
var error = 'My error message';
var error = new Error('My error message');
qw.setTaskSpec(th.validBasicTaskSpec);
testRef = tasksRef.push({
'_state': th.validBasicTaskSpec.inProgressState,
Expand All @@ -681,10 +682,10 @@ describe('QueueWorker', function() {
expect(task['_state']).to.equal('error');
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task['_progress']).to.equal(0);
expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts']);
expect(task['_error_details']).to.have.all.keys(['previous_state', 'error', 'attempts', 'error_stack']);
expect(task['_error_details'].previous_state).to.equal(th.validBasicTaskSpec.inProgressState);
expect(task['_error_details'].attempts).to.equal(1);
expect(task['_error_details'].error).to.equal(error);
expect(task['_error_details'].error).to.equal(error.message);
done();
} catch (errorB) {
done(errorB);
Expand Down Expand Up @@ -1014,7 +1015,7 @@ describe('QueueWorker', function() {
expect(task['_state']).to.equal('error');
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task['_progress']).to.equal(0);
expect(task['_error_details']).to.have.all.keys(['previous_state', 'attempts', 'error']);
expect(task['_error_details']).to.have.all.keys(['previous_state', 'attempts', 'error', 'error_stack']);
expect(task['_error_details'].previous_state).to.equal(th.validTaskSpecWithStartState.inProgressState);
expect(task['_error_details'].attempts).to.equal(1);
expect(task['_error_details'].error).to.equal('Error thrown in processingFunction');
Expand Down