diff --git a/README.md b/README.md index 4e796ca..4575c4a 100644 --- a/README.md +++ b/README.md @@ -168,7 +168,7 @@ var queue = new Queue(ref, options, function(data, progress, resolve, reject) { #### `resolve()` -A callback function for reporting that the current task has been completed and the worker is ready to process another task. If the current task specification has a `finished_state`, any plain JavaScript object passed into the `resolve()` function will be written to the `tasks` location and will be available to the next task if the tasks are chained. When a task is resolved, the `_progress` field is updated to 100. +A callback function for reporting that the current task has been completed and the worker is ready to process another task. Any plain JavaScript object passed into the `resolve()` function will be written to the `tasks` location and will be available to the next task if the tasks are chained. When a task is resolved, the `_progress` field is updated to 100 and the `_state` is replaced with either the `_new_state` key of the object passed in, or the `finished_state` of the task spec. If the task does not have a `finished_state` or the `_new_state` key is set to `false`, the task will be removed from the queue. #### `reject()` @@ -295,7 +295,7 @@ A default spec configuration is assumed if no specs are specified in the `specs` - `start_state` - The default spec has no `start_state`, which means any task pushed into the `tasks` subtree without a `_state` key will be picked up by default spec workers. If `start_state` is specified, only tasks with that `_state` may be claimed by the worker. - `in_progress_state` - When a worker picks up a task and begins processing it, it will change the tasks's `_state` to the value of `in_progress_state`. This is the only required spec property, and it cannot equal the `start_state`, `finished_state`, or `error_state`. -- `finished_state` - The default spec has no `finished_state` so the worker will remove tasks from the queue upon successful completion. If `finished_state` is specified, then the task's `_state` value will be updated to the `finished_state` upon task completion. Setting this value to another spec's `start_state` is useful for chaining tasks together to create a job. +- `finished_state` - The default spec has no `finished_state` so the worker will remove tasks from the queue upon successful completion. If `finished_state` is specified, then the task's `_state` value will be updated to the `finished_state` upon task completion. Setting this value to another spec's `start_state` is useful for chaining tasks together to create a job. It's possible to override the `finished_state` on a per-task basis by setting the `_new_state` key of the object passed into `resolve()` to a string to set the `_state` explicitly, `null` to remove the `_state` so it gets picked up by any spec without a `start_state`, or `false` to remove the task from the queue. - `error_state` - If the task gets rejected the `_state` will be updated to this value and an additional key `_error_details` will be populated with the `previous_state` and an optional error message from the `reject()` callback. If this isn't specified, it defaults to "error". This can be useful for specifying different error states for different tasks, or chaining errors so that they can be logged. - `timeout` - The default timeout is 5 minutes. When a task has been claimed by a worker but has not completed within `timeout` milliseconds, the queue will report that task as timed out, and reset that task to be claimable once again. If this is not specified, a task claimed by a worker could be orphaned and left in an unclaimable state if the worker processing it dies before the task is resolved or rejected. - `retries` - The default spec doesn't retry failed tasks. When a task fails, if there are any remaining attempts, the queue will restart the task by setting the task's `_state` to its spec's `start_state`. diff --git a/package.json b/package.json index d0a04ca..c81c21d 100644 --- a/package.json +++ b/package.json @@ -41,24 +41,24 @@ "firebase": "2.x" }, "dependencies": { - "firebase": "2.x", - "lodash": "~3.7.0", - "rsvp": "3.x", - "node-uuid": "1.4.x", - "winston": "1.x" + "firebase": "^2.4.1", + "lodash": "^4.6.1", + "rsvp": "^3.2.1", + "node-uuid": "^1.4.7", + "winston": "^2.2.0" }, "devDependencies": { - "chai": "2.2.0", - "chai-as-promised": "5.0.0", - "coveralls": "^2.11.2", - "gulp": "3.8.11", - "gulp-exit": "0.0.2", - "gulp-istanbul": "0.8.1", - "gulp-jshint": "1.10.0", - "gulp-mocha": "2.0.1", - "jshint-stylish": "1.0.1", - "sinon": "1.14.1", - "sinon-chai": "2.7.0" + "chai": "^3.5.0", + "chai-as-promised": "^5.3.0", + "coveralls": "^2.11.8", + "gulp": "^3.9.1", + "gulp-exit": "^0.0.2", + "gulp-istanbul": "^0.10.3", + "gulp-jshint": "^1.10.0", + "gulp-mocha": "^2.2.0", + "jshint-stylish": "^1.0.1", + "sinon": "^1.17.3", + "sinon-chai": "^2.8.0" }, "scripts": { "test": "gulp test", diff --git a/src/lib/queue_worker.js b/src/lib/queue_worker.js index 08e69ad..97380b9 100644 --- a/src/lib/queue_worker.js +++ b/src/lib/queue_worker.js @@ -171,18 +171,26 @@ QueueWorker.prototype._resolve = function(taskNumber) { var id = self.processId + ':' + self.taskNumber; if (task._state === self.inProgressState && task._owner === id) { - if (_.isNull(self.finishedState)) { - return null; + var outputTask = _.clone(newTask); + if (!_.isPlainObject(outputTask)) { + outputTask = {}; } - if (!_.isPlainObject(newTask)) { - newTask = {}; + outputTask._state = _.get(outputTask, '_new_state'); + delete outputTask._new_state; + if (!_.isNull(outputTask._state) && !_.isString(outputTask._state)) { + if (_.isNull(self.finishedState) || outputTask._state === false) { + // Remove the item if no `finished_state` set in the spec or + // _new_state is explicitly set to `false`. + return null; + } else { + outputTask._state = self.finishedState; + } } - newTask._state = self.finishedState; - newTask._state_changed = Firebase.ServerValue.TIMESTAMP; - newTask._owner = null; - newTask._progress = 100; - newTask._error_details = null; - return newTask; + outputTask._state_changed = Firebase.ServerValue.TIMESTAMP; + outputTask._owner = null; + outputTask._progress = 100; + outputTask._error_details = null; + return outputTask; } else { return; } diff --git a/test/lib/queue_worker.spec.js b/test/lib/queue_worker.spec.js index 7cc1753..470220b 100644 --- a/test/lib/queue_worker.spec.js +++ b/test/lib/queue_worker.spec.js @@ -346,6 +346,155 @@ describe('QueueWorker', function() { }); }); + it('should resolve a task owned by the current worker and change the state to a provided valid string _new_state', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validTaskSpecWithFinishedState); + testRef = tasksRef.push({ + '_state': th.validTaskSpecWithFinishedState.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 0 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._resolve(qw.taskNumber)({ + foo: 'bar', + _new_state: 'valid_new_state' + }); + } else { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_state', '_state_changed', '_progress', 'foo']); + expect(task['_progress']).to.equal(100); + expect(task['_state']).to.equal('valid_new_state'); + expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); + expect(task.foo).to.equal('bar'); + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + + it('should resolve a task owned by the current worker and change the state to a provided valid null _new_state', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validTaskSpecWithFinishedState); + testRef = tasksRef.push({ + '_state': th.validTaskSpecWithFinishedState.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 0 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._resolve(qw.taskNumber)({ + foo: 'bar', + _new_state: null + }); + } else { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_state_changed', '_progress', 'foo']); + expect(task['_progress']).to.equal(100); + expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); + expect(task.foo).to.equal('bar'); + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + + it('should resolve a task owned by the current worker and remove the task when provided _new_state = false', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validTaskSpecWithFinishedState); + testRef = tasksRef.push({ + '_state': th.validTaskSpecWithFinishedState.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 0 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._resolve(qw.taskNumber)({ + foo: 'bar', + _new_state: false + }); + } else { + try { + var task = snapshot.val(); + expect(snapshot.val()).to.be.null; + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + + it('should resolve a task owned by the current worker and change the state to finishedState when provided an invalid _new_state', function(done) { + qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); + qw.setTaskSpec(th.validTaskSpecWithFinishedState); + testRef = tasksRef.push({ + '_state': th.validTaskSpecWithFinishedState.inProgressState, + '_state_changed': new Date().getTime(), + '_owner': qw.processId + ':' + qw.taskNumber, + '_progress': 0 + }, function(errorA) { + if (errorA) { + return done(errorA); + } + qw.currentTaskRef = testRef; + var initial = true; + testRef.on('value', function(snapshot) { + if (initial) { + initial = false; + qw._resolve(qw.taskNumber)({ + foo: 'bar', + _new_state: { + state: 'object_is_an_invalid_new_state' + } + }); + } else { + try { + var task = snapshot.val(); + expect(task).to.have.all.keys(['_state', '_state_changed', '_progress', 'foo']); + expect(task['_progress']).to.equal(100); + expect(task['_state']).to.equal(th.validTaskSpecWithFinishedState.finishedState); + expect(task['_state_changed']).to.be.closeTo(new Date().getTime() + th.offset, 250); + expect(task.foo).to.equal('bar'); + done(); + } catch (errorB) { + done(errorB); + } + } + }); + }); + }); + it('should not resolve a task that no longer exists', function(done) { qw = new th.QueueWorkerWithoutProcessingOrTimeouts(tasksRef, '0', true, false, _.noop); qw.setTaskSpec(th.validTaskSpecWithFinishedState);