Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): add webToNodeHandler #252

Merged
merged 6 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/utils-node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@hiogawa/utils-node",
"version": "0.0.1-pre.14",
"version": "0.0.1-pre.16",
"homepage": "https://github.com/hi-ogawa/js-utils/tree/main/packages/utils-node",
"repository": {
"type": "git",
Expand Down
125 changes: 125 additions & 0 deletions packages/utils-node/src/http.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { createServer } from "node:http";
import { createManualPromise, tinyassert } from "@hiogawa/utils";
import { describe, expect, test, vi } from "vitest";
import { type WebHandler, webToNodeHandler } from "./http";

describe(webToNodeHandler, () => {
test("basic", async () => {
const abortFn = vi.fn();
await using server = await testWebHandler((request) => {
request.signal.addEventListener("abort", () => {
abortFn("abort");
});
const body = "hello = " + new URL(request.url).pathname;
return new Response(body, {
headers: { "content-type": "text/x-hello" },
});
});
const res = await fetch(server.url + "/abc");

expect(server.nextFn.mock.calls).toMatchInlineSnapshot(`[]`);
expect(abortFn.mock.calls).toMatchInlineSnapshot(`[]`);
expect(res.headers.get("content-type")).toMatchInlineSnapshot(
`"text/x-hello"`
);
expect(await res.text()).toMatchInlineSnapshot(`"hello = /abc"`);
});

test("stream", async () => {
// https://github.com/hi-ogawa/reproductions/pull/9

const trackFn = vi.fn();
const abortPromise = createManualPromise<void>();
const cancelPromise = createManualPromise<void>();
async function handler(req: Request) {
let aborted = false;
req.signal.addEventListener("abort", () => {
trackFn("abort");
abortPromise.resolve();
aborted = true;
});

let cancelled = false;
const stream = new ReadableStream<string>({
async start(controller) {
for (let i = 0; !aborted && !cancelled; i++) {
controller.enqueue(`i = ${i}\n`);
await new Promise((resolve) => setTimeout(resolve, 200));
}
controller.close();
},
cancel() {
trackFn("cancel");
cancelPromise.resolve();
cancelled = true;
},
});

return new Response(stream.pipeThrough(new TextEncoderStream()));
}

await using server = await testWebHandler(handler);
const abortController = new AbortController();
const res = await fetch(server.url, { signal: abortController.signal });
tinyassert(res.ok);
tinyassert(res.body);
const chunks: string[] = [];
const promise = res.body.pipeThrough(new TextDecoderStream()).pipeTo(
new WritableStream({
write(chunk) {
chunks.push(chunk);
if (chunk.includes("i = 2")) {
abortController.abort();
}
},
})
);
await expect(promise).rejects.toMatchInlineSnapshot(
`[AbortError: This operation was aborted]`
);
expect(chunks).toMatchInlineSnapshot(`
[
"i = 0
",
"i = 1
",
"i = 2
",
]
`);
await Promise.all([abortPromise, cancelPromise]);
expect(trackFn.mock.calls).toMatchInlineSnapshot(`
[
[
"abort",
],
[
"cancel",
],
]
`);
expect(server.nextFn.mock.calls).toMatchInlineSnapshot(`[]`);
});
});

async function testWebHandler(handler: WebHandler) {
// node server
const nextFn = vi.fn();
const nodeHandler = webToNodeHandler(handler);
const server = createServer((req, res) => nodeHandler(req, res, nextFn));

// listen
await new Promise<void>((resolve) => server.listen(() => resolve()));

// get address
const address = server.address();
tinyassert(address);
tinyassert(typeof address !== "string");
const url = `http://localhost:${address.port}`;

return {
url,
nextFn,
[Symbol.asyncDispose]: () => server[Symbol.asyncDispose](),
};
}
97 changes: 97 additions & 0 deletions packages/utils-node/src/http.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import type http from "node:http";
import { Readable } from "node:stream";

// https://github.com/hattipjs/hattip/blob/69237d181300b200a14114df2c3c115c44e0f3eb/packages/adapter/adapter-node/src/common.ts
// https://github.com/remix-run/remix/blob/6eb6acf3f065f4574fbc96e4e8879482ce77b560/packages/remix-express/server.ts
// https://github.com/honojs/node-server/blob/becc420b5d4ea51977a8e5b12e945ceb09c2ee9a/src/listener.ts
// https://github.com/Shopify/hydrogen/blob/adc840f584129968d1710d494d19165e6664a60d/packages/mini-oxygen/src/vite/utils.ts

export type WebHandler = (
request: Request
) => Response | undefined | Promise<Response | undefined>;

export type NodeHandler = (
req: http.IncomingMessage,
res: http.ServerResponse,
next: (error?: unknown) => void
) => void;

export function webToNodeHandler(handler: WebHandler): NodeHandler {
return async (req, res, next) => {
try {
const request = createRequest(req, res);
const response = await handler(request);
if (response) {
sendResponse(response, res);
} else {
next();
}
} catch (e) {
next(e);
}
};
}

function createRequest(
req: http.IncomingMessage,
res: http.ServerResponse
): Request {
// cf. hono
const abortController = new AbortController();
res.once("close", () => {
if (req.destroyed) {
abortController.abort();
}
});

// cf. hattip, remix
const headers = new Headers();
for (const [k, v] of Object.entries(req.headers)) {
// skip http2 pseudo header since it breaks undici
if (k.startsWith(":")) {
continue;
}
if (typeof v === "string") {
headers.set(k, v);
} else if (Array.isArray(v)) {
v.forEach((v) => headers.append(k, v));
}
}

return new Request(
new URL(req.url || "/", `http://${req.headers.host || "unknown.local"}`),
{
method: req.method,
body:
req.method === "GET" || req.method === "HEAD"
? null
: (Readable.toWeb(req) as any),
headers,
signal: abortController.signal,
// @ts-ignore required for undici ReadableStream body
duplex: "half",
}
);
}

function sendResponse(response: Response, res: http.ServerResponse) {
// cf. hydrogen
const headers = Object.fromEntries(response.headers);
if (headers["set-cookie"]) {
delete headers["set-cookie"];
res.setHeader("set-cookie", response.headers.getSetCookie());
}
res.writeHead(response.status, response.statusText, headers);

const body = response.body;
if (body) {
const abortController = new AbortController();
res.once("close", () => abortController.abort());
const nodeBody = Readable.fromWeb(body as any, {
signal: abortController.signal,
});
nodeBody.pipe(res);
} else {
res.end();
}
}
1 change: 1 addition & 0 deletions packages/utils-node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./script";
export * from "./prompt";
export * from "./parse-args";
export * from "./http";
8 changes: 8 additions & 0 deletions packages/utils-node/vitest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { defineConfig } from "vitest/config";

export default defineConfig({
esbuild: {
// transpile `using`
target: "es2022",
},
});