From bc9499f0795bf081b0ce606cf54e7b87b20fdaa2 Mon Sep 17 00:00:00 2001 From: Nathan Rajlich Date: Thu, 14 Mar 2024 01:43:24 -0700 Subject: [PATCH] Support streaming responses in HTTP server --- .changeset/pink-singers-smoke.md | 5 ++++ packages/http/src/server.ts | 44 +++++++++++++++++++++++++++----- 2 files changed, 42 insertions(+), 7 deletions(-) create mode 100644 .changeset/pink-singers-smoke.md diff --git a/.changeset/pink-singers-smoke.md b/.changeset/pink-singers-smoke.md new file mode 100644 index 0000000..6623887 --- /dev/null +++ b/.changeset/pink-singers-smoke.md @@ -0,0 +1,5 @@ +--- +"@nx.js/http": patch +--- + +Support streaming responses in HTTP server diff --git a/packages/http/src/server.ts b/packages/http/src/server.ts index 579b958..ff6b795 100644 --- a/packages/http/src/server.ts +++ b/packages/http/src/server.ts @@ -1,8 +1,7 @@ import { bodyStream } from './body'; import { readHeaders, toHeaders } from './headers'; import { UnshiftableStream } from './unshiftable-readable-stream'; - -const encoder = new TextEncoder(); +import { encoder } from './util'; const STATUS_CODES: Record = { '100': 'Continue', @@ -96,11 +95,39 @@ export async function readRequest( }); } +export function createChunkedWriter(readable: ReadableStream) { + const reader = readable.getReader(); + return new ReadableStream({ + async pull(controller) { + const { done, value } = await reader.read(); + const bytes = value?.length ?? 0; + controller.enqueue(encoder.encode(`${bytes.toString(16)}\r\n`)); + if (value) controller.enqueue(value); + controller.enqueue(encoder.encode(`\r\n`)); + if (done) { + controller.close(); + } + }, + }); +} + export async function writeResponse( writable: WritableStream, res: Response, httpVersion: string, ): Promise { + const close = res.headers.get('connection') === 'close'; + const chunked = !(close || res.headers.has('content-length')); + if (!res.headers.has('date')) { + res.headers.set('date', new Date().toUTCString()); + } + if (!res.headers.has('content-type')) { + res.headers.set('content-type', 'text/plain'); + } + if (chunked) { + res.headers.set('transfer-encoding', 'chunked'); + } + let headersStr = ''; for (const [k, v] of res.headers) { headersStr += `${k}: ${v}\r\n`; @@ -110,12 +137,15 @@ export async function writeResponse( const header = `${httpVersion} ${res.status} ${statusText}\r\n${headersStr}\r\n`; const w = writable.getWriter(); w.write(encoder.encode(header)); + w.releaseLock(); - const close = res.headers.get('connection') === 'close'; - if (res.body) { - w.releaseLock(); - await res.body.pipeTo(writable, { preventClose: !close }); + let body = res.body; + if (body) { + if (chunked) { + body = createChunkedWriter(body); + } + await body.pipeTo(writable, { preventClose: !close }); } else if (close) { - await w.close(); + await writable.close(); } }