Skip to content

Commit

Permalink
fix(node-http-handler): http2 lets node exit (#3541)
Browse files Browse the repository at this point in the history
Http2Session keeps node alive even with no requests running, presumably
for server pushed streams. This is a behavior change from node's http 1
Agent, which does not keep node alive when there is no request running.

Emulate that behavior by unref()ing new sessions, and wrapping requests
in a ref() / unref() pair.
  • Loading branch information
simonbuchan committed Apr 21, 2022
1 parent 473f9b5 commit 7480667
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 14 deletions.
116 changes: 102 additions & 14 deletions packages/node-http-handler/src/node-http2-handler.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbortController } from "@aws-sdk/abort-controller";
import { HttpRequest } from "@aws-sdk/protocol-http";
import { rejects } from "assert";
import http2, { constants, Http2Stream } from "http2";
import http2, { ClientHttp2Session, ClientHttp2Stream, constants, Http2Stream } from "http2";
import { Duplex } from "stream";
import { promisify } from "util";

Expand Down Expand Up @@ -44,7 +44,20 @@ describe(NodeHttp2Handler.name, () => {
});

describe("without options", () => {
let createdSessions!: ClientHttp2Session[];
const connectReal = http2.connect;
let connectSpy!: jest.SpiedFunction<typeof http2.connect>;

beforeEach(() => {
createdSessions = [];
connectSpy = jest.spyOn(http2, "connect").mockImplementation((...args) => {
const session = connectReal(...args);
jest.spyOn(session, "ref");
jest.spyOn(session, "unref");
createdSessions.push(session);
return session;
});

nodeH2Handler = new NodeHttp2Handler();
});

Expand All @@ -58,51 +71,99 @@ describe(NodeHttp2Handler.name, () => {

describe("number calls to http2.connect", () => {
it("is zero on initialization", () => {
const connectSpy = jest.spyOn(http2, "connect");
expect(connectSpy).not.toHaveBeenCalled();
});

it("is one when request is made", async () => {
const connectSpy = jest.spyOn(http2, "connect");

// Make single request.
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const responseBody = response.response.body as ClientHttp2Stream;

const authority = `${protocol}//${hostname}:${port}`;
expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(authority);

// Keeping node alive while request is open.
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);

const closed = new Promise<void>((resolve) => responseBody.once("close", resolve));
(response.response.body as ClientHttp2Stream).destroy();
await closed;

// No longer keeping node alive
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
});

it("is one if multiple requests are made on same URL", async () => {
const connectSpy = jest.spyOn(http2, "connect");

// Make two requests.
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1Body = response1.response.body as ClientHttp2Stream;
const response2 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response2Body = response2.response.body as ClientHttp2Stream;

const authority = `${protocol}//${hostname}:${port}`;
expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(authority);

// Keeping node alive while requests are open.
expect(createdSessions[0].ref).toHaveBeenCalledTimes(2);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);

const closed1 = new Promise<void>((resolve) => response1Body.once("close", resolve));
response1Body.destroy();
await closed1;
const closed2 = new Promise<void>((resolve) => response2Body.once("close", resolve));
response2Body.destroy();
await closed2;

// No longer keeping node alive
expect(createdSessions[0].ref).toHaveBeenCalledTimes(2);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(3);
});

it("is many if requests are made on different URLs", async () => {
const connectSpy = jest.spyOn(http2, "connect");

// Make first request on default URL.
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1 = await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
const response1Body = response1.response.body as ClientHttp2Stream;

const port2 = port + 1;
const mockH2Server2 = createMockHttp2Server().listen(port2);
mockH2Server2.on("request", createResponseFunction(mockResponse));

// Make second request on URL with port2.
await nodeH2Handler.handle(new HttpRequest({ ...getMockReqOptions(), port: port2 }), {});
const response2 = await nodeH2Handler.handle(new HttpRequest({ ...getMockReqOptions(), port: port2 }), {});
const response2Body = response2.response.body as ClientHttp2Stream;

const authorityPrefix = `${protocol}//${hostname}`;
expect(connectSpy).toHaveBeenCalledTimes(2);
expect(connectSpy).toHaveBeenNthCalledWith(1, `${authorityPrefix}:${port}`);
expect(connectSpy).toHaveBeenNthCalledWith(2, `${authorityPrefix}:${port2}`);
mockH2Server2.close();

// Keeping node alive while requests are open.
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(1);

const closed1 = new Promise<void>((resolve) => response1Body.once("close", resolve));
response1Body.destroy();
await closed1;
const closed2 = new Promise<void>((resolve) => response2Body.once("close", resolve));
response2Body.destroy();
await closed2;

// No longer keeping node alive
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
});
});

Expand Down Expand Up @@ -151,26 +212,44 @@ describe(NodeHttp2Handler.name, () => {
expect(establishedConnections).toBe(3);
expect(numRequests).toBe(3);

// Not keeping node alive
expect(createdSessions).toHaveLength(3);
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[2].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[2].unref).toHaveBeenCalledTimes(2);

// should be able to recover from goaway after reconnecting to a server
// that doesn't send goaway, and reuse the TCP connection (Http2Session)
shouldSendGoAway = false;
mockH2Server3.on("request", createResponseFunction(mockResponse));
await nodeH2Handler.handle(req, {});
const result = await nodeH2Handler.handle(req, {});
const resultReader = result.response.body;

// Keeping node alive
expect(createdSessions).toHaveLength(4);
expect(createdSessions[3].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[3].unref).toHaveBeenCalledTimes(1);

// ...and validate that the mocked response is received
const responseBody = await new Promise((resolve) => {
const buffers = [];
resultReader.on("data", (chunk) => buffers.push(chunk));
resultReader.on("end", () => {
resultReader.on("close", () => {
resolve(Buffer.concat(buffers).toString("utf8"));
});
});
expect(responseBody).toBe("test");
expect(establishedConnections).toBe(4);
expect(numRequests).toBe(5);
expect(numRequests).toBe(4);
mockH2Server3.close();

// Not keeping node alive
expect(createdSessions).toHaveLength(4);
expect(createdSessions[3].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[3].unref).toHaveBeenCalledTimes(2);
});

it("handles connections destroyed by servers", async () => {
Expand Down Expand Up @@ -212,6 +291,15 @@ describe(NodeHttp2Handler.name, () => {
expect(establishedConnections).toBe(3);
expect(numRequests).toBe(3);
mockH2Server3.close();

// Not keeping node alive
expect(createdSessions).toHaveLength(3);
expect(createdSessions[0].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[0].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[1].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[1].unref).toHaveBeenCalledTimes(2);
expect(createdSessions[2].ref).toHaveBeenCalledTimes(1);
expect(createdSessions[2].unref).toHaveBeenCalledTimes(2);
});
});

Expand Down Expand Up @@ -380,7 +468,7 @@ describe(NodeHttp2Handler.name, () => {
const authority = `${protocol}//${hostname}:${port}`;
// @ts-ignore: access private property
const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0];
const fakeStream = new Duplex();
const fakeStream = new Duplex() as ClientHttp2Stream;
const fakeRstCode = 1;
// @ts-ignore: fake result code
fakeStream.rstCode = fakeRstCode;
Expand All @@ -405,7 +493,7 @@ describe(NodeHttp2Handler.name, () => {
const authority = `${protocol}//${hostname}:${port}`;
// @ts-ignore: access private property
const session: ClientHttp2Session = nodeH2Handler.sessionCache.get(authority)[0];
const fakeStream = new Duplex();
const fakeStream = new Duplex() as ClientHttp2Stream;
jest.spyOn(session, "request").mockImplementation(() => fakeStream);
// @ts-ignore: access private property
nodeH2Handler.sessionCache.set(`${protocol}//${hostname}:${port}`, [session]);
Expand Down
7 changes: 7 additions & 0 deletions packages/node-http-handler/src/node-http2-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ export class NodeHttp2Handler implements HttpHandler {
[constants.HTTP2_HEADER_METHOD]: method,
});

// Keep node alive while request is in progress. Matched with unref() in close event.
session.ref();

req.on("response", (headers) => {
const httpResponse = new HttpResponse({
statusCode: headers[":status"] || -1,
Expand Down Expand Up @@ -137,6 +140,7 @@ export class NodeHttp2Handler implements HttpHandler {
// http2stream.rstCode property. If the code is any value other than NGHTTP2_NO_ERROR (0),
// an 'error' event will have also been emitted.
req.on("close", () => {
session.unref();
if (this.disableConcurrentStreams) {
session.destroy();
}
Expand Down Expand Up @@ -164,6 +168,9 @@ export class NodeHttp2Handler implements HttpHandler {
if (existingSessions.length > 0 && !disableConcurrentStreams) return existingSessions[0];

const newSession = connect(authority);
// AWS SDK does not expect server push streams, don't keep node alive without a request.
newSession.unref();

const destroySessionCb = () => {
this.destroySession(newSession);
this.deleteSessionFromCache(authority, newSession);
Expand Down

0 comments on commit 7480667

Please sign in to comment.