diff --git a/packages/connect-node/src/node-universal-client.spec.ts b/packages/connect-node/src/node-universal-client.spec.ts new file mode 100644 index 000000000..73cbdb18d --- /dev/null +++ b/packages/connect-node/src/node-universal-client.spec.ts @@ -0,0 +1,153 @@ +// Copyright 2021-2023 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as http2 from "http2"; +import type * as net from "net"; +import { createAsyncIterable } from "@bufbuild/connect/protocol"; +import { createNodeHttp2Client } from "./node-universal-client.js"; +import { encodeEnvelope } from "@bufbuild/connect/protocol"; + +describe("Node.js http2 API", function () { + it("should see reset codes from the client side on the server", async function () { + const server = await startServer(); + await h2RequestWithReset(server.baseUrl, http2.constants.NGHTTP2_CANCEL); + const { rstCode } = await server.stop(); + expect(rstCode).toBe(http2.constants.NGHTTP2_CANCEL); + }); + + /** + * Issues an H2 request, and immediately resets with the given code. + */ + async function h2RequestWithReset(baseUrl: string, rstCode: number) { + return new Promise((resolve) => { + http2.connect(baseUrl, (session: http2.ClientHttp2Session) => { + const stream = session.request( + { + ":method": "POST", + ":path": "/", + }, + {} + ); + setTimeout(() => { + stream.close(rstCode, () => { + session.close(); + setTimeout(() => resolve(), 0); + }); + }, 0); + }); + }); + } +}); + +describe("universal node http2 client", function () { + describe("with a signal that is already aborted", function () { + it("should raise error with code canceled", async function () { + const signal = AbortSignal.abort(); + const client = createNodeHttp2Client( + "http://example.com", + false, + undefined + ); + try { + await client({ + url: "http://example.com", + method: "POST", + header: new Headers(), + body: createAsyncIterable([]), + signal, + }); + fail("expected error"); + } catch (e) { + expect((e as Error).message).toBe( + "[canceled] operation was aborted via signal" + ); + } + }); + }); + describe("with a signal aborting mid request", function () { + it("should send RST_STREAM with code CANCEL", async function () { + const server = await startServer(); + + // set up a client that aborts while still streaming the request body + const ac = new AbortController(); + const client = createNodeHttp2Client(server.baseUrl, false, undefined); + async function* body() { + await new Promise((resolve) => setTimeout(resolve, 50)); + ac.abort(); + yield encodeEnvelope(0, new Uint8Array(0)); + } + try { + await client({ + url: server.baseUrl, + method: "POST", + header: new Headers(), + body: body(), + signal: ac.signal, + }); + fail("expected error"); + } catch (e) { + expect((e as Error).message).toBe( + "[canceled] operation was aborted via signal" + ); + } + + const { rstCode } = await server.stop(); + expect(rstCode).toBe(http2.constants.NGHTTP2_CANCEL); + }); + }); +}); + +/** + * Start an H2 server that expects all requests to be closed right away. + * When stopped, waits for all connections to close, then returns the last + * received reset code. + */ +async function startServer(): Promise<{ + baseUrl: string; + stop(): Promise<{ rstCode: number }>; +}> { + const s = http2.createServer({}); + let rstCode = -1; + s.on("stream", (stream) => { + stream.on("close", () => { + rstCode = stream.rstCode; + }); + }); + await new Promise((resolve) => { + s.listen(0, () => resolve(s)); + }); + return { + baseUrl: `http://localhost:${(s.address() as net.AddressInfo).port}`, + async stop() { + for (;;) { + const count = await new Promise((resolve, reject) => { + s.getConnections((err, count) => { + if (err) { + return reject(err); + } + return resolve(count); + }); + }); + if (count === 0) { + break; + } + await new Promise((resolve) => setTimeout(resolve, 10)); + } + s.close(); + return Promise.resolve({ + rstCode, + }); + }, + }; +} diff --git a/packages/connect-node/src/node-universal-client.ts b/packages/connect-node/src/node-universal-client.ts index 928a24504..da5b6cb13 100644 --- a/packages/connect-node/src/node-universal-client.ts +++ b/packages/connect-node/src/node-universal-client.ts @@ -122,7 +122,7 @@ export function createNodeHttp2Client( return async function request( req: UniversalClientRequest ): Promise { - const sentinel = createSentinel(); + const sentinel = createSentinel(req.signal); return new Promise((resolve, reject) => { sentinel.catch((e) => { reject(e); @@ -133,9 +133,8 @@ export function createNodeHttp2Client( req.url, req.method, webHeaderToNodeHeaders(req.header), - { - signal: req.signal, - }, + req.signal, + {}, (stream) => { void pipeTo(req.body, sinkRequest(sentinel, stream), { propagateDownStreamError: true, @@ -287,7 +286,8 @@ function h2Request( url: string, method: string, headers: http2.OutgoingHttpHeaders, - options: http2.ClientSessionRequestOptions, + signal: AbortSignal | undefined, + options: Omit, onStream: (stream: http2.ClientHttp2Stream) => void ): void { const requestUrl = new URL(url); @@ -321,7 +321,33 @@ function h2Request( session.off("error", sentinel.reject); session.off("error", h2SessionConnectError); session.on("error", sentinel.reject); + const stream = session.request( + { + ...headers, + ":method": method, + ":path": requestUrl.pathname, + }, + options + ); sentinel + .catch((reason) => { + return new Promise((resolve) => { + if (stream.closed) { + return resolve(); + } + // Node.js http2 streams that are aborted via an AbortSignal close with + // an RST_STREAM with code INTERNAL_ERROR. + // To comply with the mapping between gRPC and HTTP/2 codes, we need to + // close with code CANCEL. + // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors + // See https://www.rfc-editor.org/rfc/rfc7540#section-7 + if (reason instanceof ConnectError && reason.code == Code.Canceled) { + return stream.close(http2.constants.NGHTTP2_CANCEL, resolve); + } + // For other reasons, INTERNAL_ERROR is the best fit. + stream.close(http2.constants.NGHTTP2_INTERNAL_ERROR, resolve); + }); + }) .finally(() => { session.off("error", sentinel.reject); if (!sessionHolder.keepOpen) { @@ -332,14 +358,6 @@ function h2Request( // We intentionally swallow sentinel rejection - errors must // propagate through the request or response iterables. }); - const stream = session.request( - { - ...headers, - ":method": method, - ":path": requestUrl.pathname, - }, - options - ); stream.on("error", function h2StreamError(e: unknown) { if ( stream.writableEnded && @@ -424,7 +442,7 @@ type Sentinel = Promise & { race(promise: PromiseLike): Promise>; }; -function createSentinel(): Sentinel { +function createSentinel(signal?: AbortSignal): Sentinel { let res: (() => void) | undefined; let rej: ((reason: ConnectError | unknown) => void) | undefined; let resolved = false; @@ -463,5 +481,24 @@ function createSentinel(): Sentinel { return r as Awaited; }, }; - return Object.assign(p, c); + const s = Object.assign(p, c); + function onSignalAbort() { + c.reject( + new ConnectError("operation was aborted via signal", Code.Canceled) + ); + } + if (signal) { + if (signal.aborted) { + onSignalAbort(); + } else { + signal.addEventListener("abort", onSignalAbort); + } + p.finally(() => signal.removeEventListener("abort", onSignalAbort)).catch( + () => { + // We intentionally swallow sentinel rejection - errors must + // propagate through the request or response iterables. + } + ); + } + return s; }