Skip to content

Commit

Permalink
cleanup / improve WritableSource
Browse files Browse the repository at this point in the history
This error handling seems more correct to me.
  • Loading branch information
Fishrock123 committed Jul 23, 2019
1 parent 3a98b02 commit 7e2ec35
Showing 1 changed file with 38 additions and 35 deletions.
73 changes: 38 additions & 35 deletions helpers/writable-source.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ const debuglog = util.debuglog('bob')
const { Writable } = require('readable-stream')
const status_type = require('../reference-status-enum') // eslint-disable-line camelcase

const kWriteCallback = Symbol('write callback')
const kDestroyCallback = Symbol('destroy callback')
const kFinalCallback = Symbol('final callback')
const kErrored = Symbol('errored')
const kEnded = Symbol('ended')
const kPulling = Symbol('pulling')
const kWriteCallback = Symbol('WritableSource write callback')
const kDestroyCallback = Symbol('WritableSource destroy callback')
const kFinalCallback = Symbol('WritableSource final callback')
const kErrored = Symbol('WritableSource errored')
const kEnding = Symbol('WritableSource ending')
const kEnded = Symbol('WritableSource ended')
const kPulling = Symbol('WritableSource pulling')

class WritableSource extends Writable {
// Streams3 <-> BOB interface
Expand All @@ -34,9 +35,6 @@ class WritableSource extends Writable {
this[kErrored] = false
this[kEnded] = false
this[kPulling] = false

// Begin paused so we wait for pull.
// this.cork()
}

_write (chunk, encoding, callback) {
Expand All @@ -48,10 +46,10 @@ class WritableSource extends Writable {

if (this[kEnded]) {
if (this[kFinalCallback] === null) {
throw new Error('write happened but stream was ended without a final callback')
// _write should not have happened.
const error = new Error('write happened but stream was ended without a final callback')
process.nextTick(_ => this.emit('error', error))
}
this.sink.next(status_type.end, null, Buffer.alloc(0), 0)
this[kFinalCallback]()
return
}

Expand All @@ -63,9 +61,6 @@ class WritableSource extends Writable {
// Store callback so we can call it when pull is called.
this[kWriteCallback] = callback

// Pause and wait for pull.
// this.cork()

this[kPulling] = false

// Send data to our sink.
Expand All @@ -80,19 +75,16 @@ class WritableSource extends Writable {
}

this[kDestroyCallback] = cb
this[kErrored] = true
this[kEnding] = true

if (err) {
this[kErrored] = true
// If there is an error and we have a source, we want to propogate
// the error upwards so all sources can close.
// Store the callback for when the error returns to this component.
this.source.pull(err)
} else {
// No error, but need to propogate a forced close anyways.
// XXX(Fishrock): Does BOB need to support propogating a close upwards due to Streams3?
// XXX(Fishrock): Use extension-stop?
this.source.pull(new Error('BobDuplex: user called stream.destroy()'))
this.sink.next(status_type.error, err, Buffer.alloc(0), 0)
}
// Otherwise wait for _final (?)
}

_final (cb) {
Expand All @@ -103,6 +95,7 @@ class WritableSource extends Writable {
if (this[kPulling]) {
// Called when a chain of Steams3 streams closes, so send an 'end'.
this.sink.next(status_type.end, null, Buffer.alloc(0), 0)
cb()
} else {
// Hope that a pull is made...
this[kFinalCallback] = cb
Expand All @@ -116,36 +109,46 @@ class WritableSource extends Writable {
pull (error, buffer) {
debuglog(`PULL ${this.name}`, arguments, this[kWriteCallback])

if (this[kPulling]) {
throw new Error('WritableSource: pull already in progress!')
}

this[kPulling] = true

if (this[kFinalCallback]) {
this.sink.next(status_type.end, null, Buffer.alloc(0), 0)
if (typeof this[kFinalCallback] === 'function') {
if (error) {
this.sink.next(status_type.error, error, Buffer.alloc(0), 0)
this[kFinalCallback](error)
} else {
this.sink.next(status_type.end, null, Buffer.alloc(0), 0)
this[kFinalCallback]()
}
return
}

if (this[kErrored]) {
// Something is probably very wrong if this happens...
error = error || new Error('WritableSource: already in error state')
// XXX(Jeremiah): Maybe just send this to the sink directly?
this.destroy(error)
return
}

// Clear the stored write cb to prepare for another.
const writeCb = this[kWriteCallback]
this[kWriteCallback] = null

// We are ready for data, unpause.
// this.uncork()

// If we don't yet have a write callback from an attempted Streams3 write, just bail.
if (!writeCb) {
// Except if there is already an error in which case emit it Streams3 style.
// Except if there is already an error in which case attempt to teardown.
if (error) {
this[kErrored] = true
this.emit('error', error)
this.destroy(error)
}
return
}

// Let Streams3 know we are done writing.
if (error) {
writeCb(error)
} else {
writeCb()
}
writeCb(error)
}
}

Expand Down

0 comments on commit 7e2ec35

Please sign in to comment.