Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(grpc-sdk): module connection not re-established after core shutdown #858

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions libraries/grpc-sdk/src/classes/ConduitModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../inte
import { CompatServiceDefinition } from 'nice-grpc/lib/service-definitions';
import { Channel, Client, createChannel, createClientFactory } from 'nice-grpc';
import { retryMiddleware } from 'nice-grpc-client-middleware-retry';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils';
import {
ConduitModuleDefinition,
HealthCheckResponse,
HealthDefinition,
} from '../protoUtils';
import { EventEmitter } from 'events';
import { ConduitModuleDefinition } from '../protoUtils';
import ConduitGrpcSdk from '../index';

export class ConduitModule<T extends CompatServiceDefinition> {
Expand Down Expand Up @@ -58,7 +61,11 @@ export class ConduitModule<T extends CompatServiceDefinition> {
}

openConnection() {
if (this.channel) return;
if (this.channel) {
// used to make sure a connection attempt is made
this.channel.getConnectivityState(true);
return;
}
// ConduitGrpcSdk.Logger.log(`Opening connection for ${this._serviceName}`);
this.channel = createChannel(this._serviceUrl, undefined, {
'grpc.max_receive_message_length': 1024 * 1024 * 100,
Expand Down
28 changes: 23 additions & 5 deletions libraries/grpc-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,24 @@ export default class ConduitGrpcSdk {
return this._initialize();
}
(this._core as unknown) = new Core(this.name, this.serverUrl, this._grpcToken);
await this.connectToCore().catch(() => process.exit(-1));
this._initialize();
}

async connectToCore() {
ConduitGrpcSdk.Logger.log('Waiting for Core...');
while (true) {
try {
this.core.openConnection();
const state = await this.core.check();
if ((state as unknown as HealthCheckStatus) === HealthCheckStatus.SERVING) {
ConduitGrpcSdk.Logger.log('Core connection established');
this._initialize();
break;
return;
}
} catch (err) {
if ((err as GrpcError).code === status.PERMISSION_DENIED) {
ConduitGrpcSdk.Logger.error(err as Error);
process.exit(-1);
throw err;
}
await sleep(1000);
}
Expand Down Expand Up @@ -319,6 +324,15 @@ export default class ConduitGrpcSdk {
emitter.emit(`module-connection-update:${m.moduleName}`, true);
});
});
emitter.on('core-status-update', () => {
this.connectToCore()
.then(() => {
this._initialize();
})
.catch(() => {
process.exit(-1);
});
});
}

monitorModule(
Expand Down Expand Up @@ -528,8 +542,12 @@ export default class ConduitGrpcSdk {
}

private _initialize() {
if (this._initialized)
throw new Error("Module's grpc-sdk has already been initialized");
if (this._initialized) {
this._config?.openConnection();
this._admin?.openConnection();
this.config.watchModules().then();
return;
}
(this._config as unknown) = new Config(
this.name,
this.serverUrl,
Expand Down
15 changes: 9 additions & 6 deletions libraries/grpc-sdk/src/modules/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
ConfigDefinition,
ModuleHealthRequest,
RegisterModuleRequest,
} from '../../protoUtils/core';
} from '../../protoUtils';
import { Indexable } from '../../interfaces';
import ConduitGrpcSdk from '../../index';
import { ClusterOptions, RedisOptions } from 'ioredis';
Expand All @@ -24,6 +24,7 @@ export class Config extends ConduitModule<typeof ConfigDefinition> {
super(moduleName, 'config', url, grpcToken);
this.initializeClient(ConfigDefinition);
this._serviceHealthStatusGetter = serviceHealthStatusGetter;
this.emitter.setMaxListeners(150);
}

getServerConfig() {
Expand Down Expand Up @@ -150,17 +151,19 @@ export class Config extends ConduitModule<typeof ConfigDefinition> {
}

async watchModules() {
const self = this;
this.emitter.setMaxListeners(150);
self.emitter.emit('serving-modules-update', await self.moduleList().catch());
if (!this.coreLive) {
this.coreLive = true;
}
this.emitter.emit('serving-modules-update', await this.moduleList().catch());
try {
const call = this.client!.watchModules({});
for await (const data of call) {
self.emitter.emit('serving-modules-update', data.modules);
this.emitter.emit('serving-modules-update', data.modules);
}
} catch (error) {
self.coreLive = false;
this.coreLive = false;
ConduitGrpcSdk.Logger.warn('Core unhealthy');
this.emitter.emit('core-status-update', HealthCheckStatus.UNKNOWN);
}
}
}
Loading