Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 123 additions & 114 deletions src/lib/queue_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ QueueWorker.prototype._resolve = function(taskNumber) {
}
deferred.resolve();
self.busy = false;
self._tryToProcess(self.nextTaskRef);
self._tryToProcess();
} else {
var existedBefore;
self.currentTaskRef.transaction(function(task) {
Expand Down Expand Up @@ -208,7 +208,7 @@ QueueWorker.prototype._resolve = function(taskNumber) {
}
deferred.resolve();
self.busy = false;
self._tryToProcess(self.nextTaskRef);
self._tryToProcess();
}
}, false);
}
Expand Down Expand Up @@ -249,7 +249,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
}
deferred.resolve();
self.busy = false;
self._tryToProcess(self.nextTaskRef);
self._tryToProcess();
} else {
if (_.isError(error)) {
errorString = error.message;
Expand Down Expand Up @@ -320,7 +320,7 @@ QueueWorker.prototype._reject = function(taskNumber) {
}
deferred.resolve();
self.busy = false;
self._tryToProcess(self.nextTaskRef);
self._tryToProcess();
}
}, false);
}
Expand Down Expand Up @@ -394,10 +394,8 @@ QueueWorker.prototype._updateProgress = function(taskNumber) {

/**
* Attempts to claim the next task in the queue.
* @param {Firebase} nextTaskRef Reference to the Firebase location of the next
* task.
*/
QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
QueueWorker.prototype._tryToProcess = function(deferred) {
var self = this,
retries = 0,
malformed = false;
Expand All @@ -414,115 +412,128 @@ QueueWorker.prototype._tryToProcess = function(nextTaskRef, deferred) {
logger.debug(self._getLogEntry('finished shutdown'));
self.shutdownDeferred.resolve();
} else {
nextTaskRef.transaction(function(task) {
/* istanbul ignore if */
if (_.isNull(task)) {
return task;
}
if (!_.isPlainObject(task)) {
malformed = true;
var error = new Error('Task was malformed');
var errorStack = null;
if (!self.suppressStack) {
errorStack = error.stack;
}
return {
_state: self.errorState,
_state_changed: Firebase.ServerValue.TIMESTAMP,
_error_details: {
error: error.message,
original_task: task,
error_stack: errorStack
}
};
}
task._id = nextTaskRef.key();
if (_.isUndefined(task._state)) {
task._state = null;
}
if (task._state === self.startState) {
task._state = self.inProgressState;
task._state_changed = Firebase.ServerValue.TIMESTAMP;
task._owner = self.processId + ':' + (self.taskNumber + 1);
task._progress = 0;
return task;
} else {
return;
}
}, function(error, committed, snapshot) {
/* istanbul ignore if */
if (error) {
if (++retries < MAX_TRANSACTION_ATTEMPTS) {
logger.debug(self._getLogEntry('errored while attempting to claim' +
' a new task, retrying'), error);
return setImmediate(self._tryToProcess.bind(self), nextTaskRef,
deferred);
} else {
var errorMsg = 'errored while attempting to claim a new task too ' +
'many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
return deferred.reject(new Error(errorMsg));
if (!self.newTaskRef) {
deferred.resolve();
} else {
self.newTaskRef.once('value', function(snapshot) {
if (!snapshot.exists()) {
return deferred.resolve();
}
} else if (committed && snapshot.exists()) {
if (malformed) {
logger.debug(self._getLogEntry('found malformed entry ' +
snapshot.key()));
} else {
var nextTaskRef;
snapshot.forEach(function(childSnap) {
nextTaskRef = childSnap.ref();
});
nextTaskRef.transaction(function(task) {
/* istanbul ignore if */
if (self.busy) {
// Worker has become busy while the transaction was processing -
// so give up the task for now so another worker can claim it
self._resetTask(nextTaskRef);
} else {
self.busy = true;
self.taskNumber += 1;
logger.debug(self._getLogEntry('claimed ' + snapshot.key()));
self.currentTaskRef = snapshot.ref();
self.currentTaskListener = self.currentTaskRef
.child('_owner').on('value', function(ownerSnapshot) {
var id = self.processId + ':' + self.taskNumber;
/* istanbul ignore else */
if (ownerSnapshot.val() !== id &&
!_.isNull(self.currentTaskRef) &&
!_.isNull(self.currentTaskListener)) {
self.currentTaskRef.child('_owner').off(
'value',
self.currentTaskListener);
self.currentTaskRef = null;
self.currentTaskListener = null;
if (_.isNull(task)) {
return task;
}
if (!_.isPlainObject(task)) {
malformed = true;
var error = new Error('Task was malformed');
var errorStack = null;
if (!self.suppressStack) {
errorStack = error.stack;
}
return {
_state: self.errorState,
_state_changed: Firebase.ServerValue.TIMESTAMP,
_error_details: {
error: error.message,
original_task: task,
error_stack: errorStack
}
});
var data = snapshot.val();
if (self.sanitize) {
[
'_state',
'_state_changed',
'_owner',
'_progress',
'_error_details',
'_id'
].forEach(function(reserved) {
if (snapshot.hasChild(reserved)) {
delete data[reserved];
}
});
};
}
if (_.isUndefined(task._state)) {
task._state = null;
}
if (task._state === self.startState) {
task._id = nextTaskRef.key();
task._state = self.inProgressState;
task._state_changed = Firebase.ServerValue.TIMESTAMP;
task._owner = self.processId + ':' + (self.taskNumber + 1);
task._progress = 0;
return task;
} else {
logger.debug(self._getLogEntry('task no longer in correct state: expected ' + self.startState + ', got ' + task._state));
return;
}
}, function(error, committed, snapshot) {
/* istanbul ignore if */
if (error) {
if (++retries < MAX_TRANSACTION_ATTEMPTS) {
logger.debug(self._getLogEntry('errored while attempting to claim' +
' a new task, retrying'), error);
return setImmediate(self._tryToProcess.bind(self), deferred);
} else {
var errorMsg = 'errored while attempting to claim a new task too ' +
'many times, no longer retrying';
logger.debug(self._getLogEntry(errorMsg), error);
return deferred.reject(new Error(errorMsg));
}
var progress = self._updateProgress(self.taskNumber);
var resolve = self._resolve(self.taskNumber);
var reject = self._reject(self.taskNumber);
setImmediate(function() {
try {
self.processingFunction.call(null, data, progress, resolve,
reject);
} catch (error) {
reject(error);
} else if (committed && snapshot.exists()) {
if (malformed) {
logger.debug(self._getLogEntry('found malformed entry ' +
snapshot.key()));
} else {
/* istanbul ignore if */
if (self.busy) {
// Worker has become busy while the transaction was processing -
// so give up the task for now so another worker can claim it
self._resetTask(nextTaskRef);
} else {
self.busy = true;
self.taskNumber += 1;
logger.debug(self._getLogEntry('claimed ' + snapshot.key()));
self.currentTaskRef = snapshot.ref();
self.currentTaskListener = self.currentTaskRef
.child('_owner').on('value', function(ownerSnapshot) {
var id = self.processId + ':' + self.taskNumber;
/* istanbul ignore else */
if (ownerSnapshot.val() !== id &&
!_.isNull(self.currentTaskRef) &&
!_.isNull(self.currentTaskListener)) {
self.currentTaskRef.child('_owner').off(
'value',
self.currentTaskListener);
self.currentTaskRef = null;
self.currentTaskListener = null;
}
});
var data = snapshot.val();
if (self.sanitize) {
[
'_state',
'_state_changed',
'_owner',
'_progress',
'_error_details',
'_id'
].forEach(function(reserved) {
if (snapshot.hasChild(reserved)) {
delete data[reserved];
}
});
}
var progress = self._updateProgress(self.taskNumber);
var resolve = self._resolve(self.taskNumber);
var reject = self._reject(self.taskNumber);
setImmediate(function() {
try {
self.processingFunction.call(null, data, progress, resolve,
reject);
} catch (error) {
reject(error);
}
});
}
});
}
}
}
}
deferred.resolve();
}, false);
deferred.resolve();
}, false);
});
}
}
} else {
deferred.resolve();
Expand Down Expand Up @@ -702,9 +713,8 @@ QueueWorker.prototype.setTaskSpec = function(taskSpec) {
logger.debug(self._getLogEntry('listening'));
self.newTaskListener = self.newTaskRef.on(
'child_added',
function(snapshot) {
self.nextTaskRef = snapshot.ref();
self._tryToProcess(self.nextTaskRef);
function() {
self._tryToProcess();
}, /* istanbul ignore next */ function(error) {
logger.debug(self._getLogEntry('errored listening to Firebase'), error);
});
Expand Down Expand Up @@ -744,4 +754,3 @@ QueueWorker.prototype.shutdown = function() {
};

module.exports = QueueWorker;

Loading