Expressive middleware for queue workers
A task is exposed to the middleware chain as this
. A task must implement
the following interface.
- value
- resolve
function (result) { /* ... */ }
- reject
function (error) { /* ... */ }
Optional methods can be attached to a task e.g.
- progress (firebase-queue)
- touch (nsq.js)
import compose from 'qoo'
import Firebase from 'firebase'
import Queue from 'firebase-queue'
const xf = (value, progress, resolve, reject) => {
return { value, progress, resolve, reject }
}
function * logger (next) {
const start = new Date()
yield next
const ms = new Date() - start
console.log('%j - %s', this.value, ms)
}
function * processor (next) {
this.result = { foo: 'bar' }
yield next
}
const ref = new Firebase('https://<your-firebase>.firebaseio.com/queue')
const queue = new Queue(ref, compose(xf, logger, processor))
import nsq from 'nsq.js'
import compose from 'qoo'
const xf = (msg) => {
return {
value: msg.json(),
resolve: msg.finish.bind(msg),
reject: msg.requeue.bind(msg)
}
}
function * logger (next) {
const start = new Date()
yield next
const ms = new Date() - start
console.log('%j - %s', this.value, ms)
}
function * processor (next) {
this.result = { foo: 'bar' }
yield next
}
const reader = nsq.reader()
reader.on('message', compose(xf, logger, processor))