Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 62 additions & 11 deletions lib/stream-to-async-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ export const states = {
errored: Symbol('errored'),
};

/*
* A contract for a promise that requires a clean up
* function be called after the promise finishes.
*/
type PromiseWithCleanUp<T> = {
promise: Promise<T>,
cleanup: () => void,
}

/**
* @typedef {Object} StreamAsyncToIterator~Options
* @property {number} [size] - the size of each read from the stream for each iteration
Expand Down Expand Up @@ -106,9 +115,22 @@ export default class StreamAsyncToIterator {
*/
async next(): Promise<Iteration> {
if (this._state === states.notReadable) {
const read = this._untilReadable();
const end = this._untilEnd();

//need to wait until the stream is readable or ended
await Promise.race([this._untilReadable(), this._untilEnd()]);
return this.next();
try {
await Promise.race([read.promise, end.promise]);
return this.next();
}
catch (e) {
throw e
}
finally {
//need to clean up any hanging event listeners
read.cleanup()
end.cleanup()
}
} else if (this._state === states.ended) {
return {done: true, value: null};
} else if (this._state === states.errored) {
Expand All @@ -133,34 +155,63 @@ export default class StreamAsyncToIterator {
* @private
* @returns {Promise}
*/
_untilReadable(): Promise<void> {
return new Promise((resolve, reject) => {
const handleReadable = () => {
_untilReadable(): PromiseWithCleanUp<void> {
//let is used here instead of const because the exact reference is
//required to remove it, this is why it is not a curried function that
//accepts resolve & reject as parameters.
let eventListener = null;

const promise = new Promise((resolve, reject) => {
eventListener = () => {
this._state = states.readable;
this._rejections.delete(reject);

// we set this to null to info the clean up not to do anything
eventListener = null;
resolve();
};

this._stream.once('readable', handleReadable);
//on is used here instead of once, because
//the listener is remove afterwards anyways.
this._stream.once('readable', eventListener);
this._rejections.add(reject);
});

const cleanup = () => {
if (eventListener == null) return;
this._stream.removeListener('readable', eventListener);
};

return { cleanup, promise }
}

/**
* Waits until the stream is ended. Rejects if the stream errored out.
* @private
* @returns {Promise}
*/
_untilEnd(): Promise<void> {
return new Promise((resolve, reject) => {
const handleEnd = () => {
_untilEnd(): PromiseWithCleanUp<void> {
let eventListener = null;

const promise = new Promise((resolve, reject) => {
eventListener = () => {
this._state = states.ended;
this._rejections.delete(reject);

eventListener = null
resolve();
};
this._stream.once('end', handleEnd);

this._stream.once('end', eventListener);
this._rejections.add(reject);
})
});

const cleanup = () => {
if (eventListener == null) return;
this._stream.removeListener('end', eventListener);
};

return { cleanup, promise }
}
}

Expand Down