Skip to content

Commit

Permalink
Abort HTTP/2 streams with RST_STREAM code CANCEL (#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
timostamm committed Mar 29, 2023
1 parent c18d298 commit 028c8f9
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 15 deletions.
153 changes: 153 additions & 0 deletions packages/connect-node/src/node-universal-client.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as http2 from "http2";
import type * as net from "net";
import { createAsyncIterable } from "@bufbuild/connect/protocol";
import { createNodeHttp2Client } from "./node-universal-client.js";
import { encodeEnvelope } from "@bufbuild/connect/protocol";

describe("Node.js http2 API", function () {
it("should see reset codes from the client side on the server", async function () {
const server = await startServer();
await h2RequestWithReset(server.baseUrl, http2.constants.NGHTTP2_CANCEL);
const { rstCode } = await server.stop();
expect(rstCode).toBe(http2.constants.NGHTTP2_CANCEL);
});

/**
* Issues an H2 request, and immediately resets with the given code.
*/
async function h2RequestWithReset(baseUrl: string, rstCode: number) {
return new Promise<void>((resolve) => {
http2.connect(baseUrl, (session: http2.ClientHttp2Session) => {
const stream = session.request(
{
":method": "POST",
":path": "/",
},
{}
);
setTimeout(() => {
stream.close(rstCode, () => {
session.close();
setTimeout(() => resolve(), 0);
});
}, 0);
});
});
}
});

describe("universal node http2 client", function () {
describe("with a signal that is already aborted", function () {
it("should raise error with code canceled", async function () {
const signal = AbortSignal.abort();
const client = createNodeHttp2Client(
"http://example.com",
false,
undefined
);
try {
await client({
url: "http://example.com",
method: "POST",
header: new Headers(),
body: createAsyncIterable([]),
signal,
});
fail("expected error");
} catch (e) {
expect((e as Error).message).toBe(
"[canceled] operation was aborted via signal"
);
}
});
});
describe("with a signal aborting mid request", function () {
it("should send RST_STREAM with code CANCEL", async function () {
const server = await startServer();

// set up a client that aborts while still streaming the request body
const ac = new AbortController();
const client = createNodeHttp2Client(server.baseUrl, false, undefined);
async function* body() {
await new Promise<void>((resolve) => setTimeout(resolve, 50));
ac.abort();
yield encodeEnvelope(0, new Uint8Array(0));
}
try {
await client({
url: server.baseUrl,
method: "POST",
header: new Headers(),
body: body(),
signal: ac.signal,
});
fail("expected error");
} catch (e) {
expect((e as Error).message).toBe(
"[canceled] operation was aborted via signal"
);
}

const { rstCode } = await server.stop();
expect(rstCode).toBe(http2.constants.NGHTTP2_CANCEL);
});
});
});

/**
* Start an H2 server that expects all requests to be closed right away.
* When stopped, waits for all connections to close, then returns the last
* received reset code.
*/
async function startServer(): Promise<{
baseUrl: string;
stop(): Promise<{ rstCode: number }>;
}> {
const s = http2.createServer({});
let rstCode = -1;
s.on("stream", (stream) => {
stream.on("close", () => {
rstCode = stream.rstCode;
});
});
await new Promise<http2.Http2Server>((resolve) => {
s.listen(0, () => resolve(s));
});
return {
baseUrl: `http://localhost:${(s.address() as net.AddressInfo).port}`,
async stop() {
for (;;) {
const count = await new Promise<number>((resolve, reject) => {
s.getConnections((err, count) => {
if (err) {
return reject(err);
}
return resolve(count);
});
});
if (count === 0) {
break;
}
await new Promise((resolve) => setTimeout(resolve, 10));
}
s.close();
return Promise.resolve({
rstCode,
});
},
};
}
67 changes: 52 additions & 15 deletions packages/connect-node/src/node-universal-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export function createNodeHttp2Client(
return async function request(
req: UniversalClientRequest
): Promise<UniversalClientResponse> {
const sentinel = createSentinel();
const sentinel = createSentinel(req.signal);
return new Promise<UniversalClientResponse>((resolve, reject) => {
sentinel.catch((e) => {
reject(e);
Expand All @@ -133,9 +133,8 @@ export function createNodeHttp2Client(
req.url,
req.method,
webHeaderToNodeHeaders(req.header),
{
signal: req.signal,
},
req.signal,
{},
(stream) => {
void pipeTo(req.body, sinkRequest(sentinel, stream), {
propagateDownStreamError: true,
Expand Down Expand Up @@ -287,7 +286,8 @@ function h2Request(
url: string,
method: string,
headers: http2.OutgoingHttpHeaders,
options: http2.ClientSessionRequestOptions,
signal: AbortSignal | undefined,
options: Omit<http2.ClientSessionRequestOptions, "signal">,
onStream: (stream: http2.ClientHttp2Stream) => void
): void {
const requestUrl = new URL(url);
Expand Down Expand Up @@ -321,7 +321,33 @@ function h2Request(
session.off("error", sentinel.reject);
session.off("error", h2SessionConnectError);
session.on("error", sentinel.reject);
const stream = session.request(
{
...headers,
":method": method,
":path": requestUrl.pathname,
},
options
);
sentinel
.catch((reason) => {
return new Promise<void>((resolve) => {
if (stream.closed) {
return resolve();
}
// Node.js http2 streams that are aborted via an AbortSignal close with
// an RST_STREAM with code INTERNAL_ERROR.
// To comply with the mapping between gRPC and HTTP/2 codes, we need to
// close with code CANCEL.
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#errors
// See https://www.rfc-editor.org/rfc/rfc7540#section-7
if (reason instanceof ConnectError && reason.code == Code.Canceled) {
return stream.close(http2.constants.NGHTTP2_CANCEL, resolve);
}
// For other reasons, INTERNAL_ERROR is the best fit.
stream.close(http2.constants.NGHTTP2_INTERNAL_ERROR, resolve);
});
})
.finally(() => {
session.off("error", sentinel.reject);
if (!sessionHolder.keepOpen) {
Expand All @@ -332,14 +358,6 @@ function h2Request(
// We intentionally swallow sentinel rejection - errors must
// propagate through the request or response iterables.
});
const stream = session.request(
{
...headers,
":method": method,
":path": requestUrl.pathname,
},
options
);
stream.on("error", function h2StreamError(e: unknown) {
if (
stream.writableEnded &&
Expand Down Expand Up @@ -424,7 +442,7 @@ type Sentinel = Promise<void> & {
race<T>(promise: PromiseLike<T>): Promise<Awaited<T>>;
};

function createSentinel(): Sentinel {
function createSentinel(signal?: AbortSignal): Sentinel {
let res: (() => void) | undefined;
let rej: ((reason: ConnectError | unknown) => void) | undefined;
let resolved = false;
Expand Down Expand Up @@ -463,5 +481,24 @@ function createSentinel(): Sentinel {
return r as Awaited<T>;
},
};
return Object.assign(p, c);
const s = Object.assign(p, c);
function onSignalAbort() {
c.reject(
new ConnectError("operation was aborted via signal", Code.Canceled)
);
}
if (signal) {
if (signal.aborted) {
onSignalAbort();
} else {
signal.addEventListener("abort", onSignalAbort);
}
p.finally(() => signal.removeEventListener("abort", onSignalAbort)).catch(
() => {
// We intentionally swallow sentinel rejection - errors must
// propagate through the request or response iterables.
}
);
}
return s;
}

0 comments on commit 028c8f9

Please sign in to comment.