Skip to content

Commit

Permalink
Implement an error-first callback read.
Browse files Browse the repository at this point in the history
  • Loading branch information
flatheadmill committed Jun 28, 2016
1 parent 4c83965 commit c7e6896
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@
},
"scripts":
{
"test": "proof platform win32 && proof test */*/*.t.js || t/test"
"test": "proof platform win32 && proof test */*.t.js || t/test"
}
}
24 changes: 24 additions & 0 deletions staccato.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ var stream = require('stream'),

function Staccato (stream, opening) {
this._opened = !opening
// TODO Expose stream.
this._stream = stream
this._catcher = function (error) { this._error = error }.bind(this)
this._readable = false
if (!this._opened) {
this._stream.once('open', function () {
this._opened = true
Expand Down Expand Up @@ -34,6 +36,28 @@ Staccato.prototype._checkError = function () {
}
}

Staccato.prototype.read = cadence(function (async) {
var waited = false
var loop = async(function () {
if (!this._readable) {
waited = true
new Delta(async()).ee(this._stream).on('readable')
}
}, function () {
this._readable = true
var object = this._stream.read()
if (object == null) {
if (waited) {
return [ loop.break, null ]
} else {
this._readable = false
}
} else {
return [ loop.break, object ]
}
})()
})

Staccato.prototype.write = cadence(function (async, buffer) {
this._checkError()
if (!this._stream.write(buffer)) { // <- does this 'error' if `true`?
Expand Down
28 changes: 28 additions & 0 deletions t/read.t.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require('proof')(1, require('cadence')(prove))

function prove (async, assert) {
var Staccato = require('..')
var stream = require('stream')
var through = new stream.PassThrough
var staccato = new Staccato(through)
var gathered = []
async(function () {
var loop = async(function () {
staccato.read(async())
}, function (buffer) {
if (buffer == null) {
return [ loop.break ]
}
gathered.push(buffer)
})()
}, function () {
assert(Buffer.concat(gathered).toString(), 'a', 'gathered')
})
// A sub-cadence because we have to wait for the loop above to start.
async(function () {
setImmediate(async())
}, function () {
through.write(new Buffer('a'))
through.end()
})
}

0 comments on commit c7e6896

Please sign in to comment.