Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/it-byte-stream/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
"abort-error": "^1.0.1",
"it-queueless-pushable": "^1.0.0",
"it-stream-types": "^2.0.2",
"race-signal": "^1.1.3",
"uint8arraylist": "^2.4.8"
},
"devDependencies": {
Expand Down
76 changes: 33 additions & 43 deletions packages/it-byte-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,30 @@
* ```
*/

import { AbortError } from 'abort-error'
import { queuelessPushable } from 'it-queueless-pushable'
import { raceSignal } from 'race-signal'
import { Uint8ArrayList } from 'uint8arraylist'
import { UnexpectedEOFError } from './errors.js'
import type { AbortOptions } from 'abort-error'
import type { Duplex } from 'it-stream-types'

export interface ReadOptions extends AbortOptions {
bytes: number
}

export interface ByteStream <Stream = unknown> {
/**
* Read a set number of bytes from the stream
* Read bytes from the stream.
*
* If a required number of bytes is passed as an option, this will wait for
* the underlying stream to supply that number of bytes, throwing an
* `UnexpectedEOFError` if the stream closes before this happens.
*
* If no required number of bytes is passed, this will return `null` if the
* underlying stream closes before supplying any bytes.
*/
read(bytes?: number, options?: AbortOptions): Promise<Uint8ArrayList>
read(options: ReadOptions): Promise<Uint8ArrayList>
read(options?: AbortOptions): Promise<Uint8ArrayList | null>

/**
* Write the passed bytes to the stream
Expand Down Expand Up @@ -71,7 +83,7 @@ export function byteStream <Stream extends Duplex<any, any, any>> (duplex: Strea
await write.end()
}

let source = duplex.source
let source: AsyncGenerator<any> = duplex.source

if (duplex.source[Symbol.iterator] != null) {
source = duplex.source[Symbol.iterator]()
Expand All @@ -82,56 +94,34 @@ export function byteStream <Stream extends Duplex<any, any, any>> (duplex: Strea
const readBuffer = new Uint8ArrayList()

const W: ByteStream<Stream> = {
read: async (bytes?: number, options?: AbortOptions) => {
read: async (options?: ReadOptions) => {
options?.signal?.throwIfAborted()

let listener: EventListener | undefined
if (options?.bytes == null) {
// just read whatever arrives
const { done, value } = await raceSignal(source.next(), options?.signal)

const abortPromise = new Promise((resolve, reject) => {
listener = () => {
reject(new AbortError('Read aborted'))
if (done === true) {
return null
}

options?.signal?.addEventListener('abort', listener)
})

try {
if (bytes == null) {
// just read whatever arrives
const { done, value } = await Promise.race([
source.next(),
abortPromise
])
return value
}

if (done === true) {
return new Uint8ArrayList()
}
while (readBuffer.byteLength < options.bytes) {
const { value, done } = await raceSignal(source.next(), options?.signal)

return value
if (done === true) {
throw new UnexpectedEOFError('unexpected end of input')
}

while (readBuffer.byteLength < bytes) {
const { value, done } = await Promise.race([
source.next(),
abortPromise
])

if (done === true) {
throw new UnexpectedEOFError('unexpected end of input')
}

readBuffer.append(value)
}
readBuffer.append(value)
}

const buf = readBuffer.sublist(0, bytes)
readBuffer.consume(bytes)
const buf = readBuffer.sublist(0, options.bytes)
readBuffer.consume(options.bytes)

return buf
} finally {
if (listener != null) {
options?.signal?.removeEventListener('abort', listener)
}
}
return buf
},
write: async (data, options?: AbortOptions) => {
options?.signal?.throwIfAborted()
Expand Down
31 changes: 28 additions & 3 deletions packages/it-byte-stream/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ const tests: Record<string, Test<any>> = {
}
}

describe('it-byte-stream', () => {
it('returns null if underlying stream is empty', async () => {
const b = byteStream({
source: [],
sink: async () => {}
})

const res = await b.read()

expect(res).to.be.null()
})

it('throws EOF if underlying stream is empty and bytes are specified', async () => {
const b = byteStream({
source: [],
sink: async () => {}
})

await expect(b.read({
bytes: 10
})).to.eventually.be.rejected
.with.property('name', 'UnexpectedEOFError')
})
})

Object.keys(tests).forEach(key => {
const test = tests[key]

Expand All @@ -70,7 +95,7 @@ Object.keys(tests).forEach(key => {
const data = test.from('ww')

void b.write(data)
const res = await b.read(2)
const res = await b.read({ bytes: 2 })

expect(res.subarray()).to.equalBytes(data.subarray())
})
Expand All @@ -81,8 +106,8 @@ Object.keys(tests).forEach(key => {
const r = test.from('w')
void b.write(data)

const r1 = await b.read(1)
const r2 = await b.read(1)
const r1 = await b.read({ bytes: 1 })
const r2 = await b.read({ bytes: 1 })

expect(r.subarray()).to.equalBytes(r1.subarray())
expect(r.subarray()).to.equalBytes(r2.subarray())
Expand Down
10 changes: 8 additions & 2 deletions packages/it-length-prefixed-stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ export function lpStream <Stream extends Duplex<any, any, any>> (duplex: Stream,

while (true) {
// read one byte at a time until we can decode a varint
lengthBuffer.append(await bytes.read(1, options))
lengthBuffer.append(await bytes.read({
...options,
bytes: 1
}))

try {
dataLength = decodeLength(lengthBuffer)
Expand Down Expand Up @@ -110,7 +113,10 @@ export function lpStream <Stream extends Duplex<any, any, any>> (duplex: Stream,
throw new InvalidDataLengthError('message length too long')
}

return bytes.read(dataLength, options)
return bytes.read({
...options,
bytes: dataLength
})
},
write: async (data, options?: AbortOptions) => {
// encode, write
Expand Down
4 changes: 2 additions & 2 deletions packages/it-length-prefixed-stream/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Object.keys(tests).forEach(key => {
await expect(lp.read({
signal: AbortSignal.timeout(10)
})).to.eventually.be.rejected()
.with.property('message', 'Read aborted')
.with.property('name', 'AbortError')
})

it('waits for read when writing', async () => {
Expand Down Expand Up @@ -116,7 +116,7 @@ Object.keys(tests).forEach(key => {
const length = test.allocUnsafe(4)
test.writeInt32BE(length, data.length, 0)
const expected = test.concat([length, data])
expect(res.subarray()).to.equalBytes(expected.subarray())
expect(res?.subarray()).to.equalBytes(expected.subarray())
})

it('lp fixed decode', async () => {
Expand Down