From b7ce4b59a6f7df438b37ecf652c92aa219d3bf48 Mon Sep 17 00:00:00 2001 From: Konstantinos Kopanidis Date: Tue, 19 Dec 2023 15:11:31 +0200 Subject: [PATCH] fix(grpc-sdk): module connection not re-established after core shutdown (#858) * fix(grpc-sdk): module connection not re-established after core shutdown * chore(grpc-sdk): rm unused vars --------- Co-authored-by: Konstantinos Feretos --- .../grpc-sdk/src/classes/ConduitModule.ts | 13 +++++++-- libraries/grpc-sdk/src/index.ts | 28 +++++++++++++++---- .../grpc-sdk/src/modules/config/index.ts | 15 ++++++---- 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/libraries/grpc-sdk/src/classes/ConduitModule.ts b/libraries/grpc-sdk/src/classes/ConduitModule.ts index d84178eeb..9b6f9b39a 100644 --- a/libraries/grpc-sdk/src/classes/ConduitModule.ts +++ b/libraries/grpc-sdk/src/classes/ConduitModule.ts @@ -2,9 +2,12 @@ import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../inte import { CompatServiceDefinition } from 'nice-grpc/lib/service-definitions'; import { Channel, Client, createChannel, createClientFactory } from 'nice-grpc'; import { retryMiddleware } from 'nice-grpc-client-middleware-retry'; -import { HealthCheckResponse, HealthDefinition } from '../protoUtils'; +import { + ConduitModuleDefinition, + HealthCheckResponse, + HealthDefinition, +} from '../protoUtils'; import { EventEmitter } from 'events'; -import { ConduitModuleDefinition } from '../protoUtils'; import ConduitGrpcSdk from '../index'; export class ConduitModule { @@ -58,7 +61,11 @@ export class ConduitModule { } openConnection() { - if (this.channel) return; + if (this.channel) { + // used to make sure a connection attempt is made + this.channel.getConnectivityState(true); + return; + } // ConduitGrpcSdk.Logger.log(`Opening connection for ${this._serviceName}`); this.channel = createChannel(this._serviceUrl, undefined, { 'grpc.max_receive_message_length': 1024 * 1024 * 100, diff --git a/libraries/grpc-sdk/src/index.ts b/libraries/grpc-sdk/src/index.ts index 4f3c1128a..d232edc80 100644 --- a/libraries/grpc-sdk/src/index.ts +++ b/libraries/grpc-sdk/src/index.ts @@ -274,19 +274,24 @@ export default class ConduitGrpcSdk { return this._initialize(); } (this._core as unknown) = new Core(this.name, this.serverUrl, this._grpcToken); + await this.connectToCore().catch(() => process.exit(-1)); + this._initialize(); + } + + async connectToCore() { ConduitGrpcSdk.Logger.log('Waiting for Core...'); while (true) { try { + this.core.openConnection(); const state = await this.core.check(); if ((state as unknown as HealthCheckStatus) === HealthCheckStatus.SERVING) { ConduitGrpcSdk.Logger.log('Core connection established'); - this._initialize(); - break; + return; } } catch (err) { if ((err as GrpcError).code === status.PERMISSION_DENIED) { ConduitGrpcSdk.Logger.error(err as Error); - process.exit(-1); + throw err; } await sleep(1000); } @@ -319,6 +324,15 @@ export default class ConduitGrpcSdk { emitter.emit(`module-connection-update:${m.moduleName}`, true); }); }); + emitter.on('core-status-update', () => { + this.connectToCore() + .then(() => { + this._initialize(); + }) + .catch(() => { + process.exit(-1); + }); + }); } monitorModule( @@ -528,8 +542,12 @@ export default class ConduitGrpcSdk { } private _initialize() { - if (this._initialized) - throw new Error("Module's grpc-sdk has already been initialized"); + if (this._initialized) { + this._config?.openConnection(); + this._admin?.openConnection(); + this.config.watchModules().then(); + return; + } (this._config as unknown) = new Config( this.name, this.serverUrl, diff --git a/libraries/grpc-sdk/src/modules/config/index.ts b/libraries/grpc-sdk/src/modules/config/index.ts index e1fe15871..2963c6615 100644 --- a/libraries/grpc-sdk/src/modules/config/index.ts +++ b/libraries/grpc-sdk/src/modules/config/index.ts @@ -5,7 +5,7 @@ import { ConfigDefinition, ModuleHealthRequest, RegisterModuleRequest, -} from '../../protoUtils/core'; +} from '../../protoUtils'; import { Indexable } from '../../interfaces'; import ConduitGrpcSdk from '../../index'; import { ClusterOptions, RedisOptions } from 'ioredis'; @@ -24,6 +24,7 @@ export class Config extends ConduitModule { super(moduleName, 'config', url, grpcToken); this.initializeClient(ConfigDefinition); this._serviceHealthStatusGetter = serviceHealthStatusGetter; + this.emitter.setMaxListeners(150); } getServerConfig() { @@ -150,17 +151,19 @@ export class Config extends ConduitModule { } async watchModules() { - const self = this; - this.emitter.setMaxListeners(150); - self.emitter.emit('serving-modules-update', await self.moduleList().catch()); + if (!this.coreLive) { + this.coreLive = true; + } + this.emitter.emit('serving-modules-update', await this.moduleList().catch()); try { const call = this.client!.watchModules({}); for await (const data of call) { - self.emitter.emit('serving-modules-update', data.modules); + this.emitter.emit('serving-modules-update', data.modules); } } catch (error) { - self.coreLive = false; + this.coreLive = false; ConduitGrpcSdk.Logger.warn('Core unhealthy'); + this.emitter.emit('core-status-update', HealthCheckStatus.UNKNOWN); } } }