Skip to content

Commit

Permalink
Use new push and check API.
Browse files Browse the repository at this point in the history
  • Loading branch information
flatheadmill committed Nov 4, 2015
1 parent 3775780 commit 4d4f9e7
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 112 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -14,7 +14,7 @@
{
"abend": "0.0.2",
"operation": "0.0.2",
"turnstile": "0.0.15"
"turnstile": "0.0.17"
},
"devDependencies":
{
Expand Down
99 changes: 52 additions & 47 deletions reactor.js
@@ -1,68 +1,73 @@
var cadence = require('cadence'),
Operation = require('operation'),
abend = require('abend'),
Operation = require('operation')

function noop () {}
push = [].push,
slice = [].slice

function Reactor (options) {
this.turnstile = options.turnstile
this._groupBy = options.groupBy || function () { return 1 }
this._buffers = {}
this.count = 0
this._operation = new Operation(options.operation)
this._values = { system: {}, user: {} }
}

Reactor.prototype.write = function (items, callback) {
var catcher = this._catcher, seen = {}, created = [], buffers = 0, callbacks = 0, fiasco

callback || (callback = noop)

var seen = {}

this.count += items.length

items.forEach(function (item) {
var key = (this._groupBy)(item), buffer
Reactor.prototype.push = function (key, values, callback) {
var vargs = slice.call(arguments), map = 'system', key = 'default'
if (typeof vargs[0] == 'string') {
map = 'user'
key = vargs.shift()

seen[key] = true

if (!(buffer = this._buffers[key])) {
this._buffers[key] = buffer = { values: [], callbacks: [] }
created.push(key)
}

buffer.values.push(item)
}, this)
}
var entry = this._entry(map, key, [])
push.apply(entry.values, vargs[0])
this.count += vargs[0].length
if (vargs[1]) entry.callbacks.push(vargs[1])
this.turnstile.nudge(abend)
}

Object.keys(seen).forEach(function (key) {
var buffer = this._buffers[key]
buffer.callbacks.push(complete)
buffers++
}, this)
Reactor.prototype.check = function () {
var vargs = slice.call(arguments), map = 'system', key = 'default'
if (typeof vargs[0] == 'string') {
map = 'user'
key = String(vargs.shift())
}
var entry = this._entry(map, key, true)
this.count++
if (vargs[0]) entry.callbacks.push(vargs[0])
this.turnstile.nudge(abend)
}

created.forEach(function (key) {
var buffer = this._buffers[key]
this.turnstile.enter(this, this._consume, [ key ], function (error) {
Reactor.prototype._entry = function (map, key, initial) {
var entry = this._values[map][key]
if (!entry) {
entry = this._values[map][key] = { callbacks: [], values: initial }
this.turnstile.enter({
object: this, method: '_consume'
}, [ map, key ], function (error) {
abend(error)
buffer.callbacks.forEach(function (callback) { callback() })
entry.callbacks.forEach(function (callback) { callback() })
})
}, this)

this.turnstile.nudge(abend)

function complete () {
if (++callbacks == buffers) {
callback()
}
}
return entry
}

Reactor.prototype._consume = cadence(function (async, state, key) {
var buffer = this._buffers[key]
delete this._buffers[key]
Reactor.prototype._consume = cadence(function (async, state, map, key) {
var entry = this._values[map][key], count, vargs
delete this._values[map][key]
if (typeof entry.values == 'boolean') {
count = 1
vargs = [ state ]
} else {
count = entry.values.length
vargs = [ state, entry.values ]
}
if (map != 'system') {
vargs.push(key)
}
async([function () {
this.count -= buffer.values.length
this.count -= count
}], function () {
this._operation.apply([ state, buffer.values ].concat(async()))
this._operation.apply(vargs.concat(async()))
})
})

Expand Down
112 changes: 48 additions & 64 deletions t/reactor/reactor.t.js
@@ -1,83 +1,67 @@
require('proof')(2, require('cadence')(prove))
require('proof')(8, require('cadence')(prove))

function prove (async, assert) {
var Reactor = require('../..')
var Turnstile = require('turnstile')
var abend = require('abend')
var slice = [].slice

new Reactor({
operation: function () {}
})

// What does the callback to the enqueuing mean? Can it report an error? If
// it where the case that the entire set of items was to be handled by a
// single invocation of the worker, then yes, the worker could report an
// error, but we are first grouping the work by by a chunk size, so right
// there the callback comes apart. Some portion of these items are processed
// in one call, the rest are processed in another. If the first call
// produces an error, which error gets reported?
//
// Error first-callbacks fall down in the face of this sort of parallelism.
//
// It may be the case that the caller should not proceed until the work is
// done, however, what exactly is that case? If we depend on this write of
// work to complete, then if there is an error, we're going to need to know,
// but that's weird. We're saying, do all this work, then I'll know to
// continue to do what it is I have to do. If any of that work cannot be
// done, I can't do what I have to do, so I won't do it.
//
// This becomes easier when there is some sort of multi-error, one what has
// one or more causes, and yes, I've been here before with Cadence, and it
// wasn't pretty and I walked back from it.
//
// The next question becomes, what sort of error could be recoverable at
// this point, so that error handling is meaningful? Do we want to report
// the errors that the work generates, or do we simply want to report that
// the worker failed in some way? We can simply report some sort of write
// error, and we can guarantee that it will always be an error of a single
// type, that is, that we're not going to pass on the error thrown by the
// worker, and in fact, at this point, we can guarantee that there will be an
// error array, since this is not cadence.
//
// In Cadence, gathering up errors is a problem because the default action
// is to throw the error, which is going to give you a stack trace that does
// the right thing. Once you're writing documentation about how you've
// created this new, better, smarter exception, and explaining it a lot.
//
// But here, that exception is a given. It didn't work in Cadence without
// being too clever, because if there is only one async operation then we
// don't want an array of exceptions, do we detect, pass both the array and
// the first exception, what gets reported when we throw. I'm happy with how
// that sorted out.

var expected = []
var reactor, consumed = false, waiting = function (values, callback) {
assert(values, expected.shift(), 'queued')
callback()
}
function operation (state, values, callback) {
consumed = true
waiting(values, callback)
}
var waiting = null
var reactor = new Reactor({
turnstile: new Turnstile({ workers: 1 }),
operation: function () {
waiting.apply(this, slice.call(arguments))
}
})

async(function () {
expected = [ [ 1, 3, 5 ], [ 6 ] ]
reactor = new Reactor({
turnstile: new Turnstile({ workers: 1 }),
groupBy: function (value) { return value % 2 },
operation: operation
})
reactor.write([ 1, 3, 5, 6 ], async())
var wait = async()
waiting = function (status, values, callback) {
assert(values, [ 1, 2, 3 ], 'queued')
waiting = function (status, values, callback) {
assert(values, [ 4, 5, 6, 7, 8, 9 ], 'queued grouped')
callback()
wait()
}
reactor.push([ 4, 5, 6 ])
reactor.push([ 7, 8, 9 ])
callback()
}
reactor.push([ 1, 2, 3 ], async())
}, function () {
reactor = new Reactor({
turnstile: new Turnstile({ workers: 1 }),
operation: operation
})
var wait = async()
waiting = function (values, callback) {
waiting = function (status, values, key, callback) {
assert(values, [ 1, 3, 5 ], 'grouped values')
assert(key, 'a', 'grouped key')
waiting = function (status, values, key, callback) {
assert(values, [ 2, 4, 6 ], 'grouped values')
assert(key, 'b', 'grouped key')
wait()
callback()
}
callback()
}
reactor.push('a', [ 1, 3, 5 ], async())
reactor.push('b', [ 2, 4, 6 ], async())
}, function () {
var wait = async()
waiting = function (status, key, callback) {
assert(key, 'a', 'callback')
wait()
callback()
}
reactor.check('a', async())
}, function () {
var wait = async()
waiting = function (status, callback) {
assert(true, 'called')
wait()
callback()
}
reactor.write([ 1, 3, 5 ])
reactor.check(async())
})
}

0 comments on commit 4d4f9e7

Please sign in to comment.