Skip to content

Commit

Permalink
Merge 5cbc894 into 985be2a
Browse files Browse the repository at this point in the history
  • Loading branch information
delvedor committed Aug 29, 2018
2 parents 985be2a + 5cbc894 commit c1c10bf
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 37 deletions.
14 changes: 12 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,19 @@ function job (child, greeting, num, done) {
console.log(greeting, num) // 'hello' 42
done()
})
```

function nestedJob (child, done) {
// perform some work
If needed you can also use the `child` method to create custom child queues. The child queues will be executed once the current queue has finished its execution.
```js
const q = require('workq')()

const childq = q.child()

q.add(job, 'hello', 42)
childq.add(job, 'hello', 42)

function job (child, greeting, num, done) {
console.log(greeting, num) // 'hello' 42
done()
})
```
Expand Down
112 changes: 79 additions & 33 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
const assert = require('assert')
const debug = require('debug')('workq')

const kExhausted = Symbol('exhausted')
const kParent = Symbol('parent')
const kPause = Symbol('pause')
const kDrain = Symbol('drain')
const kChildren = Symbol('children')

var instance = null
var id = 0

Expand All @@ -22,56 +28,88 @@ function Queue (opts) {
// public api
this.q = []
this.running = false
this.id = id++
// private api
this._exhausted = false
this._parent = null
this._pause = false
this._id = id++
this._drain = drain.bind(this)
this[kExhausted] = false
this[kParent] = null
this[kPause] = false
this[kDrain] = drain.bind(this)
this[kChildren] = []

if (opts.singleton) {
return instance
}
}

/**
* Adds a new job to the queue.
* @params {function} the job
* @params {...params} optional custom params
* @returns {Queue}
*/
Queue.prototype.add = function add (job, ...params) {
assert(typeof job === 'function', 'The job to perform should be a function')
assert(this._exhausted === false, 'You cannot add more jobs after calling done')
debug(`Queue ${this._id}, adding new job`)
assert(this[kExhausted] === false, 'You cannot add more jobs after calling done')
debug(`Queue ${this.id}, adding new job`)
this.q.push({ job, params })
if (!this.running) {
this.run()
}
return this
}

/**
* Runs the current queue, it is called automatically by `add`
* @returns {Queue}
*/
Queue.prototype.run = function run () {
if (this._pause) {
debug(`Queue ${this._id}, job paused`)
return
if (this[kPause]) {
debug(`Queue ${this.id}, job paused`)
return this
}

this.running = true
setImmediate(() => runner.call(this))
return this
}

/**
* Adds a drain function
* @params {function}
* @returns {Queue}
*/
Queue.prototype.drain = function drain (fn) {
assert(typeof fn === 'function', 'Drain should be a function')
debug(`Queue ${this._id}, add drain function`)
this._drain = fn
debug(`Queue ${this.id}, add drain function`)
this[kDrain] = fn
return this
}

/**
* Adds a new child
* @returns {Queue} the new child
*/
Queue.prototype.child = function () {
debug(`Queue ${this.id}, creating child`)
const child = new Queue()
child[kParent] = this
child[kPause] = true
this[kChildren].push(child)
return child
}

function runner () {
/* istanbul ignore next */
if (this._pause) {
debug(`Queue ${this._id}, job paused`)
if (this[kPause]) {
debug(`Queue ${this.id}, job paused`)
return
}

debug(`Queue ${this._id}, running job`)
debug(`Queue ${this.id}, running job`)
const worker = this.q.shift()
if (worker === undefined) {
this.running = false
const asyncOp = this._drain(parent.bind(this))
const asyncOp = this[kDrain](parent.bind(this))
/* istanbul ignore next */
if (asyncOp && typeof asyncOp.then === 'function') {
asyncOp.then(parent.bind(this), parent.bind(this))
Expand All @@ -80,37 +118,37 @@ function runner () {
}

const child = new Queue()
child._parent = this
child._pause = true
child[kParent] = this
child[kPause] = true

const { job, params } = worker
const asyncOp = params.length === 0
? job(child, done.bind(this))
: job(child, ...params, done.bind(this))
if (asyncOp && typeof asyncOp.then === 'function') {
this._pause = true
this[kPause] = true
asyncOp.then(done.bind(this), done.bind(this))
}

function done () {
child._exhausted = true
this._pause = false
debug(`Queue ${this._id}, job ended`)
child[kExhausted] = true
this[kPause] = false
debug(`Queue ${this.id}, job ended`)
// if the child has jobs in the queue
if (child.q.length) {
debug(`Queue ${this._id}, starting child queue (${child.q.length} jobs yet to be executed)`)
this._pause = true
child._pause = false
debug(`Queue ${this.id}, starting child queue (${child.q.length} jobs yet to be executed)`)
this[kPause] = true
child[kPause] = false
child.run()
// if the current queue has jobs yet to be executed
} else if (this.q.length) {
debug(`Queue ${this._id}, running next job (${this.q.length} jobs yet to be executed)`)
debug(`Queue ${this.id}, running next job (${this.q.length} jobs yet to be executed)`)
this.run()
// the current queue has finished its execution
} else {
debug(`Queue ${this._id}, finished queue`)
debug(`Queue ${this.id}, finished queue`)
this.running = false
const asyncOp = this._drain(parent.bind(this))
const asyncOp = this[kDrain](parent.bind(this))
if (asyncOp && typeof asyncOp.then === 'function') {
asyncOp.then(parent.bind(this), parent.bind(this))
}
Expand All @@ -119,14 +157,22 @@ function runner () {
}

function parent () {
if (!this._parent) return
debug(`Queue ${this._id}, running parent ${this._parent._id}`)
this._parent._pause = false
this._parent.run()
const child = this[kChildren].shift()
// if there are not custom childs, call the parent
if (child === undefined) {
if (!this[kParent]) return
debug(`Queue ${this.id}, running parent ${this[kParent].id}`)
this[kParent][kPause] = false
this[kParent].run()
} else {
debug(`Queue ${this.id}, running child ${child.id}`)
child[kPause] = false
child.run()
}
}

function drain (done) {
debug(`Queue ${this._id}, drain`)
debug(`Queue ${this.id}, drain`)
done()
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
"description": "A super tiny work queue",
"main": "index.js",
"scripts": {
"test": "standard && tap test/test.js",
"coveralls": "tap test/test.js --cov"
"test": "standard && tap test/*.test.js",
"coveralls": "tap test/*.test.js --cov"
},
"repository": {
"type": "git",
Expand Down
168 changes: 168 additions & 0 deletions test/child.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
'use strict'

const t = require('tap')
const test = t.test
const Queue = require('../index')

test('Should create a child q / 1', t => {
t.plan(4)

const order = [1, 2, 3, 4]
const q = Queue()

const q1 = q.child()
const q2 = q.child()

q.add((q, done) => {
t.strictEqual(order.shift(), 1)
done()
})

q1.add((q, done) => {
t.strictEqual(order.shift(), 3)
done()
})

q2.add((q, done) => {
t.strictEqual(order.shift(), 4)
done()
})

q.add((q, done) => {
t.strictEqual(order.shift(), 2)
done()
})
})

test('Should create a child q / 2', t => {
t.plan(4)

const order = [1, 2, 3, 4]
const q = Queue()

q.add((q, done) => {
t.strictEqual(order.shift(), 1)
done()
})

q.add((q, done) => {
t.strictEqual(order.shift(), 2)
done()
})

const q1 = q.child()
const q2 = q.child()

q1.add((q, done) => {
t.strictEqual(order.shift(), 3)
done()
})

q2.add((q, done) => {
t.strictEqual(order.shift(), 4)
done()
})
})

test('Should create a child q / 3', t => {
t.plan(4)

const order = [1, 2, 3, 4]
const q = Queue()

q.add((q, done) => {
t.strictEqual(order.shift(), 1)
done()
})

q.add((q, done) => {
t.strictEqual(order.shift(), 2)
done()
})

setImmediate(() => {
const q1 = q.child()
const q2 = q.child()

q1.add((q, done) => {
t.strictEqual(order.shift(), 3)
done()
})

q2.add((q, done) => {
t.strictEqual(order.shift(), 4)
done()
})
})
})

test('Should create a child q / 4', t => {
t.plan(5)

const order = [1, 2, 3, 4, 5]
const q = Queue()

q.add((q, done) => {
const q1 = q.child()
const q2 = q.child()

t.strictEqual(order.shift(), 1)

q1.add((q, done) => {
t.strictEqual(order.shift(), 3)
done()
})

q2.add((q, done) => {
t.strictEqual(order.shift(), 4)
done()
})

q.add((q, done) => {
t.strictEqual(order.shift(), 2)
done()
})

done()
})

q.add((q, done) => {
t.strictEqual(order.shift(), 5)
done()
})
})

test('Should create a child q / 5', t => {
t.plan(5)

const order = [1, 2, 3, 4, 5]
const q = Queue()

const q1 = q.child()
const q2 = q.child()

q.add((q, done) => {
t.strictEqual(order.shift(), 1)
done()
})

q1.add((q, done) => {
t.strictEqual(order.shift(), 3)

q.add((q, done) => {
t.strictEqual(order.shift(), 4)
done()
})
done()
})

q2.add((q, done) => {
t.strictEqual(order.shift(), 5)
done()
})

q.add((q, done) => {
t.strictEqual(order.shift(), 2)
done()
})
})
Loading

0 comments on commit c1c10bf

Please sign in to comment.