Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ var queue = new Queue(ref, options, function(data, progress, resolve, reject) {

#### `resolve()`

A callback function for reporting that the current task has been completed and the worker is ready to process another task. If the current task specification has a `finished_state`, any plain JavaScript object passed into the `resolve()` function will be written to the `tasks` location and will be available to the next task if the tasks are chained. When a task is resolved, the `_progress` field is updated to 100.
A callback function for reporting that the current task has been completed and the worker is ready to process another task. Any plain JavaScript object passed into the `resolve()` function will be written to the `tasks` location and will be available to the next task if the tasks are chained. When a task is resolved, the `_progress` field is updated to 100 and the `_state` is replaced with either the `_new_state` key of the object passed in, or the `finished_state` of the task spec. If the task does not have a `finished_state` or the `_new_state` key is set to `false`, the task will be removed from the queue.

#### `reject()`

Expand Down Expand Up @@ -295,7 +295,7 @@ A default spec configuration is assumed if no specs are specified in the `specs`

- `start_state` - The default spec has no `start_state`, which means any task pushed into the `tasks` subtree without a `_state` key will be picked up by default spec workers. If `start_state` is specified, only tasks with that `_state` may be claimed by the worker.
- `in_progress_state` - When a worker picks up a task and begins processing it, it will change the tasks's `_state` to the value of `in_progress_state`. This is the only required spec property, and it cannot equal the `start_state`, `finished_state`, or `error_state`.
- `finished_state` - The default spec has no `finished_state` so the worker will remove tasks from the queue upon successful completion. If `finished_state` is specified, then the task's `_state` value will be updated to the `finished_state` upon task completion. Setting this value to another spec's `start_state` is useful for chaining tasks together to create a job.
- `finished_state` - The default spec has no `finished_state` so the worker will remove tasks from the queue upon successful completion. If `finished_state` is specified, then the task's `_state` value will be updated to the `finished_state` upon task completion. Setting this value to another spec's `start_state` is useful for chaining tasks together to create a job. It's possible to override the `finished_state` on a per-task basis by setting the `_new_state` key of the object passed into `resolve()` to a string to set the `_state` explicitly, `null` to remove the `_state` so it gets picked up by any spec without a `start_state`, or `false` to remove the task from the queue.
- `error_state` - If the task gets rejected the `_state` will be updated to this value and an additional key `_error_details` will be populated with the `previous_state` and an optional error message from the `reject()` callback. If this isn't specified, it defaults to "error". This can be useful for specifying different error states for different tasks, or chaining errors so that they can be logged.
- `timeout` - The default timeout is 5 minutes. When a task has been claimed by a worker but has not completed within `timeout` milliseconds, the queue will report that task as timed out, and reset that task to be claimable once again. If this is not specified, a task claimed by a worker could be orphaned and left in an unclaimable state if the worker processing it dies before the task is resolved or rejected.
- `retries` - The default spec doesn't retry failed tasks. When a task fails, if there are any remaining attempts, the queue will restart the task by setting the task's `_state` to its spec's `start_state`.
Expand Down
32 changes: 16 additions & 16 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@
"firebase": "2.x"
},
"dependencies": {
"firebase": "2.x",
"lodash": "~3.7.0",
"rsvp": "3.x",
"node-uuid": "1.4.x",
"winston": "1.x"
"firebase": "^2.4.1",
"lodash": "^4.6.1",
"rsvp": "^3.2.1",
"node-uuid": "^1.4.7",
"winston": "^2.2.0"
},
"devDependencies": {
"chai": "2.2.0",
"chai-as-promised": "5.0.0",
"coveralls": "^2.11.2",
"gulp": "3.8.11",
"gulp-exit": "0.0.2",
"gulp-istanbul": "0.8.1",
"gulp-jshint": "1.10.0",
"gulp-mocha": "2.0.1",
"jshint-stylish": "1.0.1",
"sinon": "1.14.1",
"sinon-chai": "2.7.0"
"chai": "^3.5.0",
"chai-as-promised": "^5.3.0",
"coveralls": "^2.11.8",
"gulp": "^3.9.1",
"gulp-exit": "^0.0.2",
"gulp-istanbul": "^0.10.3",
"gulp-jshint": "^1.10.0",
"gulp-mocha": "^2.2.0",
"jshint-stylish": "^1.0.1",
"sinon": "^1.17.3",
"sinon-chai": "^2.8.0"
},
"scripts": {
"test": "gulp test",
Expand Down
28 changes: 18 additions & 10 deletions src/lib/queue_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,26 @@ QueueWorker.prototype._resolve = function(taskNumber) {
var id = self.processId + ':' + self.taskNumber;
if (task._state === self.inProgressState &&
task._owner === id) {
if (_.isNull(self.finishedState)) {
return null;
var outputTask = _.clone(newTask);
if (!_.isPlainObject(outputTask)) {
outputTask = {};
}
if (!_.isPlainObject(newTask)) {
newTask = {};
outputTask._state = _.get(outputTask, '_new_state');
delete outputTask._new_state;
if (!_.isNull(outputTask._state) && !_.isString(outputTask._state)) {
if (_.isNull(self.finishedState) || outputTask._state === false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment about the explicit false API just because it's not necessarily intuitive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// Remove the item if no `finished_state` set in the spec or
// _new_state is explicitly set to `false`.
return null;
} else {
outputTask._state = self.finishedState;
}
}
newTask._state = self.finishedState;
newTask._state_changed = Firebase.ServerValue.TIMESTAMP;
newTask._owner = null;
newTask._progress = 100;
newTask._error_details = null;
return newTask;
outputTask._state_changed = Firebase.ServerValue.TIMESTAMP;
outputTask._owner = null;
outputTask._progress = 100;
outputTask._error_details = null;
return outputTask;
} else {
return;
}
Expand Down
149 changes: 149 additions & 0 deletions test/lib/queue_worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,155 @@ describe('QueueWorker', function() {
});
});

it('should resolve a task owned by the current worker and change the state to a provided valid string _new_state', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithFinishedState);
testRef = tasksRef.push({
'_state': th.validTaskSpecWithFinishedState.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 0
}, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
var initial = true;
testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resolve(qw.taskNumber)({
foo: 'bar',
_new_state: 'valid_new_state'
});
} else {
try {
var task = snapshot.val();
expect(task).to.have.all.keys(['_state', '_state_changed', '_progress', 'foo']);
expect(task['_progress']).to.equal(100);
expect(task['_state']).to.equal('valid_new_state');
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task.foo).to.equal('bar');
done();
} catch (errorB) {
done(errorB);
}
}
});
});
});

it('should resolve a task owned by the current worker and change the state to a provided valid null _new_state', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithFinishedState);
testRef = tasksRef.push({
'_state': th.validTaskSpecWithFinishedState.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 0
}, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
var initial = true;
testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resolve(qw.taskNumber)({
foo: 'bar',
_new_state: null
});
} else {
try {
var task = snapshot.val();
expect(task).to.have.all.keys(['_state_changed', '_progress', 'foo']);
expect(task['_progress']).to.equal(100);
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task.foo).to.equal('bar');
done();
} catch (errorB) {
done(errorB);
}
}
});
});
});

it('should resolve a task owned by the current worker and remove the task when provided _new_state = false', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithFinishedState);
testRef = tasksRef.push({
'_state': th.validTaskSpecWithFinishedState.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 0
}, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
var initial = true;
testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resolve(qw.taskNumber)({
foo: 'bar',
_new_state: false
});
} else {
try {
var task = snapshot.val();
expect(snapshot.val()).to.be.null;
done();
} catch (errorB) {
done(errorB);
}
}
});
});
});

it('should resolve a task owned by the current worker and change the state to finishedState when provided an invalid _new_state', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithFinishedState);
testRef = tasksRef.push({
'_state': th.validTaskSpecWithFinishedState.inProgressState,
'_state_changed': new Date().getTime(),
'_owner': qw.processId + ':' + qw.taskNumber,
'_progress': 0
}, function(errorA) {
if (errorA) {
return done(errorA);
}
qw.currentTaskRef = testRef;
var initial = true;
testRef.on('value', function(snapshot) {
if (initial) {
initial = false;
qw._resolve(qw.taskNumber)({
foo: 'bar',
_new_state: {
state: 'object_is_an_invalid_new_state'
}
});
} else {
try {
var task = snapshot.val();
expect(task).to.have.all.keys(['_state', '_state_changed', '_progress', 'foo']);
expect(task['_progress']).to.equal(100);
expect(task['_state']).to.equal(th.validTaskSpecWithFinishedState.finishedState);
expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250);
expect(task.foo).to.equal('bar');
done();
} catch (errorB) {
done(errorB);
}
}
});
});
});

it('should not resolve a task that no longer exists', function(done) {
qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop);
qw.setTaskSpec(th.validTaskSpecWithFinishedState);
Expand Down