diff --git a/lib/stream-to-async-iterator.js b/lib/stream-to-async-iterator.js index e450910..24d58b4 100644 --- a/lib/stream-to-async-iterator.js +++ b/lib/stream-to-async-iterator.js @@ -11,6 +11,15 @@ export const states = { errored: Symbol('errored'), }; +/* + * A contract for a promise that requires a clean up + * function be called after the promise finishes. + */ +type PromiseWithCleanUp = { + promise: Promise, + cleanup: () => void, +} + /** * @typedef {Object} StreamAsyncToIterator~Options * @property {number} [size] - the size of each read from the stream for each iteration @@ -106,9 +115,22 @@ export default class StreamAsyncToIterator { */ async next(): Promise { if (this._state === states.notReadable) { + const read = this._untilReadable(); + const end = this._untilEnd(); + //need to wait until the stream is readable or ended - await Promise.race([this._untilReadable(), this._untilEnd()]); - return this.next(); + try { + await Promise.race([read.promise, end.promise]); + return this.next(); + } + catch (e) { + throw e + } + finally { + //need to clean up any hanging event listeners + read.cleanup() + end.cleanup() + } } else if (this._state === states.ended) { return {done: true, value: null}; } else if (this._state === states.errored) { @@ -133,17 +155,34 @@ export default class StreamAsyncToIterator { * @private * @returns {Promise} */ - _untilReadable(): Promise { - return new Promise((resolve, reject) => { - const handleReadable = () => { + _untilReadable(): PromiseWithCleanUp { + //let is used here instead of const because the exact reference is + //required to remove it, this is why it is not a curried function that + //accepts resolve & reject as parameters. + let eventListener = null; + + const promise = new Promise((resolve, reject) => { + eventListener = () => { this._state = states.readable; this._rejections.delete(reject); + + // we set this to null to info the clean up not to do anything + eventListener = null; resolve(); }; - this._stream.once('readable', handleReadable); + //on is used here instead of once, because + //the listener is remove afterwards anyways. + this._stream.once('readable', eventListener); this._rejections.add(reject); }); + + const cleanup = () => { + if (eventListener == null) return; + this._stream.removeListener('readable', eventListener); + }; + + return { cleanup, promise } } /** @@ -151,16 +190,28 @@ export default class StreamAsyncToIterator { * @private * @returns {Promise} */ - _untilEnd(): Promise { - return new Promise((resolve, reject) => { - const handleEnd = () => { + _untilEnd(): PromiseWithCleanUp { + let eventListener = null; + + const promise = new Promise((resolve, reject) => { + eventListener = () => { this._state = states.ended; this._rejections.delete(reject); + + eventListener = null resolve(); }; - this._stream.once('end', handleEnd); + + this._stream.once('end', eventListener); this._rejections.add(reject); - }) + }); + + const cleanup = () => { + if (eventListener == null) return; + this._stream.removeListener('end', eventListener); + }; + + return { cleanup, promise } } }