Skip to content

Commit

Permalink
Begin readable/writable split.
Browse files Browse the repository at this point in the history
  • Loading branch information
flatheadmill committed Jan 25, 2017
1 parent 245abb3 commit c0b5372
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 7 deletions.
102 changes: 102 additions & 0 deletions readable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
var stream = require('stream')
var cadence = require('cadence')
var delta = require('delta')

function Staccato (stream, opening) {
this.destroyed = false
this._onceOpen = null
this.stream = stream
this._catcher = function (error) { this._error = error }.bind(this)
this._delta = null
this._readable = false
if (opening) {
this.stream.once('open', this._onceOpen = function () {
this._onceOpen = null
}.bind(this))
}
this.stream.once('error', this._catcher)
}

Staccato.prototype.destroy = function () {
this.destroyed = true
if (this._onceOpen != null) {
this.stream.removeListener('open', this._onceOpen)
}
if (this._delta != null) {
this._delta.cancel([])
}
this.stream.removeListener('error', this._catcher)
}

Staccato.prototype.ready = cadence(function (async) {
this._checkError()
if (this._onceOpen != null) {
this.stream.removeListener('open', this._onceOpen)
async(function () {
this.stream.removeListener('error', this._catcher)
this._delta = delta(async()).ee(this.stream).on('open')
}, function () {
this._delta = null
this.stream.once('error', this._catcher)
})
}
})

Staccato.prototype._checkError = function () {
if (this._error) {
var error = this._error
this._error = new Error('already errored')
throw error
}
}

Staccato.prototype.read = cadence(function (async) {
var waited = false
var loop = async(function () {
if (!this._readable) {
waited = true
this._delta = delta(async()).ee(this.stream).on('readable')
}
}, function () {
this._delta = null
if (this.destroyed) {
return [ loop.break, null ]
}
this._readable = true
var object = this.stream.read()
if (object == null) {
if (waited) {
return [ loop.break, null ]
} else {
this._readable = false
}
} else {
return [ loop.break, object ]
}
})()
})

Staccato.prototype.write = cadence(function (async, buffer) {
this._checkError()
if (!this.stream.write(buffer)) { // <- does this 'error' if `true`?
async(function () {
this.stream.removeListener('error', this._catcher)
delta(async()).ee(this.stream).on('drain')
}, function () {
this.stream.once('error', this._catcher)
})
}
})

Staccato.prototype.close = cadence(function (async) {
this._checkError() // <- would `error` be here?
async(function () {
this.stream.removeListener('error', this._catcher)
delta(async()).ee(this.stream).on('finish')
this.stream.end()
}, function () {
this._error = new Error('closed')
})
})

module.exports = Staccato
4 changes: 2 additions & 2 deletions t/read.t.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
require('proof/redux')(1, require('cadence')(prove))

function prove (async, assert) {
var Staccato = require('..')
var Staccato = { Readable: require('../readable') }
var stream = require('stream')
var through = new stream.PassThrough
var staccato = new Staccato(through)
var staccato = new Staccato.Readable(through)
var gathered = []
async(function () {
var loop = async(function () {
Expand Down
10 changes: 5 additions & 5 deletions t/write.t.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ proof(4, cadence(prove))

function prove (async, assert) {
var mkdirp = require('mkdirp'),
Staccato = require('..'),
Staccato = { Writable: require('..') },
staccato
var cleanup = cadence(function (async) {
var rimraf = require('rimraf')
Expand All @@ -30,7 +30,7 @@ function prove (async, assert) {
}, function () {
mkdirp(path.join(__dirname, 'tmp'), async())
}, function () {
staccato = new Staccato(createWritable(write), false)
staccato = new Staccato.Writable(createWritable(write), false)
assert(staccato, 'create')
staccato.ready(async())
}, function () {
Expand All @@ -39,7 +39,7 @@ function prove (async, assert) {
staccato.close(async())
}, function () {
var writable
staccato = new Staccato(writable = createWritable(write, 1), true)
staccato = new Staccato.Writable(writable = createWritable(write, 1), true)
staccato.ready(async())
writable.emit('open')
}, function () {
Expand All @@ -51,13 +51,13 @@ function prove (async, assert) {
staccato.close(async())
}, [function () {
var writable
staccato = new Staccato(writable = createWritable(write, 1), true)
staccato = new Staccato.Writable(writable = createWritable(write, 1), true)
writable.emit('error', new Error('foo'))
staccato.ready(async())
}, function (error) {
assert(error.message, 'foo', 'error caught')
}], function () {
staccato = new Staccato(createWritable(write, 1), true)
staccato = new Staccato.Writable(createWritable(write, 1), true)
staccato.destroy()
assert(staccato.destroyed, 'destroyed')
}, function () {
Expand Down
102 changes: 102 additions & 0 deletions writable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
var stream = require('stream')
var cadence = require('cadence')
var delta = require('delta')

function Staccato (stream, opening) {
this.destroyed = false
this._onceOpen = null
this.stream = stream
this._catcher = function (error) { this._error = error }.bind(this)
this._delta = null
this._readable = false
if (opening) {
this.stream.once('open', this._onceOpen = function () {
this._onceOpen = null
}.bind(this))
}
this.stream.once('error', this._catcher)
}

Staccato.prototype.destroy = function () {
this.destroyed = true
if (this._onceOpen != null) {
this.stream.removeListener('open', this._onceOpen)
}
if (this._delta != null) {
this._delta.cancel([])
}
this.stream.removeListener('error', this._catcher)
}

Staccato.prototype.ready = cadence(function (async) {
this._checkError()
if (this._onceOpen != null) {
this.stream.removeListener('open', this._onceOpen)
async(function () {
this.stream.removeListener('error', this._catcher)
this._delta = delta(async()).ee(this.stream).on('open')
}, function () {
this._delta = null
this.stream.once('error', this._catcher)
})
}
})

Staccato.prototype._checkError = function () {
if (this._error) {
var error = this._error
this._error = new Error('already errored')
throw error
}
}

Staccato.prototype.read = cadence(function (async) {
var waited = false
var loop = async(function () {
if (!this._readable) {
waited = true
this._delta = delta(async()).ee(this.stream).on('readable')
}
}, function () {
this._delta = null
if (this.destroyed) {
return [ loop.break, null ]
}
this._readable = true
var object = this.stream.read()
if (object == null) {
if (waited) {
return [ loop.break, null ]
} else {
this._readable = false
}
} else {
return [ loop.break, object ]
}
})()
})

Staccato.prototype.write = cadence(function (async, buffer) {
this._checkError()
if (!this.stream.write(buffer)) { // <- does this 'error' if `true`?
async(function () {
this.stream.removeListener('error', this._catcher)
delta(async()).ee(this.stream).on('drain')
}, function () {
this.stream.once('error', this._catcher)
})
}
})

Staccato.prototype.close = cadence(function (async) {
this._checkError() // <- would `error` be here?
async(function () {
this.stream.removeListener('error', this._catcher)
delta(async()).ee(this.stream).on('finish')
this.stream.end()
}, function () {
this._error = new Error('closed')
})
})

module.exports = Staccato

0 comments on commit c0b5372

Please sign in to comment.