Skip to content

Commit

Permalink
Merge pull request #3762 from shaharmor/issue-3441
Browse files Browse the repository at this point in the history
Safeguard for client disconnection
  • Loading branch information
wallet77 committed Jul 5, 2018
2 parents 24cddc2 + 76d07b9 commit 429e455
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions lib/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var axon = require('pm2-axon');
var util = require('util');
var fs = require('fs');
var path = require('path');
var pkg = require('../package.json')
var pkg = require('../package.json');

function noop() {}

Expand All @@ -26,7 +26,7 @@ var Client = module.exports = function(opts) {
this.conf = opts.conf;
}

this.daemon_mode = typeof(opts.daemon_mode) == 'undefined' ? true : opts.daemon_mode;
this.daemon_mode = typeof(opts.daemon_mode) === 'undefined' ? true : opts.daemon_mode;
this.pm2_home = this.conf.PM2_ROOT_PATH;
this.secret_key = opts.secret_key;
this.public_key = opts.public_key;
Expand All @@ -40,15 +40,15 @@ var Client = module.exports = function(opts) {
debug('Using PUB file %s', this.conf.DAEMON_PUB_PORT);
this.rpc_socket_file = this.conf.DAEMON_RPC_PORT;
this.pub_socket_file = this.conf.DAEMON_PUB_PORT;
}
};

// @breaking change (noDaemonMode has been drop)
// @todo ret err
Client.prototype.start = function(cb) {
var that = this;

this.pingDaemon(function(daemonAlive) {
if (daemonAlive == true)
if (daemonAlive === true)
return that.launchRPC(function(err, meta) {
return cb(null, {
daemon_mode : that.conf.daemon_mode,
Expand All @@ -62,7 +62,7 @@ Client.prototype.start = function(cb) {
/**
* No Daemon mode
*/
if (that.daemon_mode == false) {
if (that.daemon_mode === false) {
var Daemon = require('./Daemon.js');

var daemon = new Daemon({
Expand Down Expand Up @@ -310,7 +310,7 @@ Client.prototype.pingDaemon = function pingDaemon(cb) {
});

client.sock.once('error', function(e) {
if (e.code == 'EACCES') {
if (e.code === 'EACCES') {
fs.stat(that.conf.DAEMON_RPC_PORT, function(e, stats) {
if (stats.uid === 0) {
console.error(that.conf.PREFIX_MSG_ERR + 'Permission denied, to give access to current user:');
Expand Down Expand Up @@ -342,6 +342,7 @@ Client.prototype.pingDaemon = function pingDaemon(cb) {
* This method wait to be connected to the Daemon
* Once he's connected it trigger the command parsing (on ./bin/pm2 file, at the end)
* @method launchRPC
* @params {function} [cb]
* @return
*/
Client.prototype.launchRPC = function launchRPC(cb) {
Expand All @@ -350,22 +351,25 @@ Client.prototype.launchRPC = function launchRPC(cb) {
var req = axon.socket('req');
this.client = new rpc.Client(req);

this.client.sock.once('connect', function() {
// Avoid keeping the event loop busy if no more items running
// if (req && req.socks && req.socks[0] && req.socks[0].unref &&
// self.conf.PM2_PROGRAMMATIC)
// req.socks[0].unref();
var connectHandler = function() {
self.client.sock.removeListener('error', errorHandler);
debug('RPC Connected to Daemon');
//process.emit('satan:client:ready');
setTimeout(function() {
return cb ? cb(null) : false;
}, 4);
});
if (cb) {
setTimeout(function() {
cb(null);
}, 4);
}
};

this.client.sock.on('error', function(e) {
return cb(e);
});
var errorHandler = function(e) {
self.client.sock.removeListener('connect', connectHandler);
if (cb) {
return cb(e);
}
};

this.client.sock.once('connect', connectHandler);
this.client.sock.once('error', errorHandler);
this.client_sock = req.connect(this.rpc_socket_file);
};

Expand All @@ -384,8 +388,8 @@ Client.prototype.disconnectRPC = function disconnectRPC(cb) {
});
}

if (this.client_sock.connected == false ||
this.client_sock.closing == true) {
if (this.client_sock.connected === false ||
this.client_sock.closing === true) {
this.client = null;
return process.nextTick(function() {
cb(new Error('RPC already being closed'));
Expand Down Expand Up @@ -413,7 +417,7 @@ Client.prototype.disconnectRPC = function disconnectRPC(cb) {
} catch(e) {
debug('Error while disconnecting RPC PM2', e.stack || e);
return cb(e);
};
}
return false;
};

Expand All @@ -439,8 +443,8 @@ Client.prototype.disconnectBus = function disconnectBus(cb) {
});
}

if (this.sub_sock.connected == false ||
this.sub_sock.closing == true) {
if (this.sub_sock.connected === false ||
this.sub_sock.closing === true) {
that.sub = null;
return process.nextTick(function() {
cb(new Error('SUB connection is already being closed'));
Expand Down Expand Up @@ -515,7 +519,9 @@ Client.prototype.executeRemote = function executeRemote(method, app_conf, fn) {
console.error(error);
return process.exit(0);
}
return self.client.call(method, app_conf, fn);
if (self.client) {
return self.client.call(method, app_conf, fn);
}
});
return false;
}
Expand Down

0 comments on commit 429e455

Please sign in to comment.