Skip to content

Commit

Permalink
Refactor HTTP server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
TooTallNate committed Mar 14, 2024
1 parent 7b6f8b5 commit f887e5a
Show file tree
Hide file tree
Showing 11 changed files with 579 additions and 169 deletions.
8 changes: 8 additions & 0 deletions .changeset/nervous-tigers-dress.md
@@ -0,0 +1,8 @@
---
"@nx.js/http": patch
---

Refactor HTTP server implementation
- Support Keep-Alive by default
- Support `Content-Length` and `Transfer-Encoding: chunked` bodies
- Add tests using `vitest`
7 changes: 5 additions & 2 deletions packages/http/package.json
Expand Up @@ -4,7 +4,8 @@
"description": "HTTP server for nx.js",
"main": "dist/index.js",
"scripts": {
"build": "tsc"
"build": "tsc",
"test": "vitest"
},
"files": [
"dist"
Expand All @@ -14,7 +15,9 @@
},
"devDependencies": {
"nxjs-runtime": "workspace:*",
"typescript": "^5.3.2"
"typescript": "^5.3.2",
"vite": "^5.1.6",
"vitest": "^1.3.1"
},
"keywords": [
"nx.js",
Expand Down
109 changes: 109 additions & 0 deletions packages/http/src/body.ts
@@ -0,0 +1,109 @@
import { flushBytes, readUntilEol } from './util';
import { UnshiftableStream } from './unshiftable-readable-stream';

/**
 * Produces a body stream that yields exactly `contentLengthVal` bytes from
 * `unshiftable`. If the final chunk read from the source extends past the
 * declared length, the excess bytes are unshifted back onto the source so
 * that a subsequent request on the same connection (Keep-Alive) can read them.
 *
 * NOTE(review): assumes `contentLengthVal` parses to a valid non-negative
 * integer; a NaN result makes `bytesRead < contentLength` false, so the
 * stream closes immediately with an empty body — confirm callers validate.
 */
function bodyWithContentLength(
	unshiftable: UnshiftableStream,
	contentLengthVal: string,
): ReadableStream<Uint8Array> {
	// Number of body bytes emitted to the consumer so far.
	let bytesRead = 0;
	const reader = unshiftable.readable.getReader();
	const contentLength = parseInt(contentLengthVal, 10);
	//console.log(`content-length: ${contentLength}`);
	return new ReadableStream<Uint8Array>({
		async pull(controller) {
			if (bytesRead < contentLength) {
				// Only let the underlying source flow while actively awaiting a
				// read — presumably pause() stops consuming the socket between
				// pulls; confirm against UnshiftableStream's implementation.
				unshiftable.resume();
				const { done, value } = await reader.read();
				unshiftable.pause();
				if (done) {
					// Source ended before `contentLength` bytes arrived.
					// NOTE(review): a truncated body closes cleanly here rather
					// than erroring — confirm this is intentional.
					reader.releaseLock();
					controller.close();
					return;
				}
				const remainingLength = contentLength - bytesRead;
				const chunkLength = value.length;
				if (chunkLength <= remainingLength) {
					controller.enqueue(value);
					bytesRead += chunkLength;
				} else {
					// If the chunk is larger than needed, slice it to fit and unshift the rest back
					const neededPart = value.slice(0, remainingLength);
					const excessPart = value.slice(remainingLength);
					controller.enqueue(neededPart);
					unshiftable.unshift(excessPart);
					bytesRead += neededPart.length;
					reader.releaseLock();
					controller.close(); // Close the stream as we have read the required content length
				}
			} else {
				reader.releaseLock();
				controller.close(); // Close the stream if bytesRead is already equal to contentLength
			}
		},
		cancel() {
			reader.cancel();
		},
	});
}

/**
 * Produces a body stream that decodes `Transfer-Encoding: chunked` framing
 * from `unshiftable`: each pull reads a hexadecimal chunk-size line, emits
 * that many bytes of chunk data, and consumes the trailing CRLF; a size of
 * 0 marks the final chunk and closes the stream.
 *
 * NOTE(review): trailer fields after the terminal 0-size chunk are not
 * consumed here — confirm whether the caller handles (or forbids) trailers.
 */
function bodyWithChunkedEncoding(
	unshiftable: UnshiftableStream,
): ReadableStream<Uint8Array> {
	const reader = unshiftable.readable.getReader();
	return new ReadableStream<Uint8Array>({
		async pull(controller) {
			// Chunk header: the data size in hex, terminated by CRLF.
			const numBytesHex = await readUntilEol(reader, unshiftable);
			const numBytes = parseInt(numBytesHex, 16);
			if (Number.isNaN(numBytes)) {
				return controller.error(
					new Error(`Invalid chunk size: ${numBytesHex}`),
				);
			}
			if (numBytes > 0) {
				// Forward exactly `numBytes` of chunk data to the controller.
				await flushBytes(controller, numBytes, reader, unshiftable);
			}
			// The chunk data must be followed by a bare CRLF (empty line).
			const empty = await readUntilEol(reader, unshiftable);
			if (empty) {
				return controller.error(
					new Error(`Expected \\r\\n after data chunk, received: ${empty}`),
				);
			}
			if (numBytes === 0) {
				// This is the final chunk
				reader.releaseLock();
				controller.close();
			}
		},

		cancel(reason) {
			// Propagate cancellation; omit the argument when no reason was
			// given so `reader.cancel()` keeps its default behavior.
			if (reason) {
				reader.cancel(reason);
			} else {
				reader.cancel();
			}
		},
	});
}

/**
 * Creates a `ReadableStream` for a request body based on its framing
 * headers. Dispatch order: an explicit `Content-Length` wins, then
 * `Transfer-Encoding: chunked`, otherwise identity framing (the body is
 * everything remaining on the stream).
 */
export function bodyStream(
	unshiftable: UnshiftableStream,
	headers: Headers,
): ReadableStream<Uint8Array> {
	const contentLength = headers.get('content-length');
	if (contentLength !== null) {
		return bodyWithContentLength(unshiftable, contentLength);
	}

	const isChunked = headers
		.get('transfer-encoding')
		?.split(/\s*,\s*/)
		.includes('chunked');
	if (isChunked) {
		return bodyWithChunkedEncoding(unshiftable);
	}

	// Identity transfer encoding - read until the end of the stream for the body
	const identity = new TransformStream<Uint8Array, Uint8Array>();
	unshiftable.readable.pipeTo(identity.writable).finally(() => {
		unshiftable.pause();
	});
	unshiftable.resume();
	return identity.readable;
}
12 changes: 12 additions & 0 deletions packages/http/src/deferred.ts
@@ -0,0 +1,12 @@
/**
 * A promise whose settle functions are exposed, so that it can be
 * resolved or rejected from outside the executor callback.
 */
export class Deferred<T> {
	promise: Promise<T>;
	resolve!: (v: T) => void;
	reject!: (v: any) => void;

	constructor() {
		this.promise = new Promise<T>((resolve, reject) => {
			this.resolve = resolve;
			this.reject = reject;
		});
	}
}
48 changes: 48 additions & 0 deletions packages/http/src/headers.ts
@@ -0,0 +1,48 @@
import { concat, indexOfEol, decoder } from './util';
import { UnshiftableStream } from './unshiftable-readable-stream';

/**
 * Reads CRLF-terminated header lines (request line + header fields) from
 * `unshiftable` until the empty line that terminates the header block.
 * Bytes after the empty line (the start of the body) are unshifted back
 * onto the source. Returns the raw header lines, without line terminators.
 *
 * NOTE(review): if the source ends mid-line, the partial `leftover` bytes
 * are silently dropped — confirm callers treat a truncated header block
 * as an error.
 */
export async function readHeaders(
	unshiftable: UnshiftableStream,
): Promise<string[]> {
	// Bytes from the previous chunk that did not yet contain a full line.
	let leftover: Uint8Array | null = null;
	const reader = unshiftable.readable.getReader();
	const lines: string[] = [];
	while (true) {
		// Only let the source flow while actively awaiting a read.
		unshiftable.resume();
		const next = await reader.read();
		unshiftable.pause();
		if (next.done) {
			// Fix: release the lock on EOF, matching the normal exit path,
			// so `unshiftable.readable` is not left locked forever.
			reader.releaseLock();
			return lines;
		}
		const chunk: Uint8Array = leftover
			? concat(leftover, next.value)
			: next.value;
		let pos = 0;
		while (true) {
			const eol = indexOfEol(chunk, pos);
			if (eol === -1) {
				// No complete line in the remainder; save it for the next read.
				leftover = chunk.slice(pos);
				break;
			}
			const line = decoder.decode(chunk.slice(pos, eol));
			pos = eol + 2; // skip past the "\r\n"
			if (line) {
				lines.push(line);
			} else {
				// end of headers
				unshiftable.unshift(chunk.slice(pos));
				reader.releaseLock();
				return lines;
			}
		}
	}
}

/**
 * Converts raw `"Name: value"` header lines into a `Headers` instance.
 * The value has surrounding whitespace trimmed. Malformed lines (no `:`,
 * or `:` as the first character, i.e. an empty name) are skipped.
 */
export function toHeaders(input: string[]) {
	const headers = new Headers();
	for (const line of input) {
		const col = line.indexOf(':');
		// Fix: previously a line without ':' produced a nonsense entry
		// (`slice(0, -1)` chopped the last character of the line for the
		// name, with the whole line as the value), and a ':' at index 0
		// made `Headers.set` throw on the empty name. Skip such lines.
		if (col < 1) continue;
		const name = line.slice(0, col);
		const value = line.slice(col + 1).trim();
		headers.set(name, value);
	}
	return headers;
}

0 comments on commit f887e5a

Please sign in to comment.