Skip to content

Commit

Permalink
grpc-js: Implement channel idle timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
murgatroid99 committed Jun 20, 2023
1 parent dbaaa89 commit fcff72b
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 29 deletions.
1 change: 1 addition & 0 deletions packages/grpc-js/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`.
- `grpc.per_rpc_retry_buffer_size`
- `grpc.retry_buffer_size`
- `grpc.service_config_disable_resolution`
- `grpc.client_idle_timeout_ms`
- `grpc-node.max_session_memory`
- `channelOverride`
- `channelFactoryOverride`
Expand Down
2 changes: 2 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface ChannelOptions {
'grpc.max_connection_age_grace_ms'?: number;
'grpc-node.max_session_memory'?: number;
'grpc.service_config_disable_resolution'?: number;
'grpc.client_idle_timeout_ms'?: number;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}
Expand Down Expand Up @@ -89,6 +90,7 @@ export const recognizedOptions = {
'grpc.max_connection_age_grace_ms': true,
'grpc-node.max_session_memory': true,
'grpc.service_config_disable_resolution': true,
'grpc.client_idle_timeout_ms': true
};

export function channelOptionsEqual(
Expand Down
92 changes: 68 additions & 24 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, PickResultType } from './picker';
import { UnavailablePicker, Picker, PickResultType, QueuePicker } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
Expand Down Expand Up @@ -85,6 +85,11 @@ import {
*/
const MAX_TIMEOUT_TIME = 2147483647;

const MIN_IDLE_TIMEOUT_MS = 1000;

// 30 minutes
const DEFAULT_IDLE_TIMEOUT_MS = 30 * 60 * 1000;

interface ConnectivityStateWatcher {
currentState: ConnectivityState;
timer: NodeJS.Timeout | null;
Expand Down Expand Up @@ -153,8 +158,8 @@ class ChannelSubchannelWrapper
}

export class InternalChannel {
private resolvingLoadBalancer: ResolvingLoadBalancer;
private subchannelPool: SubchannelPool;
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
private connectivityState: ConnectivityState = ConnectivityState.IDLE;
private currentPicker: Picker = new UnavailablePicker();
/**
Expand All @@ -164,17 +169,17 @@ export class InternalChannel {
private configSelectionQueue: ResolvingCall[] = [];
private pickQueue: LoadBalancingCall[] = [];
private connectivityStateWatchers: ConnectivityStateWatcher[] = [];
private defaultAuthority: string;
private filterStackFactory: FilterStackFactory;
private target: GrpcUri;
private readonly defaultAuthority: string;
private readonly filterStackFactory: FilterStackFactory;
private readonly target: GrpcUri;
/**
* This timer does not do anything on its own. Its purpose is to hold the
* event loop open while there are any pending calls for the channel that
* have not yet been assigned to specific subchannels. In other words,
* the invariant is that callRefTimer is reffed if and only if pickQueue
* is non-empty.
*/
private callRefTimer: NodeJS.Timer;
private readonly callRefTimer: NodeJS.Timer;
private configSelector: ConfigSelector | null = null;
/**
* This is the error from the name resolver if it failed most recently. It
Expand All @@ -184,17 +189,21 @@ export class InternalChannel {
* than TRANSIENT_FAILURE.
*/
private currentResolutionError: StatusObject | null = null;
private retryBufferTracker: MessageBufferTracker;
private readonly retryBufferTracker: MessageBufferTracker;
private keepaliveTime: number;
private wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();
private readonly wrappedSubchannels: Set<ChannelSubchannelWrapper> = new Set();

private callCount: number = 0;
private idleTimer: NodeJS.Timer | null = null;
private readonly idleTimeoutMs: number;

// Channelz info
private readonly channelzEnabled: boolean = true;
private originalTarget: string;
private channelzRef: ChannelRef;
private channelzTrace: ChannelzTrace;
private callTracker = new ChannelzCallTracker();
private childrenTracker = new ChannelzChildrenTracker();
private readonly originalTarget: string;
private readonly channelzRef: ChannelRef;
private readonly channelzTrace: ChannelzTrace;
private readonly callTracker = new ChannelzCallTracker();
private readonly childrenTracker = new ChannelzChildrenTracker();

constructor(
target: string,
Expand Down Expand Up @@ -265,6 +274,7 @@ export class InternalChannel {
DEFAULT_PER_RPC_RETRY_BUFFER_SIZE_BYTES
);
this.keepaliveTime = options['grpc.keepalive_time_ms'] ?? -1;
this.idleTimeoutMs = Math.max(options['grpc.client_idle_timeout_ms'] ?? DEFAULT_IDLE_TIMEOUT_MS, MIN_IDLE_TIMEOUT_MS);
const channelControlHelper: ChannelControlHelper = {
createSubchannel: (
subchannelAddress: SubchannelAddress,
Expand Down Expand Up @@ -548,6 +558,45 @@ export class InternalChannel {
this.callRefTimerRef();
}

private enterIdle() {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.IDLE);
this.currentPicker = new QueuePicker(this.resolvingLoadBalancer);
}

private maybeStartIdleTimer() {
if (this.callCount === 0) {
this.idleTimer = setTimeout(() => {
this.trace('Idle timer triggered after ' + this.idleTimeoutMs + 'ms of inactivity');
this.enterIdle();
}, this.idleTimeoutMs);
this.idleTimer.unref?.();
}
}

private onCallStart() {
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
}
this.callCount += 1;
if (this.idleTimer) {
clearTimeout(this.idleTimer);
this.idleTimer = null;
}
}

private onCallEnd(status: StatusObject) {
if (this.channelzEnabled) {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
}
this.callCount -= 1;
this.maybeStartIdleTimer();
}

createLoadBalancingCall(
callConfig: CallConfig,
method: string,
Expand Down Expand Up @@ -653,16 +702,10 @@ export class InternalChannel {
callNumber
);

if (this.channelzEnabled) {
this.callTracker.addCallStarted();
call.addStatusWatcher(status => {
if (status.code === Status.OK) {
this.callTracker.addCallSucceeded();
} else {
this.callTracker.addCallFailed();
}
});
}
this.onCallStart();
call.addStatusWatcher(status => {
this.onCallEnd(status);
});
return call;
}

Expand All @@ -685,6 +728,7 @@ export class InternalChannel {
const connectivityState = this.connectivityState;
if (tryToConnect) {
this.resolvingLoadBalancer.exitIdle();
this.maybeStartIdleTimer();
}
return connectivityState;
}
Expand Down
4 changes: 4 additions & 0 deletions packages/grpc-js/src/load-balancer-child-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ export class ChildLoadBalancerHandler implements LoadBalancer {
}
}
destroy(): void {
/* Note: state updates are only propagated from the child balancer if that
* object is equal to this.currentChild or this.pendingChild. Since this
* function sets both of those to null, no further state updates will
* occur after this function returns. */
if (this.currentChild) {
this.currentChild.destroy();
this.currentChild = null;
Expand Down
20 changes: 20 additions & 0 deletions packages/grpc-js/src/resolver-dns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ class DnsResolver implements Resolver {
this.pendingLookupPromise = dnsLookupPromise(hostname, { all: true });
this.pendingLookupPromise.then(
addressList => {
if (this.pendingLookupPromise === null) {
return;
}
this.pendingLookupPromise = null;
this.backoff.reset();
this.backoff.stop();
Expand Down Expand Up @@ -248,6 +251,9 @@ class DnsResolver implements Resolver {
);
},
err => {
if (this.pendingLookupPromise === null) {
return;
}
trace(
'Resolution error for target ' +
uriToString(this.target) +
Expand All @@ -268,6 +274,9 @@ class DnsResolver implements Resolver {
this.pendingTxtPromise = resolveTxtPromise(hostname);
this.pendingTxtPromise.then(
txtRecord => {
if (this.pendingTxtPromise === null) {
return;
}
this.pendingTxtPromise = null;
try {
this.latestServiceConfig = extractAndSelectServiceConfig(
Expand Down Expand Up @@ -348,10 +357,21 @@ class DnsResolver implements Resolver {
}
}

/**
* Reset the resolver to the same state it had when it was created. In-flight
* DNS requests cannot be cancelled, but they are discarded and their results
* will be ignored.
*/
destroy() {
this.continueResolving = false;
this.backoff.reset();
this.backoff.stop();
this.stopNextResolutionTimer();
this.pendingLookupPromise = null;
this.pendingTxtPromise = null;
this.latestLookupResult = null;
this.latestServiceConfig = null;
this.latestServiceConfigError = null;
}

/**
Expand Down
5 changes: 4 additions & 1 deletion packages/grpc-js/src/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ export interface Resolver {
updateResolution(): void;

/**
* Destroy the resolver. Should be called when the owning channel shuts down.
* Discard all resources owned by the resolver. A later call to
* `updateResolution` should reinitialize those resources. No
* `ResolverListener` callbacks should be called after `destroy` is called
* until `updateResolution` is called again.
*/
destroy(): void;
}
Expand Down
12 changes: 9 additions & 3 deletions packages/grpc-js/src/resolving-load-balancer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ export class ResolvingLoadBalancer implements LoadBalancer {
/**
* The resolver class constructed for the target address.
*/
private innerResolver: Resolver;
private readonly innerResolver: Resolver;

private childLoadBalancer: ChildLoadBalancerHandler;
private readonly childLoadBalancer: ChildLoadBalancerHandler;
private latestChildState: ConnectivityState = ConnectivityState.IDLE;
private latestChildPicker: Picker = new QueuePicker(this);
/**
Expand Down Expand Up @@ -324,7 +324,13 @@ export class ResolvingLoadBalancer implements LoadBalancer {
destroy() {
this.childLoadBalancer.destroy();
this.innerResolver.destroy();
this.updateState(ConnectivityState.SHUTDOWN, new UnavailablePicker());
this.backoffTimeout.reset();
this.backoffTimeout.stop();
this.latestChildState = ConnectivityState.IDLE;
this.latestChildPicker = new QueuePicker(this);
this.currentState = ConnectivityState.IDLE;
this.previousServiceConfig = null;
this.continueResolving = false;
}

getTypeName() {
Expand Down
90 changes: 89 additions & 1 deletion packages/grpc-js/test/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import * as loader from '@grpc/proto-loader';
import * as assert2 from './assert2';
import * as path from 'path';
import * as grpc from '../src';

import { GrpcObject, loadPackageDefinition } from '../src/make-client';
import { GrpcObject, ServiceClientConstructor, ServiceClient, loadPackageDefinition } from '../src/make-client';
import { readFileSync } from 'fs';

const protoLoaderOptions = {
keepCase: true,
Expand All @@ -37,4 +40,89 @@ export function loadProtoFile(file: string): GrpcObject {
return loadPackageDefinition(packageDefinition);
}

const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

const ca = readFileSync(path.join(__dirname, 'fixtures', 'ca.pem'));
const key = readFileSync(path.join(__dirname, 'fixtures', 'server1.key'));
const cert = readFileSync(path.join(__dirname, 'fixtures', 'server1.pem'));

const serviceImpl = {
echo: (
call: grpc.ServerUnaryCall<any, any>,
callback: grpc.sendUnaryData<any>
) => {
callback(null, call.request);
},
};

export class TestServer {
private server: grpc.Server;
public port: number | null = null;
constructor(public useTls: boolean, options?: grpc.ChannelOptions) {
this.server = new grpc.Server(options);
this.server.addService(echoService.service, serviceImpl);
}
start(): Promise<void> {
let credentials: grpc.ServerCredentials;
if (this.useTls) {
credentials = grpc.ServerCredentials.createSsl(null, [{private_key: key, cert_chain: cert}]);
} else {
credentials = grpc.ServerCredentials.createInsecure();
}
return new Promise<void>((resolve, reject) => {
this.server.bindAsync('localhost:0', credentials, (error, port) => {
if (error) {
reject(error);
return;
}
this.port = port;
this.server.start();
resolve();
});
});
}

shutdown() {
this.server.forceShutdown();
}
}

export class TestClient {
private client: ServiceClient;
constructor(port: number, useTls: boolean, options?: grpc.ChannelOptions) {
let credentials: grpc.ChannelCredentials;
if (useTls) {
credentials = grpc.credentials.createSsl(ca);
} else {
credentials = grpc.credentials.createInsecure();
}
this.client = new echoService(`localhost:${port}`, credentials, options);
}

static createFromServer(server: TestServer, options?: grpc.ChannelOptions) {
if (server.port === null) {
throw new Error('Cannot create client, server not started');
}
return new TestClient(server.port, server.useTls, options);
}

waitForReady(deadline: grpc.Deadline, callback: (error?: Error) => void) {
this.client.waitForReady(deadline, callback);
}

sendRequest(callback: (error: grpc.ServiceError) => void) {
this.client.echo({}, callback);
}

getChannelState() {
return this.client.getChannel().getConnectivityState(false);
}

close() {
this.client.close();
}
}

export { assert2 };

0 comments on commit fcff72b

Please sign in to comment.