From 3a161874513653090b3f211e5cc70d1b813b2c08 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 14 Nov 2023 14:37:13 -0800 Subject: [PATCH] grpc-js: Implement server drain method --- packages/grpc-js/src/server.ts | 44 +++++++++++++++++++++ packages/grpc-js/test/test-server.ts | 58 ++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 8ff0d0f22..9bd71fd42 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -853,6 +853,50 @@ export class Server { } } + /** + * Gracefully close all connections associated with a previously bound port. + * After the grace time, forcefully close all remaining open connections. + * + * If port 0 was bound, only the actual bound port can be + * drained. For example, if bindAsync was called with "localhost:0" and the + * bound port result was 54321, it can be drained as "localhost:54321". + * @param port + * @param graceTimeMs + * @returns + */ + drain(port: string, graceTimeMs: number): void { + this.trace('drain port=' + port + ' graceTimeMs=' + graceTimeMs); + const portUri = this.normalizePort(port); + const splitPort = splitHostPort(portUri.path); + if (splitPort?.port === 0) { + throw new Error('Cannot drain port 0'); + } + const boundPortObject = this.boundPorts.get(uriToString(portUri)); + if (!boundPortObject) { + return; + } + const allSessions: Set = new Set(); + for (const http2Server of boundPortObject.listeningServers) { + const serverEntry = this.http2Servers.get(http2Server); + if (!serverEntry) { + continue; + } + for (const session of serverEntry.sessions) { + allSessions.add(session); + this.closeSession(session, () => { + allSessions.delete(session); + }); + } + } + /* After the grace time ends, send another goaway to all remaining sessions + * with the CANCEL code. */ + setTimeout(() => { + for (const session of allSessions) { + session.destroy(http2.constants.NGHTTP2_CANCEL as any); + } + }, graceTimeMs).unref?.(); + } + forceShutdown(): void { for (const boundPortObject of this.boundPorts.values()) { boundPortObject.cancelled = true; diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 56388a868..c497b8e27 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -228,6 +228,64 @@ describe('Server', () => { }); }); + describe.only('drain', () => { + let client: ServiceClient; + let portNumber: number; + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + + const serviceImplementation = { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + echoBidiStream(call: ServerDuplexStream) { + call.on('data', data => { + call.write(data); + }); + call.on('end', () => { + call.end(); + }); + }, + }; + + beforeEach(done => { + server.addService(echoService.service, serviceImplementation); + + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + portNumber = port; + client = new echoService( + `localhost:${port}`, + grpc.credentials.createInsecure() + ); + server.start(); + done(); + } + ); + }); + + afterEach(done => { + client.close(); + server.tryShutdown(done); + }); + + it('Should cancel open calls after the grace period ends', done => { + const call = client.echoBidiStream(); + call.on('error', (error: ServiceError) => { + assert.strictEqual(error.code, grpc.status.CANCELLED); + done(); + }); + call.on('data', () => { + server.drain(`localhost:${portNumber!}`, 100); + }); + call.write({value: 'abc'}); + }); + }); + describe('start', () => { let server: Server;