diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index c0deab872..b02ad4d6f 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -74,6 +74,7 @@ import { ServerInterceptingCallInterface, ServerInterceptor, getServerIntercepti import { PartialStatusObject } from './call-interface'; import { CallEventTracker } from './transport'; import { Socket } from 'net'; +import { Duplex } from 'stream'; const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); @@ -225,6 +226,12 @@ export interface ServerOptions extends ChannelOptions { interceptors?: ServerInterceptor[] } +export interface ConnectionInjector { + injectConnection(connection: Duplex): void; + drain(graceTimeMs: number): void; + destroy(): void; +} + export class Server { private boundPorts: Map= new Map(); private http2Servers: Map = new Map(); @@ -808,6 +815,70 @@ export class Server { } } + private registerInjectorToChannelz() { + return registerChannelzSocket( + 'injector', + () => { + return { + localAddress: null, + remoteAddress: null, + security: null, + remoteName: null, + streamsStarted: 0, + streamsSucceeded: 0, + streamsFailed: 0, + messagesSent: 0, + messagesReceived: 0, + keepAlivesSent: 0, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: null, + lastMessageSentTimestamp: null, + lastMessageReceivedTimestamp: null, + localFlowControlWindow: null, + remoteFlowControlWindow: null, + }; + }, + this.channelzEnabled + ); + } + + createConnectionInjector(credentials: ServerCredentials): ConnectionInjector { + if (credentials === null || !(credentials instanceof ServerCredentials)) { + throw new TypeError('creds must be a ServerCredentials object'); + } + const server = this.createHttp2Server(credentials); + const channelzRef = this.registerInjectorToChannelz(); + if (this.channelzEnabled) { + this.listenerChildrenTracker.refChild(channelzRef); + } + const sessionsSet: Set = new Set(); + this.http2Servers.set(server, { + channelzRef: channelzRef, + sessions: sessionsSet + }); + return { + injectConnection: (connection: Duplex) => { + server.emit('connection', connection); + }, + drain: (graceTimeMs: number) => { + for (const session of sessionsSet) { + this.closeSession(session); + } + setTimeout(() => { + for (const session of sessionsSet) { + session.destroy(http2.constants.NGHTTP2_CANCEL as any); + } + }, graceTimeMs).unref?.(); + }, + destroy: () => { + this.closeServer(server) + for (const session of sessionsSet) { + this.closeSession(session); + } + } + }; + } + private closeServer(server: AnyHttp2Server, callback?: () => void) { this.trace('Closing server with address ' + JSON.stringify(server.address())); const serverInfo = this.http2Servers.get(server); diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 1ec0b1baf..14d6b4141 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -21,6 +21,7 @@ import * as assert from 'assert'; import * as fs from 'fs'; import * as http2 from 'http2'; import * as path from 'path'; +import * as net from 'net'; import * as protoLoader from '@grpc/proto-loader'; import * as grpc from '../src'; @@ -905,6 +906,72 @@ describe('Echo service', () => { }); }); +describe('Connection injector', () => { + let tcpServer: net.Server; + let server: Server; + let client: ServiceClient; + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + + const serviceImplementation = { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + echoBidiStream(call: ServerDuplexStream) { + call.on('data', data => { + call.write(data); + }); + call.on('end', () => { + call.end(); + }); + }, + }; + + before(done => { + server = new Server(); + const creds = ServerCredentials.createSsl( + null, + [{ private_key: key, cert_chain: cert }] + ); + const connectionInjector = server.createConnectionInjector(creds); + tcpServer = net.createServer(socket => { + connectionInjector.injectConnection(socket); + }); + server.addService(echoService.service, serviceImplementation); + tcpServer.listen(0, 'localhost', () => { + const port = (tcpServer.address() as net.AddressInfo).port; + client = new echoService( + `localhost:${port}`, + grpc.credentials.createSsl(ca), + { + 'grpc.ssl_target_name_override': 'foo.test.google.fr', + 'grpc.default_authority': 'foo.test.google.fr' + } + ); + done(); + }); + }); + + after(done => { + client.close(); + tcpServer.close(); + server.tryShutdown(done); + }); + + it('should respond to a request', done => { + client.echo( + { value: 'test value', value2: 3 }, + (error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 3 }); + done(); + } + ); + }); + +}) + describe('Generic client and server', () => { function toString(val: any) { return val.toString();