diff --git a/packages/it-ndjson-stream/src/index.ts b/packages/it-ndjson-stream/src/index.ts index cc26479e..0f2875ab 100644 --- a/packages/it-ndjson-stream/src/index.ts +++ b/packages/it-ndjson-stream/src/index.ts @@ -72,11 +72,7 @@ export function ndjsonStream = Du read: async (options?: AbortOptions) => { const result = await raceSignal(input.next(), options?.signal) - if (result.done === true) { - throw new UnexpectedEOFError('unexpected end of input') - } - - if (result.value == null) { + if (result.done === true || result.value == null) { throw new UnexpectedEOFError('unexpected end of input') } diff --git a/packages/it-ndjson-stream/test/index.spec.ts b/packages/it-ndjson-stream/test/index.spec.ts index c50d1c1b..1e12ee21 100644 --- a/packages/it-ndjson-stream/test/index.spec.ts +++ b/packages/it-ndjson-stream/test/index.spec.ts @@ -59,4 +59,48 @@ describe('it-ndjson-stream', () => { await expect(messages.read()).to.eventually.be.rejected .with.property('name', 'InvalidMessageLengthError') }) + + it('throws EOF on empty stream', async () => { + messages = ndjsonStream({ + source: [], + sink: async () => {} + }) + + // read using stream with message length limit + await expect(messages.read()).to.eventually.be.rejected + .with.property('name', 'UnexpectedEOFError') + }) + + it('throws EOF on short read when generator returns', async () => { + const source = (async function * () {})() + await source.return() + + messages = ndjsonStream({ + source, + sink: async () => {} + }) + + // read using stream with message length limit + await expect(messages.read()).to.eventually.be.rejected + .with.property('name', 'UnexpectedEOFError') + }) + + it('ends output when sink throws', async () => { + messages = ndjsonStream({ + source: [], + sink: async () => { + throw new Error('Oh noes!') + } + }) + + // wait for stream to end + await new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 100) + }) + + await expect(messages.write(obj)).to.eventually.be.rejected + .with.property('message', 'Oh noes!') + }) }) diff --git a/packages/it-queueless-pushable/src/index.ts b/packages/it-queueless-pushable/src/index.ts index 42950f1e..363076a7 100644 --- a/packages/it-queueless-pushable/src/index.ts +++ b/packages/it-queueless-pushable/src/index.ts @@ -52,6 +52,7 @@ class QueuelessPushable implements Pushable { private haveNext: DeferredPromise private ended: boolean private nextResult: IteratorResult | undefined + private error?: Error constructor () { this.ended = false @@ -86,6 +87,7 @@ class QueuelessPushable implements Pushable { async throw (err?: Error): Promise> { this.ended = true + this.error = err if (err != null) { // this can cause unhandled promise rejections if nothing is awaiting the @@ -132,7 +134,7 @@ class QueuelessPushable implements Pushable { private async _push (value?: T, options?: AbortOptions & RaceSignalOptions): Promise { if (value != null && this.ended) { - throw new Error('Cannot push value onto an ended pushable') + throw this.error ?? new Error('Cannot push value onto an ended pushable') } // wait for all values to be read diff --git a/packages/it-queueless-pushable/test/index.spec.ts b/packages/it-queueless-pushable/test/index.spec.ts index 7489af1a..4ff7005f 100644 --- a/packages/it-queueless-pushable/test/index.spec.ts +++ b/packages/it-queueless-pushable/test/index.spec.ts @@ -121,6 +121,14 @@ describe('it-queueless-pushable', () => { await expect(pushable.push('nope!')).to.eventually.be.rejectedWith('Cannot push value onto an ended pushable') }) + it('should re-throw error when attempting to write to errored pushable', async () => { + const err = new Error('Urk!') + const pushable = queuelessPushable() + void pushable.throw(err) + + await expect(pushable.push('derp')).to.eventually.be.rejectedWith(err) + }) + it('should push in order even it promises are unawaited', async () => { const pushable = queuelessPushable()