Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent application from crashing if child process crashes - Closes #3142 #3168

44 changes: 38 additions & 6 deletions framework/src/controller/bus.js
@@ -1,5 +1,5 @@
const axon = require('axon');
const { Server: RPCServer, Client: RPCClient } = require('axon-rpc');
const axon = require('pm2-axon');
const { Server: RPCServer, Client: RPCClient } = require('pm2-axon-rpc');
const { EventEmitter2 } = require('eventemitter2');
const Action = require('./action');

Expand Down Expand Up @@ -32,6 +32,7 @@ class Bus extends EventEmitter2 {
this.actions = {};
this.events = {};
this.channels = {};
this.rpcClients = {};
}

/**
Expand Down Expand Up @@ -90,7 +91,6 @@ class Bus extends EventEmitter2 {
*
* @throws {Error} If event name is already registered.
*/
// eslint-disable-next-line no-unused-vars
async registerChannel(
moduleAlias,
events,
Expand Down Expand Up @@ -123,6 +123,7 @@ class Bus extends EventEmitter2 {
const rpcSocket = axon.socket('req');
rpcSocket.connect(options.rpcSocketPath);
channel = new RPCClient(rpcSocket);
this.rpcClients[moduleAlias] = rpcSocket;
}

this.channels[moduleAlias] = {
Expand All @@ -133,6 +134,37 @@ class Bus extends EventEmitter2 {
};
}

/**
* Unregister channel from bus.
*
* @async
* @param {string} moduleAlias - Alias for module used during registration
*
* @throws {Error} If channel not registered.
*/
async unregisterChannel(moduleAlias) {
Object.keys(this.events).forEach(eventName => {
shuse2 marked this conversation as resolved.
Show resolved Hide resolved
const [moduleName] = eventName.split(':');
if (moduleName === moduleAlias) {
delete this.events[eventName];
}
});

Object.keys(this.actions).forEach(actionName => {
const [moduleName] = actionName.split(':');
if (moduleName === moduleAlias) {
delete this.actions[actionName];
}
});

const rpcSocket = this.rpcClients[moduleAlias];
if (rpcSocket) {
rpcSocket.close();
}

delete this.channels[moduleAlias];
}

/**
* Invoke action on bus.
*
Expand Down Expand Up @@ -248,13 +280,13 @@ class Bus extends EventEmitter2 {
* @returns {Promise<void>}
*/
async cleanup() {
if (this.pubSocket && typeof this.pubSocket.close === 'function') {
if (this.pubSocket) {
this.pubSocket.close();
}
if (this.subSocket && typeof this.subSocket.close === 'function') {
if (this.subSocket) {
this.subSocket.close();
}
if (this.rpcSocket && typeof this.rpcSocket.close === 'function') {
if (this.rpcSocket) {
this.rpcSocket.close();
}
}
Expand Down
19 changes: 15 additions & 4 deletions framework/src/controller/channels/child_process_channel.js
@@ -1,6 +1,6 @@
const { EventEmitter2 } = require('eventemitter2');
const axon = require('axon');
const { Server: RPCServer, Client: RPCClient } = require('axon-rpc');
const axon = require('pm2-axon');
const { Server: RPCServer, Client: RPCClient } = require('pm2-axon-rpc');
const Action = require('../action');
const Event = require('../event');
const BaseChannel = require('./base_channel');
Expand Down Expand Up @@ -44,7 +44,9 @@ class ChildProcessChannel extends BaseChannel {

// Channel RPC Server is only required if the module has actions
if (this.actionsList.length > 0) {
this.rpcSocketPath = `${socketsPath.root}/${this.moduleAlias}_rpc.sock`;
this.rpcSocketPath = `unix://${socketsPath.root}/${
this.moduleAlias
}_rpc.sock`;

this.rpcSocket = axon.socket('rep');
this.rpcSocket.bind(this.rpcSocketPath);
Expand Down Expand Up @@ -137,9 +139,18 @@ class ChildProcessChannel extends BaseChannel {
* @returns {Promise<void>}
*/
async cleanup() {
if (this.rpcSocket && typeof this.rpcSocket.close === 'function') {
if (this.pubSocket) {
this.pubSocket.close();
}
if (this.subSocket) {
this.subSocket.close();
}
if (this.rpcSocket) {
this.rpcSocket.close();
}
if (this.busRpcSocket) {
this.busRpcSocket.close();
}
}

/**
Expand Down
21 changes: 18 additions & 3 deletions framework/src/controller/controller.js
Expand Up @@ -48,7 +48,9 @@ class Controller {
},
};

this.liskReady = false;
this.modules = {};
this.childrenList = [];
this.channel = null; // Channel for controller
this.bus = null;
}
Expand All @@ -70,6 +72,7 @@ class Controller {
this.logger.info('Bus listening to events', this.bus.getEvents());
this.logger.info('Bus ready for actions', this.bus.getActions());

this.liskReady = true;
this.channel.publish('lisk:ready');
}

Expand Down Expand Up @@ -127,6 +130,7 @@ class Controller {
'lisk',
['ready'],
{
isLiskReady: () => this.liskReady,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this action? Preferably, we should not create actions when the same thing can be handled through events, and the lisk:ready event is the right solution for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this when reloading the module

getComponentConfig: action => this.config.components[action.params],
},
{ skipInternalEvents: true }
Expand Down Expand Up @@ -228,12 +232,21 @@ class Controller {

const child = child_process.fork(program, parameters);

child.on('exit', (code, signal) => {
this.childrenList.push(child);
lsilvs marked this conversation as resolved.
Show resolved Hide resolved
yatki marked this conversation as resolved.
Show resolved Hide resolved

child.on('exit', async (code, signal) => {
this.logger.error(
`Module ${moduleAlias}(${name}:${version}) exited with code: ${code} and signal: ${signal}`
);
// TODO: Reload child instead of exiting the parent process
process.exit(1);

await this.bus.unregisterChannel(moduleAlias);

this.childrenList = this.childrenList.filter(
({ pid }) => pid !== child.pid
);

// Reload the module
await this._loadChildProcessModule(alias, Klass, options);
});

return Promise.race([
Expand Down Expand Up @@ -269,6 +282,8 @@ class Controller {
this.logger.error(`Reason: ${reason}`);
}

this.childrenList.forEach(child => child.kill());

try {
await this.bus.cleanup();
await this.unloadModules();
Expand Down
11 changes: 8 additions & 3 deletions framework/src/modules/chain/index.js
Expand Up @@ -86,10 +86,15 @@ module.exports = class ChainModule extends BaseModule {

async load(channel) {
this.chain = new Chain(channel, this.options);
const isLiskReady = await channel.invoke('lisk:isLiskReady');

channel.once('lisk:ready', () => {
this.chain.bootstrap();
});
if (isLiskReady) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the right way to do it: the lisk:ready event is triggered after load is called, so it must be listened for as in the previous implementation. I suspect you changed this because of module reload; if so, then as I said earlier, we should develop that strategy as an enhancement later and leave this implementation as it was before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it was introduced to allow a module to be reloaded in case it crashes, and I don't see a reason to revert this implementation if it works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a child process crashes because of a memory leak, then this infinite reloading will exhaust the resources.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, and we already plan to improve this. This is the first implementation of child processes and won't be the final one. Knowing that this implementation can be unstable, I've created a property in the config file to completely disable it if required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is not related to stability. If there is an issue and any part of the code crashes, the whole application exits. That's exactly the default behaviour we have had from the start, and we should keep it consistent in this release. In the next release we can develop resilience with a detailed and extensive approach.

await this.chain.bootstrap();
} else {
channel.once('lisk:ready', async () => {
await this.chain.bootstrap();
});
}
}

async unload() {
Expand Down
12 changes: 9 additions & 3 deletions framework/src/modules/http_api/index.js
Expand Up @@ -41,9 +41,15 @@ class HttpAPIModule extends BaseModule {

async load(channel) {
this.httpApi = new HttpApi(channel, this.options);
channel.once('lisk:ready', () => {
this.httpApi.bootstrap();
});
const isLiskReady = await channel.invoke('lisk:isLiskReady');

if (isLiskReady) {
await this.httpApi.bootstrap();
} else {
channel.once('lisk:ready', async () => {
await this.httpApi.bootstrap();
});
}
}

async unload() {
Expand Down
4 changes: 2 additions & 2 deletions framework/test/jest/specs/unit/controller/bus.spec.js
Expand Up @@ -5,8 +5,8 @@ const Controller = require('../../../../../src/controller/controller');

jest.mock('../../../../../src/controller/controller');
jest.mock('eventemitter2');
jest.mock('axon');
jest.mock('axon-rpc');
jest.mock('pm2-axon');
jest.mock('pm2-axon-rpc');

describe('Bus', () => {
const controller = new Controller();
Expand Down
75 changes: 23 additions & 52 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Expand Up @@ -58,8 +58,8 @@
"dependencies": {
"ajv": "6.7.0",
"async": "2.6.1",
"axon": "2.0.3",
"axon-rpc": "0.0.3",
"pm2-axon": "3.3.0",
"pm2-axon-rpc": "0.5.1",
"bignumber.js": "8.0.2",
"bluebird": "3.5.3",
"body-parser": "1.18.3",
Expand Down