Skip to content

Commit

Permalink
Add more tests for Node.js universal handlers (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
timostamm committed May 30, 2023
1 parent a57540c commit f83fe73
Showing 1 changed file with 124 additions and 48 deletions.
172 changes: 124 additions & 48 deletions packages/connect-node/src/node-universal-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ import * as http2 from "http2";
import * as http from "http";
import { universalRequestFromNodeRequest } from "./node-universal-handler.js";
import { ConnectError } from "@bufbuild/connect";
import { readAllBytes } from "@bufbuild/connect/protocol";
import { getNodeErrorProps } from "./node-error.js";
import {
assertByteStreamRequest,
readAllBytes,
type UniversalServerRequest,
} from "@bufbuild/connect/protocol";

// Polyfill the Headers API for Node versions < 18
import "./node-headers-polyfill.js";

describe("universalRequestFromNodeRequest()", function () {
describe("with HTTP/2 stream closed with an RST code", function () {
let universalRequestSignal: AbortSignal | undefined;
let serverRequest: UniversalServerRequest | undefined;
const server = useNodeServer(() => {
universalRequestSignal = undefined;
serverRequest = undefined;
return http2.createServer(function (request) {
universalRequestSignal = universalRequestFromNodeRequest(
request,
undefined
).signal;
serverRequest = universalRequestFromNodeRequest(request, undefined);
});
});
async function request(rstCode: number) {
Expand Down Expand Up @@ -63,50 +65,91 @@ describe("universalRequestFromNodeRequest()", function () {
}
it("should abort request signal with ConnectError and Code.Canceled for NO_ERROR", async function () {
await request(http2.constants.NGHTTP2_NO_ERROR);
expect(universalRequestSignal).toBeInstanceOf(AbortSignal);
expect(universalRequestSignal?.aborted).toBeTrue();
expect(universalRequestSignal?.reason).toBeInstanceOf(Error);
if (universalRequestSignal?.reason instanceof Error) {
expect(universalRequestSignal.reason.name).toBe("AbortError");
expect(universalRequestSignal.reason.message).toBe(
expect(serverRequest?.signal).toBeInstanceOf(AbortSignal);
expect(serverRequest?.signal.aborted).toBeTrue();
expect(serverRequest?.signal.reason).toBeInstanceOf(Error);
if (serverRequest?.signal.reason instanceof Error) {
expect(serverRequest.signal.reason.name).toBe("AbortError");
expect(serverRequest.signal.reason.message).toBe(
"This operation was aborted"
);
}
});
it("should silently end request body stream for NO_ERROR", async function () {
await request(http2.constants.NGHTTP2_NO_ERROR);
expect(serverRequest).toBeDefined();
if (serverRequest !== undefined) {
assertByteStreamRequest(serverRequest);
const it = serverRequest.body[Symbol.asyncIterator]();
const r = await it.next();
expect(r.done).toBeTrue();
}
});
it("should abort request signal with ConnectError and Code.Canceled for CANCEL", async function () {
await request(http2.constants.NGHTTP2_CANCEL);
expect(universalRequestSignal).toBeInstanceOf(AbortSignal);
expect(universalRequestSignal?.aborted).toBeTrue();
expect(universalRequestSignal?.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(universalRequestSignal?.reason);
expect(serverRequest?.signal).toBeInstanceOf(AbortSignal);
expect(serverRequest?.signal.aborted).toBeTrue();
expect(serverRequest?.signal.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(serverRequest?.signal.reason);
expect(ce.message).toBe(
"[canceled] http/2 stream closed with RST code CANCEL (0x8)"
);
});
it("should silently end request body stream for CANCEL", async function () {
await request(http2.constants.NGHTTP2_CANCEL);
expect(serverRequest).toBeDefined();
if (serverRequest !== undefined) {
assertByteStreamRequest(serverRequest);
const it = serverRequest.body[Symbol.asyncIterator]();
const r = await it.next();
expect(r.done).toBeTrue();
}
});
it("should abort request signal with ConnectError and Code.ResourceExhausted for ENHANCE_YOUR_CALM", async function () {
await request(http2.constants.NGHTTP2_ENHANCE_YOUR_CALM);
expect(universalRequestSignal).toBeInstanceOf(AbortSignal);
expect(universalRequestSignal?.aborted).toBeTrue();
expect(universalRequestSignal?.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(universalRequestSignal?.reason);
expect(serverRequest?.signal).toBeInstanceOf(AbortSignal);
expect(serverRequest?.signal.aborted).toBeTrue();
expect(serverRequest?.signal.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(serverRequest?.signal.reason);
expect(ce.message).toBe(
"[resource_exhausted] http/2 stream closed with RST code ENHANCE_YOUR_CALM (0xb)"
);
});
it("should silently end request body stream for ENHANCE_YOUR_CALM", async function () {
await request(http2.constants.NGHTTP2_ENHANCE_YOUR_CALM);
expect(serverRequest).toBeDefined();
if (serverRequest !== undefined) {
assertByteStreamRequest(serverRequest);
const it = serverRequest.body[Symbol.asyncIterator]();
const r = await it.next();
expect(r.done).toBeTrue();
}
});
it("should abort request signal with ConnectError and Code.Internal for FRAME_SIZE_ERROR", async function () {
await request(http2.constants.NGHTTP2_FRAME_SIZE_ERROR);
expect(universalRequestSignal).toBeInstanceOf(AbortSignal);
expect(universalRequestSignal?.aborted).toBeTrue();
expect(universalRequestSignal?.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(universalRequestSignal?.reason);
expect(serverRequest?.signal).toBeInstanceOf(AbortSignal);
expect(serverRequest?.signal.aborted).toBeTrue();
expect(serverRequest?.signal.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(serverRequest?.signal.reason);
expect(ce.message).toBe(
"[internal] http/2 stream closed with RST code FRAME_SIZE_ERROR (0x6)"
);
});
it("should silently end request body stream for FRAME_SIZE_ERROR", async function () {
await request(http2.constants.NGHTTP2_FRAME_SIZE_ERROR);
expect(serverRequest).toBeDefined();
if (serverRequest !== undefined) {
assertByteStreamRequest(serverRequest);
const it = serverRequest.body[Symbol.asyncIterator]();
const r = await it.next();
expect(r.done).toBeTrue();
}
});
});
describe("with HTTP/1.1 ECONNRESET", function () {
let serverAbortReason: undefined | unknown;
let serverRequest: UniversalServerRequest | undefined;
const server = useNodeServer(() => {
serverRequest = undefined;
const s = http.createServer(
{
// We want to test behavior when a connection is dropped, and we do
Expand All @@ -130,10 +173,7 @@ describe("universalRequestFromNodeRequest()", function () {
requestTimeout: 0,
},
function (request) {
const uReq = universalRequestFromNodeRequest(request, undefined);
uReq.signal.addEventListener("abort", () => {
serverAbortReason = uReq.signal.reason;
});
serverRequest = universalRequestFromNodeRequest(request, undefined);
}
);
// For some reason, the type definitions for ServerOptions do not include
Expand All @@ -143,8 +183,8 @@ describe("universalRequestFromNodeRequest()", function () {
s.headersTimeout = 0;
return s;
});
it("should abort request signal with ConnectError and Code.Aborted", async function () {
await new Promise<void>((resolve) => {
async function request() {
return new Promise<void>((resolve) => {
const request = http.request(server.getUrl(), {
method: "POST",
});
Expand All @@ -158,19 +198,43 @@ describe("universalRequestFromNodeRequest()", function () {
resolve();
}, 20);
});
while (serverAbortReason === undefined) {
}
it("should abort request signal with ConnectError and Code.Aborted", async function () {
await request();
while (serverRequest?.signal.reason === undefined) {
await new Promise((r) => setTimeout(r, 1));
}
expect(serverAbortReason).toBeInstanceOf(Error);
if (serverAbortReason instanceof Error) {
expect(serverAbortReason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(serverAbortReason);
expect(serverRequest.signal.reason).toBeInstanceOf(Error);
if (serverRequest.signal.reason instanceof Error) {
expect(serverRequest.signal.reason).toBeInstanceOf(ConnectError);
const ce = ConnectError.from(serverRequest.signal.reason);
expect(ce.message).toBe("[aborted] aborted");
}
});
it("should error request body stream with ECONNRESET", async function () {
await request();
expect(serverRequest).toBeDefined();
if (serverRequest !== undefined) {
assertByteStreamRequest(serverRequest);
const it = serverRequest.body[Symbol.asyncIterator]();
try {
await it.next();
fail("expected error");
} catch (e) {
expect(e).toBeInstanceOf(Error);
expect(e).not.toBeInstanceOf(ConnectError);
if (e instanceof Error) {
expect(e.message).toBe("aborted");
expect(getNodeErrorProps(e)).toEqual({
code: "ECONNRESET",
});
}
}
}
});
});
describe("with HTTP/1.1 request finishing without error", function () {
let universalRequestSignal: AbortSignal | undefined;
let serverRequest: UniversalServerRequest | undefined;
const server = useNodeServer(() => {
const s = http.createServer(
{
Expand All @@ -195,8 +259,7 @@ describe("universalRequestFromNodeRequest()", function () {
requestTimeout: 0,
},
function (request, response) {
const uReq = universalRequestFromNodeRequest(request, undefined);
universalRequestSignal = uReq.signal;
serverRequest = universalRequestFromNodeRequest(request, undefined);
response.writeHead(200);
response.end();
}
Expand All @@ -208,8 +271,8 @@ describe("universalRequestFromNodeRequest()", function () {
s.headersTimeout = 0;
return s;
});
it("should abort request signal with AbortError", async function () {
await new Promise<void>((resolve) => {
async function request() {
return new Promise<void>((resolve) => {
const request = http.request(server.getUrl(), {
method: "POST",
// close TCP connection after we're done so that the server shuts down cleanly
Expand All @@ -223,15 +286,28 @@ describe("universalRequestFromNodeRequest()", function () {
);
});
});
expect(universalRequestSignal).toBeInstanceOf(AbortSignal);
expect(universalRequestSignal?.aborted).toBeTrue();
expect(universalRequestSignal?.reason).toBeInstanceOf(Error);
if (universalRequestSignal?.reason instanceof Error) {
expect(universalRequestSignal.reason.name).toBe("AbortError");
expect(universalRequestSignal.reason.message).toBe(
}
it("should abort request signal with AbortError", async function () {
await request();
expect(serverRequest?.signal).toBeInstanceOf(AbortSignal);
expect(serverRequest?.signal.aborted).toBeTrue();
expect(serverRequest?.signal.reason).toBeInstanceOf(Error);
if (serverRequest?.signal.reason instanceof Error) {
expect(serverRequest.signal.reason.name).toBe("AbortError");
expect(serverRequest.signal.reason.message).toBe(
"This operation was aborted"
);
}
});
it("should silently end request body stream", async function () {
await request();
expect(serverRequest).toBeDefined();
if (serverRequest !== undefined) {
assertByteStreamRequest(serverRequest);
const it = serverRequest.body[Symbol.asyncIterator]();
const r = await it.next();
expect(r.done).toBeTrue();
}
});
});
});

0 comments on commit f83fe73

Please sign in to comment.