From df8b8976dcb1d2fb17f826f7ceb930eb83c6885a Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Wed, 7 Dec 2022 10:19:07 -0500 Subject: [PATCH] grpc-js: Refactor Transport and SubchannelConnector out of Subchannel --- packages/grpc-js/src/subchannel-call.ts | 30 +- packages/grpc-js/src/subchannel-pool.ts | 4 +- packages/grpc-js/src/subchannel.ts | 693 ++---------------------- packages/grpc-js/src/transport.ts | 634 ++++++++++++++++++++++ 4 files changed, 714 insertions(+), 647 deletions(-) create mode 100644 packages/grpc-js/src/transport.ts diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index 2bc6fb0c7..6556bc460 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -21,12 +21,12 @@ import * as os from 'os'; import { Status } from './constants'; import { Metadata } from './metadata'; import { StreamDecoder } from './stream-decoder'; -import { SubchannelCallStatsTracker, Subchannel } from './subchannel'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { ServerSurfaceCall } from './server-call'; import { Deadline } from './deadline'; import { InterceptingListener, MessageContext, StatusObject, WriteCallback } from './call-interface'; +import { CallEventTracker } from './transport'; const TRACER_NAME = 'subchannel_call'; @@ -110,9 +110,9 @@ export class Http2SubchannelCall implements SubchannelCall { constructor( private readonly http2Stream: http2.ClientHttp2Stream, - private readonly callStatsTracker: SubchannelCallStatsTracker, + private readonly callEventTracker: CallEventTracker, private readonly listener: SubchannelCallInterceptingListener, - private readonly subchannel: Subchannel, + private readonly peerName: string, private readonly callId: number ) { this.disconnectListener = () => { @@ -122,8 +122,6 @@ export class Http2SubchannelCall implements SubchannelCall { metadata: new Metadata(), }); }; - subchannel.addDisconnectListener(this.disconnectListener); - subchannel.callRef(); http2Stream.on('response', (headers, flags) => { let headersString = ''; for (const header of Object.keys(headers)) { @@ -185,7 +183,7 @@ export class Http2SubchannelCall implements SubchannelCall { for (const message of messages) { this.trace('parsed message of length ' + message.length); - this.callStatsTracker!.addMessageReceived(); + this.callEventTracker!.addMessageReceived(); this.tryPush(message); } }); @@ -289,7 +287,15 @@ export class Http2SubchannelCall implements SubchannelCall { ); this.internalError = err; } - this.callStatsTracker.onStreamEnd(false); + this.callEventTracker.onStreamEnd(false); + }); + } + + public onDisconnect() { + this.endCall({ + code: Status.UNAVAILABLE, + details: 'Connection dropped', + metadata: new Metadata(), }); } @@ -304,7 +310,7 @@ export class Http2SubchannelCall implements SubchannelCall { this.finalStatus!.details + '"' ); - this.callStatsTracker.onCallEnd(this.finalStatus!); + this.callEventTracker.onCallEnd(this.finalStatus!); /* We delay the actual action of bubbling up the status to insulate the * cleanup code in this class from any errors that may be thrown in the * upper layers as a result of bubbling up the status. In particular, @@ -319,8 +325,6 @@ export class Http2SubchannelCall implements SubchannelCall { * not push more messages after the status is output, so the messages go * nowhere either way. */ this.http2Stream.resume(); - this.subchannel.callUnref(); - this.subchannel.removeDisconnectListener(this.disconnectListener); } } @@ -395,7 +399,7 @@ export class Http2SubchannelCall implements SubchannelCall { } private handleTrailers(headers: http2.IncomingHttpHeaders) { - this.callStatsTracker.onStreamEnd(true); + this.callEventTracker.onStreamEnd(true); let headersString = ''; for (const header of Object.keys(headers)) { headersString += '\t\t' + header + ': ' + headers[header] + '\n'; @@ -467,7 +471,7 @@ export class Http2SubchannelCall implements SubchannelCall { } getPeer(): string { - return this.subchannel.getAddress(); + return this.peerName; } getCallNumber(): number { @@ -506,7 +510,7 @@ export class Http2SubchannelCall implements SubchannelCall { context.callback?.(); }; this.trace('sending data chunk of length ' + message.length); - this.callStatsTracker.addMessageSent(); + this.callEventTracker.addMessageSent(); try { this.http2Stream!.write(message, cb); } catch (error) { diff --git a/packages/grpc-js/src/subchannel-pool.ts b/packages/grpc-js/src/subchannel-pool.ts index b7ef362c3..bbfbea02b 100644 --- a/packages/grpc-js/src/subchannel-pool.ts +++ b/packages/grpc-js/src/subchannel-pool.ts @@ -23,6 +23,7 @@ import { } from './subchannel-address'; import { ChannelCredentials } from './channel-credentials'; import { GrpcUri, uriToString } from './uri-parser'; +import { Http2SubchannelConnector } from './transport'; // 10 seconds in milliseconds. This value is arbitrary. /** @@ -143,7 +144,8 @@ export class SubchannelPool { channelTargetUri, subchannelTarget, channelArguments, - channelCredentials + channelCredentials, + new Http2SubchannelConnector(channelTargetUri) ); if (!(channelTarget in this.pool)) { this.pool[channelTarget] = []; diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 0d9773b30..b4876f178 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -15,60 +15,30 @@ * */ -import * as http2 from 'http2'; import { ChannelCredentials } from './channel-credentials'; import { Metadata } from './metadata'; import { ChannelOptions } from './channel-options'; -import { PeerCertificate, checkServerIdentity, TLSSocket, CipherNameAndProtocol } from 'tls'; import { ConnectivityState } from './connectivity-state'; import { BackoffTimeout, BackoffOptions } from './backoff-timeout'; -import { getDefaultAuthority } from './resolver'; import * as logging from './logging'; import { LogVerbosity, Status } from './constants'; -import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; -import * as net from 'net'; -import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; -import { ConnectionOptions } from 'tls'; +import { GrpcUri, uriToString } from './uri-parser'; import { - stringToSubchannelAddress, SubchannelAddress, subchannelAddressToString, } from './subchannel-address'; -import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, SocketInfo, SocketRef, unregisterChannelzRef, registerChannelzSocket, TlsInfo } from './channelz'; +import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, unregisterChannelzRef } from './channelz'; import { ConnectivityStateListener } from './subchannel-interface'; -import { Http2SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call'; -import { getNextCallNumber } from './call-number'; +import { SubchannelCallInterceptingListener } from './subchannel-call'; import { SubchannelCall } from './subchannel-call'; -import { InterceptingListener, StatusObject } from './call-interface'; - -const clientVersion = require('../../package.json').version; +import { CallEventTracker, SubchannelConnector, Transport } from './transport'; const TRACER_NAME = 'subchannel'; -const FLOW_CONTROL_TRACER_NAME = 'subchannel_flowctrl'; /* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't * have a constant for the max signed 32 bit integer, so this is a simple way * to calculate it */ const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); -const KEEPALIVE_TIMEOUT_MS = 20000; - -export interface SubchannelCallStatsTracker { - addMessageSent(): void; - addMessageReceived(): void; - onCallEnd(status: StatusObject): void; - onStreamEnd(success: boolean): void; -} - -const { - HTTP2_HEADER_AUTHORITY, - HTTP2_HEADER_CONTENT_TYPE, - HTTP2_HEADER_METHOD, - HTTP2_HEADER_PATH, - HTTP2_HEADER_TE, - HTTP2_HEADER_USER_AGENT, -} = http2.constants; - -const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii'); export class Subchannel { /** @@ -79,7 +49,7 @@ export class Subchannel { /** * The underlying http2 session used to make requests. */ - private session: http2.ClientHttp2Session | null = null; + private transport: Transport | null = null; /** * Indicates that the subchannel should transition from TRANSIENT_FAILURE to * CONNECTING instead of IDLE when the backoff timeout ends. @@ -92,45 +62,9 @@ export class Subchannel { */ private stateListeners: ConnectivityStateListener[] = []; - /** - * A list of listener functions that will be called when the underlying - * socket disconnects. Used for ending active calls with an UNAVAILABLE - * status. - */ - private disconnectListeners: Set<() => void> = new Set(); - private backoffTimeout: BackoffTimeout; - /** - * The complete user agent string constructed using channel args. - */ - private userAgent: string; - - /** - * The amount of time in between sending pings - */ - private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS; - /** - * The amount of time to wait for an acknowledgement after sending a ping - */ - private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; - /** - * Timer reference for timeout that indicates when to send the next ping - */ - private keepaliveIntervalId: NodeJS.Timer; - /** - * Timer reference tracking when the most recent ping will be considered lost - */ - private keepaliveTimeoutId: NodeJS.Timer; - /** - * Indicates whether keepalive pings should be sent without any active calls - */ - private keepaliveWithoutCalls = false; - - /** - * Tracks calls with references to this subchannel - */ - private callRefcount = 0; + private keepaliveTimeMultiplier = 1; /** * Tracks channels and subchannel pools with references to this subchannel */ @@ -149,18 +83,7 @@ export class Subchannel { private childrenTracker = new ChannelzChildrenTracker(); // Channelz socket info - private channelzSocketRef: SocketRef | null = null; - /** - * Name of the remote server, if it is not the same as the subchannel - * address, i.e. if connecting through an HTTP CONNECT proxy. - */ - private remoteName: string | null = null; private streamTracker = new ChannelzCallTracker(); - private keepalivesSent = 0; - private messagesSent = 0; - private messagesReceived = 0; - private lastMessageSentTimestamp: Date | null = null; - private lastMessageReceivedTimestamp: Date | null = null; /** * A class representing a connection to a single backend. @@ -176,33 +99,9 @@ export class Subchannel { private channelTarget: GrpcUri, private subchannelAddress: SubchannelAddress, private options: ChannelOptions, - private credentials: ChannelCredentials + private credentials: ChannelCredentials, + private connector: SubchannelConnector ) { - // Build user-agent string. - this.userAgent = [ - options['grpc.primary_user_agent'], - `grpc-node-js/${clientVersion}`, - options['grpc.secondary_user_agent'], - ] - .filter((e) => e) - .join(' '); // remove falsey values first - - if ('grpc.keepalive_time_ms' in options) { - this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; - } - if ('grpc.keepalive_timeout_ms' in options) { - this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; - } - if ('grpc.keepalive_permit_without_calls' in options) { - this.keepaliveWithoutCalls = - options['grpc.keepalive_permit_without_calls'] === 1; - } else { - this.keepaliveWithoutCalls = false; - } - this.keepaliveIntervalId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveIntervalId); - this.keepaliveTimeoutId = setTimeout(() => {}, 0); - clearTimeout(this.keepaliveTimeoutId); const backoffOptions: BackoffOptions = { initialDelay: options['grpc.initial_reconnect_backoff_ms'], maxDelay: options['grpc.max_reconnect_backoff_ms'], @@ -233,67 +132,6 @@ export class Subchannel { }; } - private getChannelzSocketInfo(): SocketInfo | null { - if (this.session === null) { - return null; - } - const sessionSocket = this.session.socket; - const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; - const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null; - let tlsInfo: TlsInfo | null; - if (this.session.encrypted) { - const tlsSocket: TLSSocket = sessionSocket as TLSSocket; - const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher(); - const certificate = tlsSocket.getCertificate(); - const peerCertificate = tlsSocket.getPeerCertificate(); - tlsInfo = { - cipherSuiteStandardName: cipherInfo.standardName ?? null, - cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, - localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, - remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null - }; - } else { - tlsInfo = null; - } - const socketInfo: SocketInfo = { - remoteAddress: remoteAddress, - localAddress: localAddress, - security: tlsInfo, - remoteName: this.remoteName, - streamsStarted: this.streamTracker.callsStarted, - streamsSucceeded: this.streamTracker.callsSucceeded, - streamsFailed: this.streamTracker.callsFailed, - messagesSent: this.messagesSent, - messagesReceived: this.messagesReceived, - keepAlivesSent: this.keepalivesSent, - lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp, - lastRemoteStreamCreatedTimestamp: null, - lastMessageSentTimestamp: this.lastMessageSentTimestamp, - lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp, - localFlowControlWindow: this.session.state.localWindowSize ?? null, - remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null - }; - return socketInfo; - } - - private resetChannelzSocketInfo() { - if (!this.channelzEnabled) { - return; - } - if (this.channelzSocketRef) { - unregisterChannelzRef(this.channelzSocketRef); - this.childrenTracker.unrefChild(this.channelzSocketRef); - this.channelzSocketRef = null; - } - this.remoteName = null; - this.streamTracker = new ChannelzCallTracker(); - this.keepalivesSent = 0; - this.messagesSent = 0; - this.messagesReceived = 0; - this.lastMessageSentTimestamp = null; - this.lastMessageReceivedTimestamp = null; - } - private trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); } @@ -302,18 +140,6 @@ export class Subchannel { logging.trace(LogVerbosity.DEBUG, 'subchannel_refcount', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); } - private flowControlTrace(text: string): void { - logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); - } - - private internalsTrace(text: string): void { - logging.trace(LogVerbosity.DEBUG, 'subchannel_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); - } - - private keepaliveTrace(text: string): void { - logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); - } - private handleBackoffTimer() { if (this.continueConnecting) { this.transitionToState( @@ -340,313 +166,39 @@ export class Subchannel { this.backoffTimeout.reset(); } - private sendPing() { - if (this.channelzEnabled) { - this.keepalivesSent += 1; - } - this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'); - this.keepaliveTimeoutId = setTimeout(() => { - this.keepaliveTrace('Ping timeout passed without response'); - this.handleDisconnect(); - }, this.keepaliveTimeoutMs); - this.keepaliveTimeoutId.unref?.(); - try { - this.session!.ping( - (err: Error | null, duration: number, payload: Buffer) => { - this.keepaliveTrace('Received ping response'); - clearTimeout(this.keepaliveTimeoutId); - } - ); - } catch (e) { - /* If we fail to send a ping, the connection is no longer functional, so - * we should discard it. */ - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE - ); - } - } - - private startKeepalivePings() { - this.keepaliveIntervalId = setInterval(() => { - this.sendPing(); - }, this.keepaliveTimeMs); - this.keepaliveIntervalId.unref?.(); - /* Don't send a ping immediately because whatever caused us to start - * sending pings should also involve some network activity. */ - } - - /** - * Stop keepalive pings when terminating a connection. This discards the - * outstanding ping timeout, so it should not be called if the same - * connection will still be used. - */ - private stopKeepalivePings() { - clearInterval(this.keepaliveIntervalId); - clearTimeout(this.keepaliveTimeoutId); - } - - private createSession(proxyConnectionResult: ProxyConnectionResult) { - if (proxyConnectionResult.realTarget) { - this.remoteName = uriToString(proxyConnectionResult.realTarget); - this.trace('creating HTTP/2 session through proxy to ' + proxyConnectionResult.realTarget); - } else { - this.remoteName = null; - this.trace('creating HTTP/2 session'); - } - const targetAuthority = getDefaultAuthority( - proxyConnectionResult.realTarget ?? this.channelTarget - ); - let connectionOptions: http2.SecureClientSessionOptions = - this.credentials._getConnectionOptions() || {}; - connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER; - if ('grpc-node.max_session_memory' in this.options) { - connectionOptions.maxSessionMemory = this.options[ - 'grpc-node.max_session_memory' - ]; - } else { - /* By default, set a very large max session memory limit, to effectively - * disable enforcement of the limit. Some testing indicates that Node's - * behavior degrades badly when this limit is reached, so we solve that - * by disabling the check entirely. */ - connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER; - } - let addressScheme = 'http://'; - if ('secureContext' in connectionOptions) { - addressScheme = 'https://'; - // If provided, the value of grpc.ssl_target_name_override should be used - // to override the target hostname when checking server identity. - // This option is used for testing only. - if (this.options['grpc.ssl_target_name_override']) { - const sslTargetNameOverride = this.options[ - 'grpc.ssl_target_name_override' - ]!; - connectionOptions.checkServerIdentity = ( - host: string, - cert: PeerCertificate - ): Error | undefined => { - return checkServerIdentity(sslTargetNameOverride, cert); - }; - connectionOptions.servername = sslTargetNameOverride; - } else { - const authorityHostname = - splitHostPort(targetAuthority)?.host ?? 'localhost'; - // We want to always set servername to support SNI - connectionOptions.servername = authorityHostname; - } - if (proxyConnectionResult.socket) { - /* This is part of the workaround for - * https://github.com/nodejs/node/issues/32922. Without that bug, - * proxyConnectionResult.socket would always be a plaintext socket and - * this would say - * connectionOptions.socket = proxyConnectionResult.socket; */ - connectionOptions.createConnection = (authority, option) => { - return proxyConnectionResult.socket!; - }; - } - } else { - /* In all but the most recent versions of Node, http2.connect does not use - * the options when establishing plaintext connections, so we need to - * establish that connection explicitly. */ - connectionOptions.createConnection = (authority, option) => { - if (proxyConnectionResult.socket) { - return proxyConnectionResult.socket; - } else { - /* net.NetConnectOpts is declared in a way that is more restrictive - * than what net.connect will actually accept, so we use the type - * assertion to work around that. */ - return net.connect(this.subchannelAddress); - } - }; - } - - connectionOptions = { - ...connectionOptions, - ...this.subchannelAddress, - }; - - /* http2.connect uses the options here: - * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036 - * The spread operator overides earlier values with later ones, so any port - * or host values in the options will be used rather than any values extracted - * from the first argument. In addition, the path overrides the host and port, - * as documented for plaintext connections here: - * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener - * and for TLS connections here: - * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In - * earlier versions of Node, http2.connect passes these options to - * tls.connect but not net.connect, so in the insecure case we still need - * to set the createConnection option above to create the connection - * explicitly. We cannot do that in the TLS case because http2.connect - * passes necessary additional options to tls.connect. - * The first argument just needs to be parseable as a URL and the scheme - * determines whether the connection will be established over TLS or not. - */ - const session = http2.connect( - addressScheme + targetAuthority, - connectionOptions - ); - this.session = session; - this.channelzSocketRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzSocketInfo()!, this.channelzEnabled); - if (this.channelzEnabled) { - this.childrenTracker.refChild(this.channelzSocketRef); + private startConnectingInternal() { + let options = this.options; + if (options['grpc.keepalive_time_ms']) { + const adjustedKeepaliveTime = Math.min(options['grpc.keepalive_time_ms'] * this.keepaliveTimeMultiplier, KEEPALIVE_MAX_TIME_MS); + options = {...options, 'grpc.keepalive_time_ms': adjustedKeepaliveTime}; } - session.unref(); - /* For all of these events, check if the session at the time of the event - * is the same one currently attached to this subchannel, to ensure that - * old events from previous connection attempts cannot cause invalid state - * transitions. */ - session.once('connect', () => { - if (this.session === session) { - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.READY - ); - } - }); - session.once('close', () => { - if (this.session === session) { - this.trace('connection closed'); - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.TRANSIENT_FAILURE - ); - /* Transitioning directly to IDLE here should be OK because we are not - * doing any backoff, because a connection was established at some - * point */ - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.IDLE - ); - } - }); - session.once( - 'goaway', - (errorCode: number, lastStreamID: number, opaqueData: Buffer) => { - if (this.session === session) { - /* See the last paragraph of - * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */ - if ( - errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM && - opaqueData.equals(tooManyPingsData) - ) { - this.keepaliveTimeMs = Math.min( - 2 * this.keepaliveTimeMs, - KEEPALIVE_MAX_TIME_MS - ); - logging.log( - LogVerbosity.ERROR, - `Connection to ${uriToString(this.channelTarget)} at ${ - this.subchannelAddressString - } rejected by server because of excess pings. Increasing ping interval to ${ - this.keepaliveTimeMs - } ms` - ); + this.connector.connect(this.subchannelAddress, this.credentials, options).then( + transport => { + if (this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.READY)) { + this.transport = transport; + if (this.channelzEnabled) { + this.childrenTracker.refChild(transport.getChannelzRef()); } - this.trace( - 'connection closed by GOAWAY with code ' + - errorCode - ); - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE - ); - } - } - ); - session.once('error', (error) => { - /* Do nothing here. Any error should also trigger a close event, which is - * where we want to handle that. */ - this.trace( - 'connection closed with error ' + - (error as Error).message - ); - }); - if (logging.isTracerEnabled(TRACER_NAME)) { - session.on('remoteSettings', (settings: http2.Settings) => { - this.trace( - 'new settings received' + - (this.session !== session ? ' on the old connection' : '') + - ': ' + - JSON.stringify(settings) - ); - }); - session.on('localSettings', (settings: http2.Settings) => { - this.trace( - 'local settings acknowledged by remote' + - (this.session !== session ? ' on the old connection' : '') + - ': ' + - JSON.stringify(settings) - ); - }); - } - } - - private startConnectingInternal() { - /* Pass connection options through to the proxy so that it's able to - * upgrade it's connection to support tls if needed. - * This is a workaround for https://github.com/nodejs/node/issues/32922 - * See https://github.com/grpc/grpc-node/pull/1369 for more info. */ - const connectionOptions: ConnectionOptions = - this.credentials._getConnectionOptions() || {}; - - if ('secureContext' in connectionOptions) { - connectionOptions.ALPNProtocols = ['h2']; - // If provided, the value of grpc.ssl_target_name_override should be used - // to override the target hostname when checking server identity. - // This option is used for testing only. - if (this.options['grpc.ssl_target_name_override']) { - const sslTargetNameOverride = this.options[ - 'grpc.ssl_target_name_override' - ]!; - connectionOptions.checkServerIdentity = ( - host: string, - cert: PeerCertificate - ): Error | undefined => { - return checkServerIdentity(sslTargetNameOverride, cert); - }; - connectionOptions.servername = sslTargetNameOverride; - } else { - if ('grpc.http_connect_target' in this.options) { - /* This is more or less how servername will be set in createSession - * if a connection is successfully established through the proxy. - * If the proxy is not used, these connectionOptions are discarded - * anyway */ - const targetPath = getDefaultAuthority( - parseUri(this.options['grpc.http_connect_target'] as string) ?? { - path: 'localhost', + transport.addDisconnectListener((tooManyPings) => { + this.transitionToState([ConnectivityState.READY], ConnectivityState.IDLE); + if (tooManyPings) { + this.keepaliveTimeMultiplier *= 2; + logging.log( + LogVerbosity.ERROR, + `Connection to ${uriToString(this.channelTarget)} at ${ + this.subchannelAddressString + } rejected by server because of excess pings. Increasing ping interval multiplier to ${ + this.keepaliveTimeMultiplier + } ms` + ); } - ); - const hostPort = splitHostPort(targetPath); - connectionOptions.servername = hostPort?.host ?? targetPath; + }); } - } - } - - getProxiedConnection( - this.subchannelAddress, - this.options, - connectionOptions - ).then( - (result) => { - this.createSession(result); }, - (reason) => { - this.transitionToState( - [ConnectivityState.CONNECTING], - ConnectivityState.TRANSIENT_FAILURE - ); + error => { + this.transitionToState([ConnectivityState.CONNECTING], ConnectivityState.TRANSIENT_FAILURE); } - ); - } - - private handleDisconnect() { - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE); - for (const listener of this.disconnectListeners.values()) { - listener(); - } + ) } /** @@ -676,15 +228,6 @@ export class Subchannel { switch (newState) { case ConnectivityState.READY: this.stopBackoff(); - const session = this.session!; - session.socket.once('close', () => { - if (this.session === session) { - this.handleDisconnect(); - } - }); - if (this.keepaliveWithoutCalls) { - this.startKeepalivePings(); - } break; case ConnectivityState.CONNECTING: this.startBackoff(); @@ -692,12 +235,11 @@ export class Subchannel { this.continueConnecting = false; break; case ConnectivityState.TRANSIENT_FAILURE: - if (this.session) { - this.session.close(); + if (this.channelzEnabled && this.transport) { + this.childrenTracker.unrefChild(this.transport.getChannelzRef()); } - this.session = null; - this.resetChannelzSocketInfo(); - this.stopKeepalivePings(); + this.transport?.shutdown(); + this.transport = null; /* If the backoff timer has already ended by the time we get to the * TRANSIENT_FAILURE state, we want to immediately transition out of * TRANSIENT_FAILURE as though the backoff timer is ending right now */ @@ -708,12 +250,11 @@ export class Subchannel { } break; case ConnectivityState.IDLE: - if (this.session) { - this.session.close(); + if (this.channelzEnabled && this.transport) { + this.childrenTracker.unrefChild(this.transport.getChannelzRef()); } - this.session = null; - this.resetChannelzSocketInfo(); - this.stopKeepalivePings(); + this.transport?.shutdown(); + this.transport = null; break; default: throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); @@ -726,66 +267,6 @@ export class Subchannel { return true; } - /** - * Check if the subchannel associated with zero calls and with zero channels. - * If so, shut it down. - */ - private checkBothRefcounts() { - /* If no calls, channels, or subchannel pools have any more references to - * this subchannel, we can be sure it will never be used again. */ - if (this.callRefcount === 0 && this.refcount === 0) { - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); - } - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE - ); - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } - } - } - - callRef() { - this.refTrace( - 'callRefcount ' + - this.callRefcount + - ' -> ' + - (this.callRefcount + 1) - ); - if (this.callRefcount === 0) { - if (this.session) { - this.session.ref(); - } - this.backoffTimeout.ref(); - if (!this.keepaliveWithoutCalls) { - this.startKeepalivePings(); - } - } - this.callRefcount += 1; - } - - callUnref() { - this.refTrace( - 'callRefcount ' + - this.callRefcount + - ' -> ' + - (this.callRefcount - 1) - ); - this.callRefcount -= 1; - if (this.callRefcount === 0) { - if (this.session) { - this.session.unref(); - } - this.backoffTimeout.unref(); - if (!this.keepaliveWithoutCalls) { - clearInterval(this.keepaliveIntervalId); - } - this.checkBothRefcounts(); - } - } - ref() { this.refTrace( 'refcount ' + @@ -804,7 +285,18 @@ export class Subchannel { (this.refcount - 1) ); this.refcount -= 1; - this.checkBothRefcounts(); + if (this.refcount === 0) { + if (this.channelzEnabled) { + this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); + } + this.transitionToState( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.IDLE + ); + if (this.channelzEnabled) { + unregisterChannelzRef(this.channelzRef); + } + } } unrefIfOneRef(): boolean { @@ -816,83 +308,26 @@ export class Subchannel { } createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener): SubchannelCall { - const headers = metadata.toHttp2Headers(); - headers[HTTP2_HEADER_AUTHORITY] = host; - headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; - headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; - headers[HTTP2_HEADER_METHOD] = 'POST'; - headers[HTTP2_HEADER_PATH] = method; - headers[HTTP2_HEADER_TE] = 'trailers'; - let http2Stream: http2.ClientHttp2Stream; - /* In theory, if an error is thrown by session.request because session has - * become unusable (e.g. because it has received a goaway), this subchannel - * should soon see the corresponding close or goaway event anyway and leave - * READY. But we have seen reports that this does not happen - * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096) - * so for defense in depth, we just discard the session when we see an - * error here. - */ - try { - http2Stream = this.session!.request(headers); - } catch (e) { - this.transitionToState( - [ConnectivityState.READY], - ConnectivityState.TRANSIENT_FAILURE - ); - throw e; + if (!this.transport) { + throw new Error('Cannot create call, subchannel not READY'); } - this.flowControlTrace( - 'local window size: ' + - this.session!.state.localWindowSize + - ' remote window size: ' + - this.session!.state.remoteWindowSize - ); - const streamSession = this.session; - this.internalsTrace( - 'session.closed=' + - streamSession!.closed + - ' session.destroyed=' + - streamSession!.destroyed + - ' session.socket.destroyed=' + - streamSession!.socket.destroyed); - let statsTracker: SubchannelCallStatsTracker; + let statsTracker: Partial; if (this.channelzEnabled) { this.callTracker.addCallStarted(); this.streamTracker.addCallStarted(); statsTracker = { - addMessageSent: () => { - this.messagesSent += 1; - this.lastMessageSentTimestamp = new Date(); - }, - addMessageReceived: () => { - this.messagesReceived += 1; - }, onCallEnd: status => { if (status.code === Status.OK) { this.callTracker.addCallSucceeded(); } else { this.callTracker.addCallFailed(); } - }, - onStreamEnd: success => { - if (streamSession === this.session) { - if (success) { - this.streamTracker.addCallSucceeded(); - } else { - this.streamTracker.addCallFailed(); - } - } } } } else { - statsTracker = { - addMessageSent: () => {}, - addMessageReceived: () => {}, - onCallEnd: () => {}, - onStreamEnd: () => {} - } + statsTracker = {}; } - return new Http2SubchannelCall(http2Stream, statsTracker, listener, this, getNextCallNumber()); + return this.transport.createCall(metadata, host, method, listener, statsTracker); } /** @@ -946,14 +381,6 @@ export class Subchannel { } } - addDisconnectListener(listener: () => void) { - this.disconnectListeners.add(listener); - } - - removeDisconnectListener(listener: () => void) { - this.disconnectListeners.delete(listener); - } - /** * Reset the backoff timeout, and immediately start connecting if in backoff. */ diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts new file mode 100644 index 000000000..e713ed6ef --- /dev/null +++ b/packages/grpc-js/src/transport.ts @@ -0,0 +1,634 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as http2 from 'http2'; +import { checkServerIdentity, CipherNameAndProtocol, ConnectionOptions, PeerCertificate, TLSSocket } from 'tls'; +import { StatusObject } from './call-interface'; +import { ChannelCredentials } from './channel-credentials'; +import { ChannelOptions } from './channel-options'; +import { ChannelzCallTracker, registerChannelzSocket, SocketInfo, SocketRef, TlsInfo } from './channelz'; +import { LogVerbosity } from './constants'; +import { getProxiedConnection, ProxyConnectionResult } from './http_proxy'; +import * as logging from './logging'; +import { getDefaultAuthority } from './resolver'; +import { stringToSubchannelAddress, SubchannelAddress, subchannelAddressToString } from './subchannel-address'; +import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; +import * as net from 'net'; +import { Http2SubchannelCall, SubchannelCall, SubchannelCallInterceptingListener } from './subchannel-call'; +import { Metadata } from './metadata'; +import { getNextCallNumber } from './call-number'; + +const TRACER_NAME = 'transport'; +const FLOW_CONTROL_TRACER_NAME = 'transport_flowctrl'; + +const clientVersion = require('../../package.json').version; + +const { + HTTP2_HEADER_AUTHORITY, + HTTP2_HEADER_CONTENT_TYPE, + HTTP2_HEADER_METHOD, + HTTP2_HEADER_PATH, + HTTP2_HEADER_TE, + HTTP2_HEADER_USER_AGENT, +} = http2.constants; + +/* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't + * have a constant for the max signed 32 bit integer, so this is a simple way + * to calculate it */ +const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); +const KEEPALIVE_TIMEOUT_MS = 20000; + +export interface CallEventTracker { + addMessageSent(): void; + addMessageReceived(): void; + onCallEnd(status: StatusObject): void; + onStreamEnd(success: boolean): void; +} + +export interface TransportDisconnectListener { + (tooManyPings: boolean): void; +} + +export interface Transport { + getChannelzRef(): SocketRef; + createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): SubchannelCall; + addDisconnectListener(listener: TransportDisconnectListener): void; + shutdown(): void; +} + +const tooManyPingsData: Buffer = Buffer.from('too_many_pings', 'ascii'); + +class Http2Transport implements Transport { + /** + * The amount of time in between sending pings + */ + private keepaliveTimeMs: number = KEEPALIVE_MAX_TIME_MS; + /** + * The amount of time to wait for an acknowledgement after sending a ping + */ + private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; + /** + * Timer reference for timeout that indicates when to send the next ping + */ + private keepaliveIntervalId: NodeJS.Timer; + /** + * Timer reference tracking when the most recent ping will be considered lost + */ + private keepaliveTimeoutId: NodeJS.Timer | null = null; + /** + * Indicates whether keepalive pings should be sent without any active calls + */ + private keepaliveWithoutCalls = false; + + private userAgent: string; + + private activeCalls: Set = new Set(); + + private subchannelAddressString: string; + + private disconnectListeners: TransportDisconnectListener[] = []; + + private disconnectHandled = false; + + // Channelz info + private channelzRef: SocketRef; + private readonly channelzEnabled: boolean = true; + /** + * Name of the remote server, if it is not the same as the subchannel + * address, i.e. if connecting through an HTTP CONNECT proxy. + */ + private remoteName: string | null = null; + private streamTracker = new ChannelzCallTracker(); + private keepalivesSent = 0; + private messagesSent = 0; + private messagesReceived = 0; + private lastMessageSentTimestamp: Date | null = null; + private lastMessageReceivedTimestamp: Date | null = null; + + constructor( + private session: http2.ClientHttp2Session, + subchannelAddress: SubchannelAddress, + options: ChannelOptions + ) { + // Build user-agent string. + this.userAgent = [ + options['grpc.primary_user_agent'], + `grpc-node-js/${clientVersion}`, + options['grpc.secondary_user_agent'], + ] + .filter((e) => e) + .join(' '); // remove falsey values first + + if ('grpc.keepalive_time_ms' in options) { + this.keepaliveTimeMs = options['grpc.keepalive_time_ms']!; + } + if ('grpc.keepalive_timeout_ms' in options) { + this.keepaliveTimeoutMs = options['grpc.keepalive_timeout_ms']!; + } + if ('grpc.keepalive_permit_without_calls' in options) { + this.keepaliveWithoutCalls = + options['grpc.keepalive_permit_without_calls'] === 1; + } else { + this.keepaliveWithoutCalls = false; + } + this.keepaliveIntervalId = setTimeout(() => {}, 0); + clearTimeout(this.keepaliveIntervalId); + if (this.keepaliveWithoutCalls) { + this.startKeepalivePings(); + } + + this.subchannelAddressString = subchannelAddressToString(subchannelAddress); + + if (options['grpc.enable_channelz'] === 0) { + this.channelzEnabled = false; + } + this.channelzRef = registerChannelzSocket(this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled); + + session.once('close', () => { + this.trace('session closed'); + this.stopKeepalivePings(); + this.handleDisconnect(false); + }); + session.once('goaway', (errorCode: number, lastStreamID: number, opaqueData: Buffer) => { + let tooManyPings = false; + /* See the last paragraph of + * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */ + if ( + errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM && + opaqueData.equals(tooManyPingsData) + ) { + tooManyPings = true; + } + this.trace( + 'connection closed by GOAWAY with code ' + + errorCode + ); + this.handleDisconnect(tooManyPings); + }); + session.once('error', error => { + /* Do nothing here. Any error should also trigger a close event, which is + * where we want to handle that. */ + this.trace( + 'connection closed with error ' + + (error as Error).message + ); + }); + if (logging.isTracerEnabled(TRACER_NAME)) { + session.on('remoteSettings', (settings: http2.Settings) => { + this.trace( + 'new settings received' + + (this.session !== session ? ' on the old connection' : '') + + ': ' + + JSON.stringify(settings) + ); + }); + session.on('localSettings', (settings: http2.Settings) => { + this.trace( + 'local settings acknowledged by remote' + + (this.session !== session ? ' on the old connection' : '') + + ': ' + + JSON.stringify(settings) + ); + }); + } + } + + private getChannelzInfo(): SocketInfo { + const sessionSocket = this.session.socket; + const remoteAddress = sessionSocket.remoteAddress ? stringToSubchannelAddress(sessionSocket.remoteAddress, sessionSocket.remotePort) : null; + const localAddress = sessionSocket.localAddress ? stringToSubchannelAddress(sessionSocket.localAddress, sessionSocket.localPort) : null; + let tlsInfo: TlsInfo | null; + if (this.session.encrypted) { + const tlsSocket: TLSSocket = sessionSocket as TLSSocket; + const cipherInfo: CipherNameAndProtocol & {standardName?: string} = tlsSocket.getCipher(); + const certificate = tlsSocket.getCertificate(); + const peerCertificate = tlsSocket.getPeerCertificate(); + tlsInfo = { + cipherSuiteStandardName: cipherInfo.standardName ?? null, + cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, + localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null, + remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null + }; + } else { + tlsInfo = null; + } + const socketInfo: SocketInfo = { + remoteAddress: remoteAddress, + localAddress: localAddress, + security: tlsInfo, + remoteName: this.remoteName, + streamsStarted: this.streamTracker.callsStarted, + streamsSucceeded: this.streamTracker.callsSucceeded, + streamsFailed: this.streamTracker.callsFailed, + messagesSent: this.messagesSent, + messagesReceived: this.messagesReceived, + keepAlivesSent: this.keepalivesSent, + lastLocalStreamCreatedTimestamp: this.streamTracker.lastCallStartedTimestamp, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: this.lastMessageSentTimestamp, + lastMessageReceivedTimestamp: this.lastMessageReceivedTimestamp, + localFlowControlWindow: this.session.state.localWindowSize ?? null, + remoteFlowControlWindow: this.session.state.remoteWindowSize ?? null + }; + return socketInfo; + } + + private trace(text: string): void { + logging.trace(LogVerbosity.DEBUG, TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private keepaliveTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, 'keepalive', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private flowControlTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, FLOW_CONTROL_TRACER_NAME, '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private internalsTrace(text: string): void { + logging.trace(LogVerbosity.DEBUG, 'transport_internals', '(' + this.channelzRef.id + ') ' + this.subchannelAddressString + ' ' + text); + } + + private handleDisconnect(tooManyPings: boolean) { + if (this.disconnectHandled) { + return; + } + this.disconnectHandled = true; + this.disconnectListeners.forEach(listener => listener(tooManyPings)); + for (const call of this.activeCalls) { + call.onDisconnect(); + } + } + + addDisconnectListener(listener: TransportDisconnectListener): void { + this.disconnectListeners.push(listener); + } + + private clearKeepaliveTimeout() { + if (!this.keepaliveTimeoutId) { + return; + } + clearTimeout(this.keepaliveTimeoutId); + this.keepaliveTimeoutId = null; + } + + private sendPing() { + if (this.channelzEnabled) { + this.keepalivesSent += 1; + } + this.keepaliveTrace('Sending ping with timeout ' + this.keepaliveTimeoutMs + 'ms'); + if (!this.keepaliveTimeoutId) { + this.keepaliveTimeoutId = setTimeout(() => { + this.keepaliveTrace('Ping timeout passed without response'); + this.handleDisconnect(false); + }, this.keepaliveTimeoutMs); + this.keepaliveTimeoutId.unref?.(); + } + try { + this.session!.ping( + (err: Error | null, duration: number, payload: Buffer) => { + this.keepaliveTrace('Received ping response'); + this.clearKeepaliveTimeout(); + } + ); + } catch (e) { + /* If we fail to send a ping, the connection is no longer functional, so + * we should discard it. */ + this.handleDisconnect(false); + } + } + + private startKeepalivePings() { + this.keepaliveIntervalId = setInterval(() => { + this.sendPing(); + }, this.keepaliveTimeMs); + this.keepaliveIntervalId.unref?.(); + /* Don't send a ping immediately because whatever caused us to start + * sending pings should also involve some network activity. */ + } + + /** + * Stop keepalive pings when terminating a connection. This discards the + * outstanding ping timeout, so it should not be called if the same + * connection will still be used. + */ + private stopKeepalivePings() { + clearInterval(this.keepaliveIntervalId); + this.clearKeepaliveTimeout(); + } + + private removeActiveCall(call: Http2SubchannelCall) { + this.activeCalls.delete(call); + if (this.activeCalls.size === 0 && !this.keepaliveWithoutCalls) { + this.stopKeepalivePings(); + } + } + + private addActiveCall(call: Http2SubchannelCall) { + if (this.activeCalls.size === 0 && !this.keepaliveWithoutCalls) { + this.startKeepalivePings(); + } + this.activeCalls.add(call); + } + + createCall(metadata: Metadata, host: string, method: string, listener: SubchannelCallInterceptingListener, subchannelCallStatsTracker: Partial): Http2SubchannelCall { + const headers = metadata.toHttp2Headers(); + headers[HTTP2_HEADER_AUTHORITY] = host; + headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; + headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; + headers[HTTP2_HEADER_METHOD] = 'POST'; + headers[HTTP2_HEADER_PATH] = method; + headers[HTTP2_HEADER_TE] = 'trailers'; + let http2Stream: http2.ClientHttp2Stream; + /* In theory, if an error is thrown by session.request because session has + * become unusable (e.g. because it has received a goaway), this subchannel + * should soon see the corresponding close or goaway event anyway and leave + * READY. But we have seen reports that this does not happen + * (https://github.com/googleapis/nodejs-firestore/issues/1023#issuecomment-653204096) + * so for defense in depth, we just discard the session when we see an + * error here. + */ + try { + http2Stream = this.session!.request(headers); + } catch (e) { + this.handleDisconnect(false); + throw e; + } + this.flowControlTrace( + 'local window size: ' + + this.session.state.localWindowSize + + ' remote window size: ' + + this.session.state.remoteWindowSize + ); + this.internalsTrace( + 'session.closed=' + + this.session.closed + + ' session.destroyed=' + + this.session.destroyed + + ' session.socket.destroyed=' + + this.session.socket.destroyed); + let eventTracker: CallEventTracker; + let call: Http2SubchannelCall; + if (this.channelzEnabled) { + this.streamTracker.addCallStarted(); + eventTracker = { + addMessageSent: () => { + this.messagesSent += 1; + this.lastMessageSentTimestamp = new Date(); + subchannelCallStatsTracker.addMessageSent?.(); + }, + addMessageReceived: () => { + this.messagesReceived += 1; + this.lastMessageReceivedTimestamp = new Date(); + subchannelCallStatsTracker.addMessageReceived?.(); + }, + onCallEnd: status => { + subchannelCallStatsTracker.onCallEnd?.(status); + }, + onStreamEnd: success => { + if (success) { + this.streamTracker.addCallSucceeded(); + } else { + this.streamTracker.addCallFailed(); + } + this.removeActiveCall(call); + subchannelCallStatsTracker.onStreamEnd?.(success); + } + } + } else { + eventTracker = { + addMessageSent: () => { + subchannelCallStatsTracker.addMessageSent?.(); + }, + addMessageReceived: () => { + subchannelCallStatsTracker.addMessageReceived?.(); + }, + onCallEnd: (status) => { + subchannelCallStatsTracker.onCallEnd?.(status); + this.removeActiveCall(call); + }, + onStreamEnd: (success) => { + subchannelCallStatsTracker.onStreamEnd?.(success); + } + } + } + call = new Http2SubchannelCall(http2Stream, eventTracker, listener, this.subchannelAddressString, getNextCallNumber()); + this.addActiveCall(call); + return call; + } + + getChannelzRef(): SocketRef { + return this.channelzRef; + } + + shutdown() { + this.session.close(); + } +} + +export interface SubchannelConnector { + connect(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions): Promise; + shutdown(): void; +} + +export class Http2SubchannelConnector implements SubchannelConnector { + private session: http2.ClientHttp2Session | null = null; + private isShutdown = false; + constructor(private channelTarget: GrpcUri) {} + private trace(text: string) { + + } + private createSession(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions, proxyConnectionResult: ProxyConnectionResult): Promise { + if (this.isShutdown) { + return Promise.reject(); + } + return new Promise((resolve, reject) => { + let remoteName: string | null; + if (proxyConnectionResult.realTarget) { + remoteName = uriToString(proxyConnectionResult.realTarget); + this.trace('creating HTTP/2 session through proxy to ' + uriToString(proxyConnectionResult.realTarget)); + } else { + remoteName = null; + this.trace('creating HTTP/2 session to ' + subchannelAddressToString(address)); + } + const targetAuthority = getDefaultAuthority( + proxyConnectionResult.realTarget ?? this.channelTarget + ); + let connectionOptions: http2.SecureClientSessionOptions = + credentials._getConnectionOptions() || {}; + connectionOptions.maxSendHeaderBlockLength = Number.MAX_SAFE_INTEGER; + if ('grpc-node.max_session_memory' in options) { + connectionOptions.maxSessionMemory = options[ + 'grpc-node.max_session_memory' + ]; + } else { + /* By default, set a very large max session memory limit, to effectively + * disable enforcement of the limit. Some testing indicates that Node's + * behavior degrades badly when this limit is reached, so we solve that + * by disabling the check entirely. */ + connectionOptions.maxSessionMemory = Number.MAX_SAFE_INTEGER; + } + let addressScheme = 'http://'; + if ('secureContext' in connectionOptions) { + addressScheme = 'https://'; + // If provided, the value of grpc.ssl_target_name_override should be used + // to override the target hostname when checking server identity. + // This option is used for testing only. + if (options['grpc.ssl_target_name_override']) { + const sslTargetNameOverride = options[ + 'grpc.ssl_target_name_override' + ]!; + connectionOptions.checkServerIdentity = ( + host: string, + cert: PeerCertificate + ): Error | undefined => { + return checkServerIdentity(sslTargetNameOverride, cert); + }; + connectionOptions.servername = sslTargetNameOverride; + } else { + const authorityHostname = + splitHostPort(targetAuthority)?.host ?? 'localhost'; + // We want to always set servername to support SNI + connectionOptions.servername = authorityHostname; + } + if (proxyConnectionResult.socket) { + /* This is part of the workaround for + * https://github.com/nodejs/node/issues/32922. Without that bug, + * proxyConnectionResult.socket would always be a plaintext socket and + * this would say + * connectionOptions.socket = proxyConnectionResult.socket; */ + connectionOptions.createConnection = (authority, option) => { + return proxyConnectionResult.socket!; + }; + } + } else { + /* In all but the most recent versions of Node, http2.connect does not use + * the options when establishing plaintext connections, so we need to + * establish that connection explicitly. */ + connectionOptions.createConnection = (authority, option) => { + if (proxyConnectionResult.socket) { + return proxyConnectionResult.socket; + } else { + /* net.NetConnectOpts is declared in a way that is more restrictive + * than what net.connect will actually accept, so we use the type + * assertion to work around that. */ + return net.connect(address); + } + }; + } + + connectionOptions = { + ...connectionOptions, + ...address, + }; + + /* http2.connect uses the options here: + * https://github.com/nodejs/node/blob/70c32a6d190e2b5d7b9ff9d5b6a459d14e8b7d59/lib/internal/http2/core.js#L3028-L3036 + * The spread operator overides earlier values with later ones, so any port + * or host values in the options will be used rather than any values extracted + * from the first argument. In addition, the path overrides the host and port, + * as documented for plaintext connections here: + * https://nodejs.org/api/net.html#net_socket_connect_options_connectlistener + * and for TLS connections here: + * https://nodejs.org/api/tls.html#tls_tls_connect_options_callback. In + * earlier versions of Node, http2.connect passes these options to + * tls.connect but not net.connect, so in the insecure case we still need + * to set the createConnection option above to create the connection + * explicitly. We cannot do that in the TLS case because http2.connect + * passes necessary additional options to tls.connect. + * The first argument just needs to be parseable as a URL and the scheme + * determines whether the connection will be established over TLS or not. + */ + const session = http2.connect( + addressScheme + targetAuthority, + connectionOptions + ); + this.session = session; + session.unref(); + session.once('connect', () => { + session.removeAllListeners(); + resolve(new Http2Transport(session, address, options)); + this.session = null; + }); + session.once('close', () => { + this.session = null; + reject(); + }); + session.once('error', error => { + this.trace('connection failed with error ' + (error as Error).message) + }); + }); + } + connect(address: SubchannelAddress, credentials: ChannelCredentials, options: ChannelOptions): Promise { + if (this.isShutdown) { + return Promise.reject(); + } + /* Pass connection options through to the proxy so that it's able to + * upgrade it's connection to support tls if needed. + * This is a workaround for https://github.com/nodejs/node/issues/32922 + * See https://github.com/grpc/grpc-node/pull/1369 for more info. */ + const connectionOptions: ConnectionOptions = + credentials._getConnectionOptions() || {}; + + if ('secureContext' in connectionOptions) { + connectionOptions.ALPNProtocols = ['h2']; + // If provided, the value of grpc.ssl_target_name_override should be used + // to override the target hostname when checking server identity. + // This option is used for testing only. + if (options['grpc.ssl_target_name_override']) { + const sslTargetNameOverride = options[ + 'grpc.ssl_target_name_override' + ]!; + connectionOptions.checkServerIdentity = ( + host: string, + cert: PeerCertificate + ): Error | undefined => { + return checkServerIdentity(sslTargetNameOverride, cert); + }; + connectionOptions.servername = sslTargetNameOverride; + } else { + if ('grpc.http_connect_target' in options) { + /* This is more or less how servername will be set in createSession + * if a connection is successfully established through the proxy. + * If the proxy is not used, these connectionOptions are discarded + * anyway */ + const targetPath = getDefaultAuthority( + parseUri(options['grpc.http_connect_target'] as string) ?? { + path: 'localhost', + } + ); + const hostPort = splitHostPort(targetPath); + connectionOptions.servername = hostPort?.host ?? targetPath; + } + } + } + + return getProxiedConnection( + address, + options, + connectionOptions + ).then( + result => this.createSession(address, credentials, options, result) + ); + } + + shutdown(): void { + this.isShutdown = true; + this.session?.close(); + this.session = null; + } +} \ No newline at end of file