Skip to content

Commit

Permalink
refactor(xrpc): factorize iterable to ReadableStream convertor
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Apr 23, 2024
1 parent 0a1bbcc commit f4ab44f
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions packages/xrpc/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,45 +217,9 @@ export function encodeMethodCallBody(
}

if (isIterable(data)) {
// Note that some environments support using Iterable / AsyncIterable as the
// body directly (Node's fetch), but not all (browsers). Let's make sure we
// always return a ReadableStream. ReadableStream.from is not available in
// all environments. Luckily, it is quite easy to implement ourselves.

// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/from_static
// return ReadableStream.from(data)

const getIterator = async function* () {
for await (const part of data) yield part
}

let it: AsyncGenerator<unknown, void, undefined>
return new ReadableStream<Uint8Array>({
type: 'bytes',
start() {
it = getIterator()
},
async pull(controller: ReadableStreamDefaultController) {
const { done, value } = await it.next()
try {
if (done) {
controller.close()
} else {
// @ts-expect-error Uint8Array will throw if value is not valid
const bytes = new Uint8Array(value)
if (bytes.byteLength) controller.enqueue(bytes)
}
} catch (err) {
// cancel() won't be called if an error is thrown here. We need to
// manually return the iterator to avoid a memory leak.
await it.return()
throw err
}
},
async cancel() {
await it.return()
},
})
// Note that some environments support using Iterable & AsyncIterable as the
// body (e.g. Node's fetch), but not all of them do (browsers).
return iterableToReadableStream(data)
}

if (headers['content-type'].startsWith('text/')) {
Expand Down Expand Up @@ -293,6 +257,50 @@ export function encodeMethodCallBody(
)
}

/**
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/from_static}
*/
function iterableToReadableStream(
iterable: Iterable<unknown> | AsyncIterable<unknown>,
): ReadableStream<Uint8Array> {
// Use the native ReadableStream.from() if available.
if ('from' in ReadableStream && typeof ReadableStream.from === 'function') {
return ReadableStream.from(iterable)
}

// Note, in environments where ReadableStream is not available either, we
// *could* load the iterable into memory and create an Arraybuffer from it.
// However, this would be a bad idea for large iterables. In order to keep
// things simple, we'll just allow the anonymous ReadableStream constructor
// to throw an error in those environments, hinting the user of the lib to find
// an alternate solution in that case (e.g. use a Blob if available).

let generator: AsyncGenerator<unknown, void, undefined>
return new ReadableStream<Uint8Array>({
type: 'bytes',
start() {
// Wrap the iterable in an async generator to handle both sync and async
// iterables, and make sure that the return() method exists.
generator = (async function* () {
yield* iterable
})()
},
async pull(controller: ReadableStreamDefaultController) {
const { done, value } = await generator.next()
if (done) {
controller.close()
} else {
// @ts-expect-error Uint8Array will be empty if value is not valid
const bytes = new Uint8Array(value)
if (bytes.byteLength) controller.enqueue(bytes)
}
},
async cancel() {
await generator.return()
},
})
}

export function httpResponseCodeToEnum(status: number): ResponseType {
let resCode: ResponseType
if (status in ResponseType) {
Expand Down

0 comments on commit f4ab44f

Please sign in to comment.