Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(node-http-handler): h2 handler accept options in provider #3579

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions packages/node-http-handler/src/node-http2-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ describe(NodeHttp2Handler.name, () => {
mockH2Server.close();
});

describe("without options", () => {
describe.each([
["undefined", undefined],
["empty object", {}],
["undefined provider", async () => void 0],
["empty object provider", async () => ({})],
])("without options in constructor parameter of %s", (_, option) => {
let createdSessions!: ClientHttp2Session[];
const connectReal = http2.connect;
let connectSpy!: jest.SpiedFunction<typeof http2.connect>;
Expand All @@ -58,7 +63,7 @@ describe(NodeHttp2Handler.name, () => {
return session;
});

nodeH2Handler = new NodeHttp2Handler();
nodeH2Handler = new NodeHttp2Handler(option);
});

const closeConnection = async (response: HttpResponse) => {
Expand Down Expand Up @@ -357,36 +362,48 @@ describe(NodeHttp2Handler.name, () => {
const requestTimeout = 200;

describe("does not throw error when request not timed out", () => {
it("disableConcurrentStreams: false (default)", async () => {
it.each([
["static object", { requestTimeout }],
["object provider", async () => ({ requestTimeout })],
])("disableConcurrentStreams: false (default) in constructor parameter of %s", async (_, options) => {
mockH2Server.removeAllListeners("request");
mockH2Server.on("request", createResponseFunctionWithDelay(mockResponse, requestTimeout - 100));

nodeH2Handler = new NodeHttp2Handler({ requestTimeout });
nodeH2Handler = new NodeHttp2Handler(options);
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
});

it("disableConcurrentStreams: true", async () => {
it.each([
["static object", { requestTimeout, disableConcurrentStreams: true }],
["object provider", async () => ({ requestTimeout, disableConcurrentStreams: true })],
])("disableConcurrentStreams: true in constructor parameter of %s", async (_, options) => {
mockH2Server.removeAllListeners("request");
mockH2Server.on("request", createResponseFunctionWithDelay(mockResponse, requestTimeout - 100));

nodeH2Handler = new NodeHttp2Handler({ requestTimeout, disableConcurrentStreams: true });
nodeH2Handler = new NodeHttp2Handler(options);
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});
});
});

describe("throws timeoutError on requestTimeout", () => {
it("disableConcurrentStreams: false (default)", async () => {
it.each([
["static object", { requestTimeout }],
["object provider", async () => ({ requestTimeout })],
])("disableConcurrentStreams: false (default) in constructor parameter of %s", async (_, options) => {
mockH2Server.removeAllListeners("request");
mockH2Server.on("request", createResponseFunctionWithDelay(mockResponse, requestTimeout + 100));

nodeH2Handler = new NodeHttp2Handler({ requestTimeout });
nodeH2Handler = new NodeHttp2Handler(options);
await rejects(nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {}), {
name: "TimeoutError",
message: `Stream timed out because of no activity for ${requestTimeout} ms`,
});
});

it("disableConcurrentStreams: true", async () => {
it.each([
["object provider", async () => ({ requestTimeout })],
["static object", { requestTimeout }],
])("disableConcurrentStreams: true in constructor parameter of %s", async () => {
mockH2Server.removeAllListeners("request");
mockH2Server.on("request", createResponseFunctionWithDelay(mockResponse, requestTimeout + 100));

Expand All @@ -403,8 +420,11 @@ describe(NodeHttp2Handler.name, () => {
const sessionTimeout = 200;

describe("destroys sessions on sessionTimeout", () => {
it("disableConcurrentStreams: false (default)", async () => {
nodeH2Handler = new NodeHttp2Handler({ sessionTimeout });
it.each([
["object provider", async () => ({ sessionTimeout })],
["static object", { sessionTimeout }],
])("disableConcurrentStreams: false (default) in constructor parameter of %s", async (_, options) => {
nodeH2Handler = new NodeHttp2Handler(options);
await nodeH2Handler.handle(new HttpRequest(getMockReqOptions()), {});

const authority = `${protocol}//${hostname}:${port}`;
Expand All @@ -419,11 +439,14 @@ describe(NodeHttp2Handler.name, () => {
expect(nodeH2Handler.sessionCache.get(authority).length).toStrictEqual(0);
});

it("disableConcurrentStreams: true", async () => {
it.each([
["object provider", async () => ({ sessionTimeout, disableConcurrentStreams: true })],
["static object", { sessionTimeout, disableConcurrentStreams: true }],
])("disableConcurrentStreams: true in constructor parameter of %s", async (_, options) => {
let session;
const authority = `${protocol}//${hostname}:${port}`;

nodeH2Handler = new NodeHttp2Handler({ sessionTimeout, disableConcurrentStreams: true });
nodeH2Handler = new NodeHttp2Handler(options);

mockH2Server.removeAllListeners("request");
mockH2Server.on("request", (request: any, response: any) => {
Expand Down Expand Up @@ -487,11 +510,12 @@ describe(NodeHttp2Handler.name, () => {
);
});

describe("disableConcurrentStreams", () => {
describe.each([
["object provider", async () => ({ disableConcurrentStreams: true })],
["static object", { disableConcurrentStreams: true }],
])("disableConcurrentStreams in constructor parameter of %s", (_, options) => {
beforeEach(() => {
nodeH2Handler = new NodeHttp2Handler({
disableConcurrentStreams: true,
});
nodeH2Handler = new NodeHttp2Handler(options);
});

describe("number calls to http2.connect", () => {
Expand Down
44 changes: 27 additions & 17 deletions packages/node-http-handler/src/node-http2-handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { HttpHandler, HttpRequest, HttpResponse } from "@aws-sdk/protocol-http";
import { buildQueryString } from "@aws-sdk/querystring-builder";
import { HttpHandlerOptions } from "@aws-sdk/types";
import { HttpHandlerOptions, Provider } from "@aws-sdk/types";
import { ClientHttp2Session, connect, constants } from "http2";

import { getTransformedHeaders } from "./get-transformed-headers";
Expand Down Expand Up @@ -33,17 +33,24 @@ export interface NodeHttp2HandlerOptions {
}

export class NodeHttp2Handler implements HttpHandler {
private readonly requestTimeout?: number;
private readonly sessionTimeout?: number;
private readonly disableConcurrentStreams?: boolean;
private config?: NodeHttp2HandlerOptions;
private readonly configProvider: Promise<NodeHttp2HandlerOptions>;

public readonly metadata = { handlerProtocol: "h2" };
private sessionCache: Map<string, ClientHttp2Session[]>;

constructor({ requestTimeout, sessionTimeout, disableConcurrentStreams }: NodeHttp2HandlerOptions = {}) {
this.requestTimeout = requestTimeout;
this.sessionTimeout = sessionTimeout;
this.disableConcurrentStreams = disableConcurrentStreams;
constructor(options?: NodeHttp2HandlerOptions | Provider<NodeHttp2HandlerOptions | void>) {
this.configProvider = new Promise((resolve, reject) => {
if (typeof options === "function") {
options()
.then((opts) => {
resolve(opts || {});
})
.catch(reject);
} else {
resolve(options || {});
}
});
this.sessionCache = new Map<string, ClientHttp2Session[]>();
}

Expand All @@ -54,7 +61,11 @@ export class NodeHttp2Handler implements HttpHandler {
this.sessionCache.clear();
}

handle(request: HttpRequest, { abortSignal }: HttpHandlerOptions = {}): Promise<{ response: HttpResponse }> {
async handle(request: HttpRequest, { abortSignal }: HttpHandlerOptions = {}): Promise<{ response: HttpResponse }> {
if (!this.config) {
this.config = await this.configProvider;
}
const { requestTimeout, disableConcurrentStreams } = this.config;
return new Promise((resolve, rejectOriginal) => {
// It's redundant to track fulfilled because promises use the first resolution/rejection
// but avoids generating unnecessary stack traces in the "close" event handler.
Expand All @@ -71,10 +82,10 @@ export class NodeHttp2Handler implements HttpHandler {

const { hostname, method, port, protocol, path, query } = request;
const authority = `${protocol}//${hostname}${port ? `:${port}` : ""}`;
const session = this.getSession(authority, this.disableConcurrentStreams || false);
const session = this.getSession(authority, disableConcurrentStreams || false);

const reject = (err: Error) => {
if (this.disableConcurrentStreams) {
if (disableConcurrentStreams) {
this.destroySession(session);
}
fulfilled = true;
Expand All @@ -100,15 +111,14 @@ export class NodeHttp2Handler implements HttpHandler {
});
fulfilled = true;
resolve({ response: httpResponse });
if (this.disableConcurrentStreams) {
if (disableConcurrentStreams) {
// Gracefully closes the Http2Session, allowing any existing streams to complete
// on their own and preventing new Http2Stream instances from being created.
session.close();
this.deleteSessionFromCache(authority, session);
}
});

const requestTimeout = this.requestTimeout;
if (requestTimeout) {
req.setTimeout(requestTimeout, () => {
req.close();
Expand Down Expand Up @@ -141,7 +151,7 @@ export class NodeHttp2Handler implements HttpHandler {
// an 'error' event will have also been emitted.
req.on("close", () => {
session.unref();
if (this.disableConcurrentStreams) {
if (disableConcurrentStreams) {
session.destroy();
}
if (!fulfilled) {
Expand All @@ -162,6 +172,7 @@ export class NodeHttp2Handler implements HttpHandler {
*/
private getSession(authority: string, disableConcurrentStreams: boolean): ClientHttp2Session {
const sessionCache = this.sessionCache;

const existingSessions = sessionCache.get(authority) || [];

// If concurrent streams are not disabled, we can use the existing session.
Expand All @@ -179,9 +190,8 @@ export class NodeHttp2Handler implements HttpHandler {
newSession.on("error", destroySessionCb);
newSession.on("frameError", destroySessionCb);

const sessionTimeout = this.sessionTimeout;
if (sessionTimeout) {
newSession.setTimeout(sessionTimeout, destroySessionCb);
if (this.config?.sessionTimeout) {
newSession.setTimeout(this.config.sessionTimeout, destroySessionCb);
}

existingSessions.push(newSession);
Expand Down
13 changes: 13 additions & 0 deletions packages/smithy-client/src/defaults-mode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ export type ResolvedDefaultsMode = Exclude<DefaultsMode, "auto">;
* @internal
*/
export interface DefaultsModeConfigs {
/**
* The retry mode describing how the retry strategy control the traffic flow.
*/
retryMode?: string;
/**
* The maximum time in milliseconds that the connection phase of a request
* may take before the connection attempt is abandoned.
*/
connectionTimeout?: number;
/**
* This timeout measures the time between when the first byte is sent over an
* established, open connection and when the last byte is received from the
* service. If the response is not received by the timeout, then the request
* is considered timed out.
*/
requestTimeout?: number;
}