Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ Using Firebase Queue, you can create specs for each of these tasks, and then use
## The Queue in Your Firebase Database

The queue relies on having a Firebase database reference to coordinate workers e.g. `https://<your-firebase>.firebaseio.com/queue`. This queue can be stored at any path in your Firebase database, and you can have multiple queues as well. The queue will respond to tasks pushed onto the `tasks` subtree and optionally read specifications from a `specs` subtree.

```
queue
- specs
- tasks
```

See [Custom references to tasks and specs](#custom-references-to-tasks-and-specs) for defining the locations of these other than the default.

## Queue Workers

Expand Down Expand Up @@ -183,7 +185,7 @@ In this example, there are three categories of users, represented using fields o

These don't have to use a custom token, for instance you could use `auth != null` in place of `auth.canAddTasks` if application's users can write directly to the queue. Similarly, `auth.canProcessTasks` and `auth.canAddSpecs` could be `auth.admin === true` if a single trusted server process was used to perform all queue functions.

```js
```json
{
"rules": {
"queue": {
Expand Down Expand Up @@ -486,6 +488,22 @@ Since there is no `finished_state` in the `fanout_message` spec, the task will b

While this example is a little contrived since you could perform the sanitization and fanout in a single task, creating multiple specs for our tasks allows us to do things like add selective retries to certain tasks more likely to fail, put additional workers on more expensive tasks, or add expressive error states.

## Custom references to tasks and specs

It is possible to specify the locations the queue uses for tasks and the specs explicitly instead of using the defaults. To do this, simply pass an object to the Queue constructor in place of the Firebase reference; this object must contain the keys `tasksRef` and `specsRef`, and each value must be a Firebase reference.

```js
var Queue = require('firebase-queue'),
Firebase = require('firebase');

var jobsRef = new Firebase('https://<your-firebase>.firebaseio.com/jobs');
var specsRef = new Firebase('https://<your-firebase>.firebaseio.com/specs');

var queue = new Queue({ tasksRef: jobsRef, specsRef: specsRef }, function(data, progress, resolve, reject) {
// process task
});
```

## Wrap Up

As you can see, Firebase Queue is a powerful tool that allows you to securely and robustly perform background work on your Firebase data, from sanitization to data fanout and more. We'd love to hear about how you're using Firebase-Queue in your project! Let us know on [Twitter](https://twitter.com/firebase), [Facebook](https://www.facebook.com/Firebase), or [G+](https://plus.google.com/115330003035930967645). If you have any questions, please direct them to our [Google Group](https://groups.google.com/forum/#!forum/firebase-talk) or [support@firebase.com](mailto:support@firebase.com).
2 changes: 1 addition & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ gulp.task('test', function() {
gulp.src(paths.tests)
.pipe(mocha({
reporter: 'spec',
timeout: 2000
timeout: 5000
}))
.pipe(istanbul.writeReports())
.pipe(exit());
Expand Down
33 changes: 24 additions & 9 deletions src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ var DEFAULT_NUM_WORKERS = 1,

/**
* @constructor
* @param {Firebase} ref A Firebase reference to the queue.
* @param {Firebase|Object} ref A Firebase reference to the queue or an object
* containing both keys:
* - tasksRef: {Firebase} A Firebase reference to the queue tasks location.
* - specsRef: {Firebase} A Firebase reference to the queue specs location.
* @param {Object} options (optional) Object containing possible keys:
* - specId: {String} the task specification ID for the workers.
* - numWorkers: {Number} The number of workers to create for this task.
* - sanitize: {Boolean} Whether to sanitize the 'data' passed to the
* - specId: {String} the task specification ID for the workers.
* - numWorkers: {Number} The number of workers to create for this task.
* - sanitize: {Boolean} Whether to sanitize the 'data' passed to the
* processing function of internal queue keys.
* @param {Function} processingFunction A function that is called each time to
* process a task. This function is passed four parameters:
Expand Down Expand Up @@ -67,10 +70,8 @@ function Queue() {
logger.debug('Queue(): Error during initialization', error);
throw new Error(error);
} else if (constructorArguments.length === 2) {
self.ref = constructorArguments[0];
self.processingFunction = constructorArguments[1];
} else if (constructorArguments.length === 3) {
self.ref = constructorArguments[0];
var options = constructorArguments[1];
if (!_.isPlainObject(options)) {
error = 'Options parameter must be a plain object.';
Expand Down Expand Up @@ -123,11 +124,25 @@ function Queue() {
throw new Error(error);
}

if (_.has(constructorArguments[0], 'tasksRef') &&
_.has(constructorArguments[0], 'specsRef')) {
self.tasksRef = constructorArguments[0].tasksRef;
self.specsRef = constructorArguments[0].specsRef;
} else if (_.isPlainObject(constructorArguments[0])) {
error = 'When ref is an object it must contain both keys \'tasksRef\' ' +
'and \'specsRef\'';
logger.debug('Queue(): Error during initialization', error);
throw new Error(error);
} else {
self.tasksRef = constructorArguments[0].child('tasks');
self.specsRef = constructorArguments[0].child('specs');
}

self.workers = [];
for (var i = 0; i < self.numWorkers; i++) {
var processId = (self.specId ? self.specId + ':' : '') + i;
self.workers.push(new QueueWorker(
self.ref.child('tasks'),
self.tasksRef,
processId,
self.sanitize,
self.suppressStack,
Expand All @@ -141,7 +156,7 @@ function Queue() {
}
self.initialized = true;
} else {
self.specChangeListener = self.ref.child('specs').child(self.specId).on(
self.specChangeListener = self.specsRef.child(self.specId).on(
'value',
function(taskSpecSnap) {
var taskSpec = {
Expand Down Expand Up @@ -177,7 +192,7 @@ Queue.prototype.shutdown = function() {

logger.debug('Queue: Shutting down');
if (!_.isNull(self.specChangeListener)) {
self.ref.child('specs').child(self.specId).off('value',
self.specsRef.child(self.specId).off('value',
self.specChangeListener);
self.specChangeListener = null;
}
Expand Down
12 changes: 12 additions & 0 deletions test/queue.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ describe('Queue', function() {
});
});

_.forEach([{}, { foo: 'bar' }, { tasksRef: th.testRef }, { specsRef: th.testRef }], function(invalidRefConfigurationObject) {
it('should not create a Queue with a ref configuration object that contains keys: {' + _.keys(invalidRefConfigurationObject).join(', ') + '}', function() {
expect(function() {
new th.Queue(invalidRefConfigurationObject, _.noop);
}).to.throw('When ref is an object it must contain both keys \'tasksRef\' and \'specsRef\'');
});
});

_.forEach(['', 'foo', NaN, Infinity, true, false, 0, 1, ['foo', 'bar'], { foo: 'bar' }, null, { foo: 'bar' }, { foo: { bar: { baz: true } } }], function(nonFunctionObject) {
it('should not create a Queue with a non-function callback: ' + JSON.stringify(nonFunctionObject), function() {
expect(function() {
Expand All @@ -40,6 +48,10 @@ describe('Queue', function() {
new th.Queue(th.testRef, _.noop);
});

it('should create a default Queue with tasks and specs Firebase references and a processing callback', function() {
new th.Queue({ tasksRef: th.testRef, specsRef: th.testRef }, _.noop);
});

_.forEach(['', 'foo', NaN, Infinity, true, false, 0, 1, ['foo', 'bar'], null, _.noop], function(nonPlainObject) {
it('should not create a Queue with a Firebase reference, a non-plain object options parameter (' + JSON.stringify(nonPlainObject) + '), and a processingCallback', function() {
expect(function() {
Expand Down