Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Add Server#createConnectionInjector API #2675

Merged
merged 1 commit into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 71 additions & 0 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import { ServerInterceptingCallInterface, ServerInterceptor, getServerIntercepti
import { PartialStatusObject } from './call-interface';
import { CallEventTracker } from './transport';
import { Socket } from 'net';
import { Duplex } from 'stream';

const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31);
const KEEPALIVE_MAX_TIME_MS = ~(1 << 31);
Expand Down Expand Up @@ -225,6 +226,12 @@ export interface ServerOptions extends ChannelOptions {
interceptors?: ServerInterceptor[]
}

export interface ConnectionInjector {
injectConnection(connection: Duplex): void;
drain(graceTimeMs: number): void;
destroy(): void;
}

export class Server {
private boundPorts: Map<string, BoundPort>= new Map();
private http2Servers: Map<AnyHttp2Server, Http2ServerInfo> = new Map();
Expand Down Expand Up @@ -808,6 +815,70 @@ export class Server {
}
}

private registerInjectorToChannelz() {
return registerChannelzSocket(
'injector',
() => {
return {
localAddress: null,
remoteAddress: null,
security: null,
remoteName: null,
streamsStarted: 0,
streamsSucceeded: 0,
streamsFailed: 0,
messagesSent: 0,
messagesReceived: 0,
keepAlivesSent: 0,
lastLocalStreamCreatedTimestamp: null,
lastRemoteStreamCreatedTimestamp: null,
lastMessageSentTimestamp: null,
lastMessageReceivedTimestamp: null,
localFlowControlWindow: null,
remoteFlowControlWindow: null,
};
},
this.channelzEnabled
);
}

createConnectionInjector(credentials: ServerCredentials): ConnectionInjector {
if (credentials === null || !(credentials instanceof ServerCredentials)) {
throw new TypeError('creds must be a ServerCredentials object');
}
const server = this.createHttp2Server(credentials);
const channelzRef = this.registerInjectorToChannelz();
if (this.channelzEnabled) {
this.listenerChildrenTracker.refChild(channelzRef);
}
const sessionsSet: Set<http2.ServerHttp2Session> = new Set();
this.http2Servers.set(server, {
channelzRef: channelzRef,
sessions: sessionsSet
});
return {
injectConnection: (connection: Duplex) => {
server.emit('connection', connection);
},
drain: (graceTimeMs: number) => {
for (const session of sessionsSet) {
this.closeSession(session);
}
setTimeout(() => {
for (const session of sessionsSet) {
session.destroy(http2.constants.NGHTTP2_CANCEL as any);
}
}, graceTimeMs).unref?.();
},
destroy: () => {
this.closeServer(server)
for (const session of sessionsSet) {
this.closeSession(session);
}
}
};
}

private closeServer(server: AnyHttp2Server, callback?: () => void) {
this.trace('Closing server with address ' + JSON.stringify(server.address()));
const serverInfo = this.http2Servers.get(server);
Expand Down
67 changes: 67 additions & 0 deletions packages/grpc-js/test/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import * as assert from 'assert';
import * as fs from 'fs';
import * as http2 from 'http2';
import * as path from 'path';
import * as net from 'net';
import * as protoLoader from '@grpc/proto-loader';

import * as grpc from '../src';
Expand Down Expand Up @@ -905,6 +906,72 @@ describe('Echo service', () => {
});
});

describe('Connection injector', () => {
let tcpServer: net.Server;
let server: Server;
let client: ServiceClient;
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
const echoService = loadProtoFile(protoFile)
.EchoService as ServiceClientConstructor;

const serviceImplementation = {
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
callback(null, call.request);
},
echoBidiStream(call: ServerDuplexStream<any, any>) {
call.on('data', data => {
call.write(data);
});
call.on('end', () => {
call.end();
});
},
};

before(done => {
server = new Server();
const creds = ServerCredentials.createSsl(
null,
[{ private_key: key, cert_chain: cert }]
);
const connectionInjector = server.createConnectionInjector(creds);
tcpServer = net.createServer(socket => {
connectionInjector.injectConnection(socket);
});
server.addService(echoService.service, serviceImplementation);
tcpServer.listen(0, 'localhost', () => {
const port = (tcpServer.address() as net.AddressInfo).port;
client = new echoService(
`localhost:${port}`,
grpc.credentials.createSsl(ca),
{
'grpc.ssl_target_name_override': 'foo.test.google.fr',
'grpc.default_authority': 'foo.test.google.fr'
}
);
done();
});
});

after(done => {
client.close();
tcpServer.close();
server.tryShutdown(done);
});

it('should respond to a request', done => {
client.echo(
{ value: 'test value', value2: 3 },
(error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 3 });
done();
}
);
});

})

describe('Generic client and server', () => {
function toString(val: any) {
return val.toString();
Expand Down