Skip to content

Commit

Permalink
Handle error events emitted before method
Browse files Browse the repository at this point in the history
  • Loading branch information
dex4er committed Feb 4, 2018
1 parent c001907 commit c927d0f
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 66 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

* New `setEncoding` and `destroy` methods.
* Support for `import PromiseDuplex from 'promise-duplex'` syntax.
* Bugfix when PromiseDuplex could ignore error event.
* Upgraded promise-readable@3.1.1 and promise-writable@3.1.0

## v2.0.4 2018-01-18

Expand Down
115 changes: 49 additions & 66 deletions lib/promise-duplex.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const PromiseReadable = require('promise-readable').PromiseReadable
const PromiseWritable = require('promise-writable').PromiseWritable
const PromiseReadable = require('promise-readable')
const PromiseWritable = require('promise-writable')

class PromiseDuplex extends PromiseReadable /* and PromiseWritable */ {
constructor (stream) {
Expand Down Expand Up @@ -43,102 +43,85 @@ class PromiseDuplex extends PromiseReadable /* and PromiseWritable */ {
const stream = this.stream

return new Promise((resolve, reject) => {
if (this._errored) {
return reject(this._errored)
} else if (this.readable._errored) {
return reject(this.readable._errored)
} else if (this.writable._errored) {
return reject(this.writable._errored)
} else if (stream.closed) {
if (this.readable._errored) {
const err = this.readable._errored
delete this.readable._errored
return reject(err)
}

if (this.writable._errored) {
const err = this.writable._errored
delete this.writable._errored
return reject(err)
}

if (stream.closed) {
if (event === 'close') {
return resolve()
} else {
return reject(new Error(`once ${event} after close`))
}
} else if (stream.destroyed) {
}

if (stream.destroyed) {
if (event === 'close' || event === 'end' || event === 'finish') {
return resolve()
} else {
return reject(new Error(`once ${event} after destroy`))
}
}

const onceEvent = event !== 'end' && event !== 'finish' && event !== 'error' ? argument => {
stream.removeListener('close', onceClose)
if (onceEnd) {
stream.removeListener('end', onceEnd)
}
stream.removeListener('error', onceError)
if (onceFinish) {
stream.removeListener('finish', onceFinish)
}
const eventHandler = event !== 'end' && event !== 'finish' && event !== 'error' ? argument => {
removeListeners()
resolve(argument)
} : undefined

const onceClose = () => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
}
if (onceEnd) {
stream.removeListener('end', onceEnd)
}
stream.removeListener('error', onceError)
if (onceFinish) {
stream.removeListener('finish', onceFinish)
}
const closeHandler = () => {
removeListeners()
resolve()
}

const onceEnd = event !== 'close' ? () => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
}
stream.removeListener('close', onceClose)
stream.removeListener('error', onceError)
if (onceFinish) {
stream.removeListener('finish', onceFinish)
}
const endHandler = event !== 'close' ? () => {
removeListeners()
resolve()
} : undefined

const onceFinish = event !== 'close' ? () => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
}
stream.removeListener('close', onceClose)
if (onceEnd) {
stream.removeListener('end', onceEnd)
}
stream.removeListener('error', onceError)
const errorHandler = (err) => {
delete this._errored
removeListeners()
reject(err)
}

const finishHandler = event !== 'close' ? () => {
removeListeners()
resolve()
} : undefined

const onceError = (err) => {
if (onceEvent) {
stream.removeListener(event, onceEvent)
const removeListeners = () => {
if (eventHandler) {
stream.removeListener(event, eventHandler)
}
stream.removeListener('close', onceClose)
if (onceEnd) {
stream.removeListener('end', onceEnd)
stream.removeListener('close', closeHandler)
if (endHandler) {
stream.removeListener('end', endHandler)
}
if (onceFinish) {
stream.removeListener('finish', onceFinish)
stream.removeListener('error', errorHandler)
if (finishHandler) {
stream.removeListener('finish', finishHandler)
}
this._errored = err
reject(err)
}

if (onceEvent) {
stream.once(event, onceEvent)
if (eventHandler) {
stream.on(event, eventHandler)
}
stream.once('close', onceClose)
if (onceEnd) {
stream.once('end', onceEnd)
stream.on('close', closeHandler)
if (endHandler) {
stream.on('end', endHandler)
}
if (onceFinish) {
stream.once('finish', onceFinish)
if (finishHandler) {
stream.on('finish', finishHandler)
}
stream.once('error', onceError)
stream.on('error', errorHandler)
})
}

Expand Down
26 changes: 26 additions & 0 deletions test/promise-duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,5 +378,31 @@ Feature('Test promise-duplex module', () => {
return promise.should.be.rejectedWith(Error, 'boom')
})
})

Scenario(`Wait for ${event} from stream with error emitted before method`, () => {
let promise
let promiseDuplex
let stream

Given('Duplex object', () => {
stream = new MockStream()
})

And('PromiseDuplex object', () => {
promiseDuplex = new PromiseDuplex(stream)
})

And('error event is emitted', () => {
stream.emit('error', new Error('boom'))
})

When(`I call ${event} method`, () => {
promise = promiseDuplex.once(event)
})

Then('promise is rejected', () => {
return promise.should.be.rejectedWith(Error, 'boom')
})
})
}
})

0 comments on commit c927d0f

Please sign in to comment.