Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions packages/it-ndjson-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,7 @@ export function ndjsonStream <T = any, Stream extends Duplex<any, any, any> = Du
read: async (options?: AbortOptions) => {
const result = await raceSignal(input.next(), options?.signal)

if (result.done === true) {
throw new UnexpectedEOFError('unexpected end of input')
}

if (result.value == null) {
if (result.done === true || result.value == null) {
throw new UnexpectedEOFError('unexpected end of input')
}

Expand Down
44 changes: 44 additions & 0 deletions packages/it-ndjson-stream/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,48 @@ describe('it-ndjson-stream', () => {
await expect(messages.read()).to.eventually.be.rejected
.with.property('name', 'InvalidMessageLengthError')
})

it('throws EOF on empty stream', async () => {
messages = ndjsonStream({
source: [],
sink: async () => {}
})

// read using stream with message length limit
await expect(messages.read()).to.eventually.be.rejected
.with.property('name', 'UnexpectedEOFError')
})

it('throws EOF on short read when generator returns', async () => {
const source = (async function * () {})()
await source.return()

messages = ndjsonStream({
source,
sink: async () => {}
})

// read using stream with message length limit
await expect(messages.read()).to.eventually.be.rejected
.with.property('name', 'UnexpectedEOFError')
})

it('ends output when sink throws', async () => {
messages = ndjsonStream({
source: [],
sink: async () => {
throw new Error('Oh noes!')
}
})

// wait for stream to end
await new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 100)
})

await expect(messages.write(obj)).to.eventually.be.rejected
.with.property('message', 'Oh noes!')
})
})
4 changes: 3 additions & 1 deletion packages/it-queueless-pushable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class QueuelessPushable <T> implements Pushable<T> {
private haveNext: DeferredPromise<void>
private ended: boolean
private nextResult: IteratorResult<T> | undefined
private error?: Error

constructor () {
this.ended = false
Expand Down Expand Up @@ -86,6 +87,7 @@ class QueuelessPushable <T> implements Pushable<T> {

async throw (err?: Error): Promise<IteratorReturnResult<undefined>> {
this.ended = true
this.error = err

if (err != null) {
// this can cause unhandled promise rejections if nothing is awaiting the
Expand Down Expand Up @@ -132,7 +134,7 @@ class QueuelessPushable <T> implements Pushable<T> {

private async _push (value?: T, options?: AbortOptions & RaceSignalOptions): Promise<void> {
if (value != null && this.ended) {
throw new Error('Cannot push value onto an ended pushable')
throw this.error ?? new Error('Cannot push value onto an ended pushable')
}

// wait for all values to be read
Expand Down
8 changes: 8 additions & 0 deletions packages/it-queueless-pushable/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ describe('it-queueless-pushable', () => {
await expect(pushable.push('nope!')).to.eventually.be.rejectedWith('Cannot push value onto an ended pushable')
})

it('should re-throw error when attempting to write to errored pushable', async () => {
const err = new Error('Urk!')
const pushable = queuelessPushable<string>()
void pushable.throw(err)

await expect(pushable.push('derp')).to.eventually.be.rejectedWith(err)
})

it('should push in order even it promises are unawaited', async () => {
const pushable = queuelessPushable<string>()

Expand Down