From 775f0ca354debc120be95705ed505a81de2e83d0 Mon Sep 17 00:00:00 2001 From: vmarchaud Date: Wed, 1 Mar 2017 01:53:11 +0100 Subject: [PATCH] wip --- constants.js | 19 +- lib/API.js | 2 +- lib/API/Configuration.js | 8 +- lib/API/Interaction.js | 2 +- lib/Client.js | 2 +- lib/Common.js | 2 +- lib/Interactor/Cipher.js | 50 -- lib/Interactor/Daemon.js | 408 -------------- lib/Interactor/Filter.js | 146 +++--- lib/Interactor/HttpRequest.js | 71 --- ...actorDaemonizer.js => InteractorClient.js} | 16 +- lib/Interactor/InteractorDaemon.js | 308 +++++++++++ .../{pm2-interface.js => PM2Client.js} | 82 ++- lib/Interactor/PushInteractor.js | 496 ++++++------------ lib/Interactor/RemoteActions/CustomActions.js | 92 ---- lib/Interactor/RemoteActions/Pm2Actions.js | 336 ------------ .../RemoteActions/ScopedExecution.js | 34 -- lib/Interactor/ReverseInteractor.js | 349 ++++++++---- lib/Interactor/ScopedExecution.js | 31 ++ lib/Interactor/TransactionAggregator.js | 317 +++++------ lib/Interactor/Utility.js | 341 ++++++++++-- lib/Interactor/WebsocketTransport.js | 178 +++++++ lib/Satan.js | 4 +- test/interface/custom-actions.mocha.js | 2 +- test/interface/interactor.daemonizer.mocha.js | 16 +- test/interface/scoped_pm2_actions.mocha.js | 2 +- 26 files changed, 1511 insertions(+), 1803 deletions(-) delete mode 100644 lib/Interactor/Cipher.js delete mode 100644 lib/Interactor/Daemon.js delete mode 100644 lib/Interactor/HttpRequest.js rename lib/Interactor/{InteractorDaemonizer.js => InteractorClient.js} (97%) create mode 100644 lib/Interactor/InteractorDaemon.js rename lib/Interactor/{pm2-interface.js => PM2Client.js} (50%) delete mode 100644 lib/Interactor/RemoteActions/CustomActions.js delete mode 100644 lib/Interactor/RemoteActions/Pm2Actions.js delete mode 100644 lib/Interactor/RemoteActions/ScopedExecution.js create mode 100644 lib/Interactor/ScopedExecution.js create mode 100644 lib/Interactor/WebsocketTransport.js diff --git a/constants.js b/constants.js index 8102b97f3..23f03fc57 100644 --- a/constants.js +++ b/constants.js @@ -52,21 +52,22 @@ var csts = { CLUSTER_MODE_ID : 'cluster_mode', FORK_MODE_ID : 'fork_mode', - KEYMETRICS_ROOT_URL : process.env.KEYMETRICS_NODE || 'root.keymetrics.io', + KEYMETRICS_ROOT_URL : process.env.KEYMETRICS_NODE || 'https://root.keymetrics.io', KEYMETRICS_BANNER : '../lib/keymetrics', - DEFAULT_MODULE_JSON : 'package.json', - REMOTE_PORT_TCP : isNaN(parseInt(process.env.KEYMETRICS_PUSH_PORT)) ? 80 : parseInt(process.env.KEYMETRICS_PUSH_PORT), - REMOTE_PORT : 41624, - REMOTE_REVERSE_PORT : isNaN(parseInt(process.env.KEYMETRICS_REVERSE_PORT)) ? 43554 : parseInt(process.env.KEYMETRICS_REVERSE_PORT), - REMOTE_HOST : 's1.keymetrics.io', - SEND_INTERVAL : 1000, - GRACEFUL_TIMEOUT : parseInt(process.env.PM2_GRACEFUL_TIMEOUT) || 8000, - GRACEFUL_LISTEN_TIMEOUT : parseInt(process.env.PM2_GRACEFUL_LISTEN_TIMEOUT) || 3000, + PROTOCOL_VERSION : 1, + COMPRESS_PROTOCOL : false, + STATUS_INTERVAL : 1000, + LOGS_BUFFER : 10, CONTEXT_ON_ERROR : 2, AGGREGATION_DURATION : process.env.PM2_DEBUG || process.env.NODE_ENV === 'test' || process.env.NODE_ENV === 'development' ? 0 : 60 * 10, + GRACEFUL_TIMEOUT : parseInt(process.env.PM2_GRACEFUL_TIMEOUT) || 8000, + GRACEFUL_LISTEN_TIMEOUT : parseInt(process.env.PM2_GRACEFUL_LISTEN_TIMEOUT) || 3000, + + DEFAULT_MODULE_JSON : 'package.json', + // Concurrent actions when doing start/restart/reload CONCURRENT_ACTIONS : (function() { var concurrent_actions = parseInt(process.env.PM2_CONCURRENT_ACTIONS) || 1; diff --git a/lib/API.js b/lib/API.js index 6b3c36b44..5652c8f8f 100644 --- a/lib/API.js +++ b/lib/API.js @@ -17,7 +17,7 @@ var fclone = require('fclone'); var conf = require('../constants.js'); var Client = require('./Client'); var Common = require('./Common'); -var KMDaemon = require('./Interactor/InteractorDaemonizer'); +var KMDaemon = require('./Interactor/InteractorClient'); var Config = require('./tools/Config'); var Modularizer = require('./API/Modules/Modularizer.js'); var path_structure = require('../paths.js'); diff --git a/lib/API/Configuration.js b/lib/API/Configuration.js index 556a53908..3171d8a48 100644 --- a/lib/API/Configuration.js +++ b/lib/API/Configuration.js @@ -7,7 +7,7 @@ var chalk = require('chalk'); var async = require('async'); var Configuration = require('../Configuration.js'); //@todo double check that imported methods works -var InteractorDaemonizer = require('../Interactor/InteractorDaemonizer'); +var InteractorClient = require('../Interactor/InteractorClient'); module.exports = function(CLI) { @@ -65,15 +65,15 @@ module.exports = function(CLI) { Configuration.set(key, value, function(err) { if (err) return cb ? cb(Common.retErr(err)) : that.exitCli(cst.ERROR_EXIT); - InteractorDaemonizer.launchRPC(that._conf, function(err) { + InteractorClient.launchRPC(that._conf, function(err) { if (err) { displayConf('pm2', function() { return cb ? cb(null, {success:true}) : that.exitCli(cst.SUCCESS_EXIT); }); return false; } - InteractorDaemonizer.rpc.passwordSet(function() { - InteractorDaemonizer.disconnectRPC(function() { + InteractorClient.rpc.passwordSet(function() { + InteractorClient.disconnectRPC(function() { displayConf('pm2', function() { return cb ? cb(null, {success:true}) : that.exitCli(cst.SUCCESS_EXIT); }); diff --git a/lib/API/Interaction.js b/lib/API/Interaction.js index 46ea88a41..8cdc420f0 100644 --- a/lib/API/Interaction.js +++ b/lib/API/Interaction.js @@ -6,7 +6,7 @@ var chalk = require('chalk'); var async = require('async'); var path = require('path'); var fs = require('fs'); -var KMDaemon = require('../Interactor/InteractorDaemonizer'); +var KMDaemon = require('../Interactor/InteractorClient'); module.exports = function(CLI) { diff --git a/lib/Client.js b/lib/Client.js index 6aa0209ab..2528e4889 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -6,7 +6,7 @@ var debug = require('debug')('pm2:client'); var Common = require('./Common.js'); -var KMDaemon = require('./Interactor/InteractorDaemonizer.js'); +var KMDaemon = require('./Interactor/InteractorClient.js'); var rpc = require('pm2-axon-rpc'); var async = require('async'); var axon = require('pm2-axon'); diff --git a/lib/Common.js b/lib/Common.js index 3d1c15bf4..8d534ffef 100644 --- a/lib/Common.js +++ b/lib/Common.js @@ -19,7 +19,7 @@ var isBinary = require('./tools/isbinaryfile.js'); var cst = require('../constants.js'); var extItps = require('./API/interpreter.json'); var Config = require('./tools/Config'); -var KMDaemon = require('./Interactor/InteractorDaemonizer.js'); +var KMDaemon = require('./Interactor/InteractorClient.js'); /** * Common methods (used by CLI and God) diff --git a/lib/Interactor/Cipher.js b/lib/Interactor/Cipher.js deleted file mode 100644 index 1204a3eb8..000000000 --- a/lib/Interactor/Cipher.js +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ - -var crypto = require('crypto'); - -const CIPHER_ALGORITHM = 'aes256'; - -var Cipher = module.exports = {}; - -/** - * Description - * @method decipherMessage - * @param {} msg - * @return ret - */ -Cipher.decipherMessage = function(msg, key) { - var ret = {}; - - try { - var decipher = crypto.createDecipher(CIPHER_ALGORITHM, key); - var decipheredMessage = decipher.update(msg, 'hex', 'utf8'); - decipheredMessage += decipher.final('utf8'); - ret = JSON.parse(decipheredMessage); - } catch(e) { - return null; - } - - return ret; -} - -/** - * Description - * @method cipherMessage - * @param {} data - * @param {} key - * @return - */ -Cipher.cipherMessage = function(data, key) { - try { - var cipher = crypto.createCipher(CIPHER_ALGORITHM, key); - var cipheredData = cipher.update(data, 'utf8', 'hex'); - cipheredData += cipher.final('hex'); - return cipheredData; - } catch(e) { - return null; - } -} diff --git a/lib/Interactor/Daemon.js b/lib/Interactor/Daemon.js deleted file mode 100644 index f70e2072d..000000000 --- a/lib/Interactor/Daemon.js +++ /dev/null @@ -1,408 +0,0 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ - -var fs = require('fs'); -var ipm2 = require('./pm2-interface.js'); -var rpc = require('pm2-axon-rpc'); -var axon = require('pm2-axon'); -var debug = require('debug')('interface:driver'); // Interface -var chalk = require('chalk'); -var Url = require('url'); -var os = require('os'); -var pkg = require('../../package.json'); -var PM2 = require('../..'); - -var cst = require('../../constants.js'); -var Cipher = require('./Cipher.js'); -var ReverseInteractor = require('./ReverseInteractor.js'); -var PushInteractor = require('./PushInteractor.js'); -var Utility = require('../Utility.js'); -var WatchDog = require('./WatchDog.js'); -var Conf = require('../Configuration.js'); -var HttpRequest = require('./HttpRequest.js'); - -global._pm2_password_protected = false; - -// Flag for log streaming status -global._logs = false; - -var Daemon = module.exports = { - connectToPM2 : function() { - return ipm2(); - }, - exit : function() { - var self = this; - - this.opts.pm2_instance.disconnect(function() { - console.log('Connection to PM2 via CLI closed'); - }); - - process.nextTick(function() { - try { - fs.unlinkSync(cst.INTERACTOR_RPC_PORT); - fs.unlinkSync(cst.INTERACTOR_PID_PATH); - } catch(e) {} - - if (self.opts.ipm2) - self.opts.ipm2.disconnect(); - - console.log('Exiting Interactor'); - - if (!this._rpc || !this._rpc.sock) - return process.exit(cst.ERROR_EXIT); - - this._rpc.sock.close(function() { - console.log('RPC closed - Interactor killed'); - process.exit(cst.SUCCESS_EXIT); - }); - }); - }, - activateRPC : function() { - console.log('Launching Interactor exposure'); - - var self = this; - var rep = axon.socket('rep'); - var daemon_server = new rpc.Server(rep); - var sock = rep.bind(cst.INTERACTOR_RPC_PORT); - - daemon_server.expose({ - kill : function(cb) { - console.log('Killing interactor'); - cb(null); - return Daemon.exit(); - }, - passwordSet : function(cb) { - global._pm2_password_protected = true; - return cb(null); - }, - getInfos : function(cb) { - if (self.opts && - self.opts.DAEMON_ACTIVE == true) - return cb(null, { - machine_name : self.opts.MACHINE_NAME, - public_key : self.opts.PUBLIC_KEY, - secret_key : self.opts.SECRET_KEY, - remote_host : cst.REMOTE_HOST, - remote_port : cst.REMOTE_PORT, - reverse_interaction : self.opts.REVERSE_INTERACT, - socket_path : cst.INTERACTOR_RPC_PORT, - pm2_home_monitored : cst.PM2_HOME - }); - else { - return cb(null); - } - } - }); - return daemon_server; - }, - formatMetada : function() { - var cpu, memory; - - var self = this; - - try { - cpu = os.cpus(); - memory = Math.floor(os.totalmem() / 1024 / 1024); - } catch(e) { - cpu = 0; - memory = 0; - }; - - var ciphered_data = Cipher.cipherMessage(JSON.stringify({ - MACHINE_NAME : this.opts.MACHINE_NAME, - PUBLIC_KEY : this.opts.PUBLIC_KEY, - PM2_VERSION : this.opts.PM2_VERSION, - RECYCLE : this.opts.RECYCLE || false, - MEMORY : memory, - HOSTNAME : os.hostname(), - CPUS : cpu.length - }), this.opts.SECRET_KEY); - - return ciphered_data; - }, - pingKeepAlive : function() { - var self = this; - - (function checkInternet() { - require('dns').lookup('google.com',function(err) { - if (err && (err.code == 'ENOTFOUND' || err.code == 'EAI_AGAIN')) { - if (self.opts._connection_is_up == true) - console.error('[CRITICAL] Internet is unreachable (via DNS lookup strategy)'); - self.opts._connection_is_up = false; - } else { - if (self.opts._connection_is_up == false) { - console.log('[TENTATIVE] Reactivating connection'); - PushInteractor.connectRemote(); - } - self.opts._connection_is_up = true; - } - setTimeout(checkInternet, 15000); - }); - })(); - }, - changeUrls : function(push_url, reverse) { - if (push_url) - PushInteractor.connectRemote(push_url); - if (reverse) - ReverseInteractor.changeUrl(reverse); - }, - refreshWorker : function() { - var self = this; - - function refreshMetadata() { - var ciphered_data = Daemon.formatMetada(); - - HttpRequest.post({ - url : self.opts.ROOT_URL, - port : self.opts.ROOT_PORT, - data : { - public_id : self.opts.PUBLIC_KEY, - data : ciphered_data - } - }, function(err, km_data) { - if (err) return console.error(err); - - /** protect against malformated data **/ - if (!km_data || - !km_data.endpoints || - !km_data.endpoints.push || - !km_data.endpoints.reverse) { - console.error('[CRITICAL] Malformated data received, skipping...'); - return false; - } - - if (km_data.disabled == true) { - console.error('Server DISABLED BY ADMINISTRATION contact support contact@keymetrics.io with reference to your public and secret keys)'); - return Daemon.exit(); - } - - /************************************** - * Urls has changed = update workers * - **************************************/ - - if ((Daemon.current_km_data.endpoints.push != km_data.endpoints.push) || - (Daemon.current_km_data.endpoints.reverse != km_data.endpoints.reverse)) { - self.changeUrls(km_data.endpoints.push, km_data.endpoints.reverse); - Daemon.current_km_data = km_data; - } - else { - debug('[REFRESH META] No need to update URL (same)', km_data); - } - return false; - }); - - }; - - // Refresh metadata every minutes - setInterval(function() { - refreshMetadata(); - }, 60000); - }, - validateData : function() { - var opts = {}; - - opts.MACHINE_NAME = process.env.PM2_MACHINE_NAME; - opts.PUBLIC_KEY = process.env.PM2_PUBLIC_KEY; - opts.SECRET_KEY = process.env.PM2_SECRET_KEY; - opts.RECYCLE = process.env.KM_RECYCLE ? JSON.parse(process.env.KM_RECYCLE) : false; - opts.REVERSE_INTERACT = JSON.parse(process.env.PM2_REVERSE_INTERACT); - opts.PM2_VERSION = pkg.version; - - if (!opts.MACHINE_NAME) { - console.error('You must provide a PM2_MACHINE_NAME environment variable'); - process.exit(cst.ERROR_EXIT); - } - else if (!opts.PUBLIC_KEY) { - console.error('You must provide a PM2_PUBLIC_KEY environment variable'); - process.exit(cst.ERROR_EXIT); - } - else if (!opts.SECRET_KEY) { - console.error('You must provide a PM2_SECRET_KEY environment variable'); - process.exit(cst.ERROR_EXIT); - } - return opts; - }, - welcome : function(cb) { - var self = this; - var ciphered_data = Daemon.formatMetada(); - - if (!ciphered_data) { - process.send({ - msg : 'Error while ciphering data', - error : true - }); - return process.exit(1); - } - - var retries = 0; - - function doWelcomeQuery(cb) { - HttpRequest.post({ - url : self.opts.ROOT_URL, - port : self.opts.ROOT_PORT, - data : { - public_id : self.opts.PUBLIC_KEY, - data : ciphered_data - } - }, function(err, km_data) { - self.current_km_data = km_data; - if (err) { - console.error('Got error while connecting: %s', err.message || err); - if (retries < 5) { - retries++; - setTimeout(function() { - doWelcomeQuery(cb); - }, 600); - return false; - } - return cb(err); - } - - if (self.opts.RECYCLE) { - if (!km_data.name) { - console.error('Error no previous machine name for recycle option returned!'); - } - self.opts.MACHINE_NAME = km_data.name; - }; - - // For Human feedback - if (process.send) - process.send({ - error : false, - km_data : km_data, - online : true, - pid : process.pid, - machine_name : self.opts.MACHINE_NAME, - public_key : self.opts.PUBLIC_KEY, - secret_key : self.opts.SECRET_KEY, - reverse_interaction : self.opts.REVERSE_INTERACT - }); - // Return get data - return cb(null, km_data); - }) - } - - doWelcomeQuery(function(err, meta) { - return cb(err, meta); - }); - }, - start : function() { - var self = this; - - self.opts = self.validateData(); - self.opts.ipm2 = null; - self.opts.internal_ip = require('./Utility').network.v4; - self.opts.pm2_instance = PM2; - self.opts._connection_is_up = true; - self.current_km_data = null; - - self.opts.pm2_instance.connect(function() { - console.log('Connected to PM2'); - }); - - self._rpc = self.activateRPC(); - - // Test mode #1 - if (cst.DEBUG) { - self.opts.ROOT_URL = '127.0.0.1'; - if (process.env.NODE_ENV == 'test') - self.opts.ROOT_PORT = 3400; - else - self.opts.ROOT_PORT = 3000; - } - else { - self.opts.ROOT_URL = cst.KEYMETRICS_ROOT_URL; - self.opts.ROOT_PORT = 443; - } - - if (Conf.getSync('pm2:passwd')) - global._pm2_password_protected = true; - - // Test mode #2 - if (process.env.NODE_ENV == 'local_test') { - self.opts.DAEMON_ACTIVE = true; - - self.opts.ipm2 = self.connectToPM2(); - - PushInteractor.start({ - url : 'http://127.0.0.1:4321', - conf : self.opts - }); - - ReverseInteractor.start({ - url : 'http://127.0.0.1:4322', - conf : self.opts - }); - if (process.send) - process.send({ - success : true, - debug : true - }); - return false; - } - - Daemon.welcome(function(err, km_data) { - if (err) { - if (process.send) - process.send({ - error : true, - msg : err.stack || err - }); - console.log(err.stack || err); - return Daemon.exit(); - } - - if (km_data.disabled == true) { - console.error('Interactor disabled'); - return Daemon.exit(); - } - if (km_data.pending == true) { - console.error('Interactor pending'); - return Daemon.exit(); - } - - if (km_data.active == true) { - self.opts.DAEMON_ACTIVE = true; - - self.opts.ipm2 = self.connectToPM2(); - - WatchDog.start({ - conf : self.opts - }); - - PushInteractor.start({ - url : km_data.endpoints.push, - conf : self.opts - }); - - if (self.opts.REVERSE_INTERACT == true) { - ReverseInteractor.start({ - url : km_data.endpoints.reverse, - conf : self.opts - }); - } - Daemon.refreshWorker(); - Daemon.pingKeepAlive(); - } - else { - console.log('Nothing to do, exiting'); - Daemon.exit(); - } - return false; - }); - } -}; - -/** - * MAIN - */ -if (require.main === module) { - console.log(chalk.cyan.bold('[Keymetrics.io]') + ' Launching agent'); - process.title = 'PM2: KM Agent (' + process.env.PM2_HOME + ')'; - - Utility.overrideConsole(); - Daemon.start(); -} diff --git a/lib/Interactor/Filter.js b/lib/Interactor/Filter.js index ee813f939..9670505a5 100644 --- a/lib/Interactor/Filter.js +++ b/lib/Interactor/Filter.js @@ -10,101 +10,101 @@ * @project Interface */ -var os = require('os'); +var os = require('os'); -var cpu_info = { - number : 0, - info : 'no-data' +var cpuMeta = { + number: os.cpus().length, + info: os.cpus().length > 0 ? os.cpus()[0].model : 'no-data' }; -try { - cpu_info = { - number : os.cpus().length, - info : os.cpus()[0].model - }; -} catch(e) { -} - var Filter = {}; -Filter.getProcessID = function(machine_name, name, id) { - return machine_name + ':' + name + ':' + id; +Filter.getProcessID = function (server, name, id) { + return server + ':' + name + ':' + id; }; -Filter.status = function(processes, conf) { - if (!processes) return null; - - var filter_procs = []; - - processes.forEach(function(proc) { - if (proc.pm2_env.pm_id.toString().indexOf('_old_') == -1) - filter_procs.push({ - pid : proc.pid, - name : proc.pm2_env.name, - interpreter : proc.pm2_env.exec_interpreter, - restart_time : proc.pm2_env.restart_time, - created_at : proc.pm2_env.created_at, - exec_mode : proc.pm2_env.exec_mode, - watching : proc.pm2_env.watch, - pm_uptime : proc.pm2_env.pm_uptime, - status : proc.pm2_env.status, - pm_id : proc.pm2_env.pm_id, - - cpu : Math.floor(proc.monit.cpu) || 0, - memory : Math.floor(proc.monit.memory) || 0, - - versioning : proc.pm2_env.versioning || null, - - axm_actions : proc.pm2_env.axm_actions || [], - axm_monitor : proc.pm2_env.axm_monitor || {}, - axm_options : proc.pm2_env.axm_options || {}, - axm_dynamic : proc.pm2_env.dynamic || {} - }); +/** + * Normalize each process metdata + * @param {Object} processes process data extracted from pm2 daemon + * @param {Object} conf interactor configuration + */ +Filter.status = function (processes, conf) { + if (!processes || processes.length === 0) return null; + + var procs = []; + + processes.forEach(function (proc) { + if (proc.pm2_env.pm_id.toString().indexOf('_old_') > -1) return; + + procs.push({ + pid: proc.pid, + name: proc.pm2_env.name, + interpreter: proc.pm2_env.exec_interpreter, + restart_time: proc.pm2_env.restart_time, + created_at: proc.pm2_env.created_at, + exec_mode: proc.pm2_env.exec_mode, + watching: proc.pm2_env.watch, + pm_uptime: proc.pm2_env.pm_uptime, + status: proc.pm2_env.status, + pm_id: proc.pm2_env.pm_id, + + cpu: Math.floor(proc.monit.cpu) || 0, + memory: Math.floor(proc.monit.memory) || 0, + + versioning: proc.pm2_env.versioning || null, + + axm_actions: proc.pm2_env.axm_actions || [], + axm_monitor: proc.pm2_env.axm_monitor || {}, + axm_options: proc.pm2_env.axm_options || {}, + axm_dynamic: proc.pm2_env.dynamic || {} + }); }); - var node_version = process.version || ''; - - if (node_version != '') { - if (node_version.indexOf('v1.') === 0 || node_version.indexOf('v2.') === 0 || node_version.indexOf('v3.') === 0) - node_version = 'iojs ' + node_version; + var nodeVersion = process.version; + if (process.version.indexOf('v1.') === 0 || process.version.indexOf('v2.') === 0 || process.version.indexOf('v3.') === 0) { + nodeVersion = 'iojs ' + nodeVersion; } return { - process : filter_procs, - server : { - loadavg : os.loadavg(), - total_mem : os.totalmem(), - free_mem : os.freemem(), - cpu : cpu_info, - hostname : os.hostname(), - uptime : os.uptime(), - type : os.type(), - platform : os.platform(), - arch : os.arch(), - interaction : conf.REVERSE_INTERACT, - pm2_version : conf.PM2_VERSION, - node_version : node_version + process: procs, + server: { + loadavg: os.loadavg(), + total_mem: os.totalmem(), + free_mem: os.freemem(), + cpu: cpuMeta, + hostname: os.hostname(), + uptime: os.uptime(), + type: os.type(), + platform: os.platform(), + arch: os.arch(), + interaction: conf.REVERSE_INTERACT, + pm2_version: conf.PM2_VERSION, + node_version: nodeVersion } }; }; -Filter.monitoring = function(processes, conf) { - if (!processes) return null; - - var filter_procs = {}; - - processes.forEach(function(proc) { - filter_procs[Filter.getProcessID(conf.MACHINE_NAME, proc.pm2_env.name,proc.pm2_env.pm_id)] = [ +/** + * Normalize each process cpu and memory data + * @param {Object} processes process data extracted from pm2 daemon + * @param {Object} conf interactor configuration + */ +Filter.monitoring = function (processes, conf) { + if (!processes || processes.length === 0) return null; + var procs = {}; + + processes.forEach(function (proc) { + procs[Filter.getProcessID(conf.MACHINE_NAME, proc.pm2_env.name, proc.pm2_env.pm_id)] = [ Math.floor(proc.monit.cpu), Math.floor(proc.monit.memory) ]; }); return { - loadavg : os.loadavg(), - total_mem : os.totalmem(), - free_mem : os.freemem(), - processes : filter_procs + loadavg: os.loadavg(), + total_mem: os.totalmem(), + free_mem: os.freemem(), + processes: procs }; }; diff --git a/lib/Interactor/HttpRequest.js b/lib/Interactor/HttpRequest.js deleted file mode 100644 index 903dab79e..000000000 --- a/lib/Interactor/HttpRequest.js +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ - -var http = require('http'); -var https = require('https'); -var debug = require('debug')('interface:http'); - -var HttpRequest = module.exports = {}; - -HttpRequest.post = function(opts, cb) { - if (!(opts.port && opts.data && opts.url)) - return cb({msg : 'missing parameters', port : opts.port, data : opts.data, url : opts.url}); - - var port = 0; - - var options = { - hostname : opts.url, - path : '/api/node/verifyPM2', - method : 'POST', - port : opts.port, - rejectUnauthorized: false, - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(JSON.stringify(opts.data)) - } - }; - - var client = (opts.port == 443) ? https : http; - - var req = client.request(options, function(res){ - var dt = ''; - - res.on('data', function (chunk) { - dt += chunk; - }); - - res.on('end',function(){ - try { - cb(null, JSON.parse(dt)); - } catch(e) { - cb(e); - } - }); - - res.on('error', function(e){ - cb(e); - }); - }); - - req.on('socket', function (socket) { - /** - * Configure request timeout - */ - socket.setTimeout(7000); - socket.on('timeout', function() { - debug('Connection timeout when retrieveing PM2 metadata', options); - req.abort(); - }); - }); - - req.on('error', function(e) { - cb(e); - }); - - req.write(JSON.stringify(opts.data)); - - req.end(); -}; diff --git a/lib/Interactor/InteractorDaemonizer.js b/lib/Interactor/InteractorClient.js similarity index 97% rename from lib/Interactor/InteractorDaemonizer.js rename to lib/Interactor/InteractorClient.js index ca6e582dc..5c7278251 100644 --- a/lib/Interactor/InteractorDaemonizer.js +++ b/lib/Interactor/InteractorClient.js @@ -150,11 +150,11 @@ InteractorDaemonizer.launchRPC = function (conf, cb) { * @param {Function} cb invoked with */ var daemonize = function (conf, infos, cb) { - var InteractorJS = path.resolve(path.dirname(module.filename), 'Daemon.js'); + var InteractorJS = path.resolve(path.dirname(module.filename), 'InteractorDaemon.js'); // Redirect PM2 internal err and out // to STDERR STDOUT when running with Travis - var testEnv = process.env.TRAVIS || process.env.NODE_ENV.match(/test/); + var testEnv = process.env.TRAVIS || (process.env.NODE_ENV && process.env.NODE_ENV.match(/test/)); var out = testEnv ? 1 : fs.openSync(conf.INTERACTOR_LOG_FILE_PATH, 'a'); var err = testEnv ? 2 : fs.openSync(conf.INTERACTOR_LOG_FILE_PATH, 'a'); @@ -278,13 +278,13 @@ InteractorDaemonizer.update = function (conf, cb) { }; /** - * Start or Restart the Interaction Daemon depending if its online or not + * Retrieve Interactor configuration from env, params and filesystem. * @param {Object} cst global constants - * @param {Object} infos data used to start the interactor [can be recovered from FS] - * @param {String} infos.secret_key the secret key used to cipher data - * @param {String} infos.public_key the public key used identify the user - * @param {String} infos.machine_name [optional] override name of the machine - * @param {Function} cb invoked with + * @param {Object} infos data used to start the interactor [optional] + * @param {String} infos.secret_key the secret key used to cipher data [optional] + * @param {String} infos.public_key the public key used identify the user [optional] + * @param {String} infos.machine_name override name of the machine [optional] + * @param {Function} cb invoked with */ InteractorDaemonizer.getOrSetConf = function (cst, infos, cb) { infos = infos || {}; diff --git a/lib/Interactor/InteractorDaemon.js b/lib/Interactor/InteractorDaemon.js new file mode 100644 index 000000000..162332838 --- /dev/null +++ b/lib/Interactor/InteractorDaemon.js @@ -0,0 +1,308 @@ + +'use strict'; + +var fs = require('fs'); +var rpc = require('pm2-axon-rpc'); +var axon = require('pm2-axon'); +var log = require('debug')('interactor:daemon'); +var os = require('os'); +var pkg = require('../../package.json'); +var dns = require('dns'); +var cst = require('../../constants.js'); +var ReverseInteractor = require('./ReverseInteractor.js'); +var PushInteractor = require('./PushInteractor.js'); +var Utility = require('./Utility.js'); +var Conf = require('../Configuration.js'); +var PM2Client = require('./PM2Client.js'); +var WebsocketTransport = require('./WebsocketTransport.js'); + +// use noop if not launched via IPC +if (!process.send) { + process.send = function () {}; +} + +var InteractorDaemon = module.exports = function () { + this.opts = this.retrieveConf(); + this.DAEMON_ACTIVE = false; + this.transport = new WebsocketTransport(this.opts, this); + this.transport.on('error', function (err) { + return console.error('[TRANSPORT] Error : ' + err.message || err); + }); + this.httpClient = new Utility.HTTPClient(); + this._online = true; +}; + +/** + * Get an interface for communicating with PM2 daemon + * @private + * @return {PM2Client} + */ +InteractorDaemon.prototype.getPM2Client = function () { + if (!this._ipm2) { + this._ipm2 = new PM2Client(); + } + return this._ipm2; +}; + +/** + * Terminate connections and exit + * @param {Error} err if provided, the exit code will be set to cst.ERROR_EXIT + */ +InteractorDaemon.prototype.exit = function (err) { + // clear workers + if (this._workerEndpoint) clearInterval(this._workerEndpoint); + if (this._workerConnectivity) clearInterval(this._workerConnectivity); + + // stop interactors + if (this.reverse) this.reverse.stop(); + if (this.push) this.push.stop(); + + // stop transport + if (this.transport) this.transport.disconnect(); + + this._ipm2.disconnect(function () { + console.log('Closed connection to PM2 bus and RPC server'); + }); + + this.pm2.disconnect(function () { + console.log('Closed connection to PM2 API'); + }); + + try { + fs.unlinkSync(cst.INTERACTOR_RPC_PORT); + fs.unlinkSync(cst.INTERACTOR_PID_PATH); + } catch (err) {} + + console.log('Exiting Interactor'); + + if (!this._rpc || !this._rpc.sock) { + return process.exit(cst.ERROR_EXIT); + } + + this._rpc.sock.close(function () { + console.log('RPC server closed'); + process.exit(err ? cst.ERROR_EXIT : cst.SUCCESS_EXIT); + }); +}; + +/** + * Start a RPC server and expose it throught a socket file + */ +InteractorDaemon.prototype.startRPC = function (opts) { + console.log('Launching Interactor RPC server (bind to %s)', cst.INTERACTOR_RPC_PORT); + + var self = this; + var rep = axon.socket('rep'); + var rpcServer = new rpc.Server(rep); + rep.bind(cst.INTERACTOR_RPC_PORT); + + rpcServer.expose({ + kill: function (cb) { + console.log('Shutdown request received via RPC'); + cb(null); + return self.exit(); + }, + passwordSet: function (cb) { + global._pm2_password_protected = true; + return cb(null); + }, + getInfos: function (cb) { + if (self.opts && self.DAEMON_ACTIVE === true) { + return cb(null, { + machine_name: self.opts.MACHINE_NAME, + public_key: self.opts.PUBLIC_KEY, + secret_key: self.opts.SECRET_KEY, + remote_host: self.transport._host, + connected: self.transport.isConnected(), + socket_path: cst.INTERACTOR_RPC_PORT, + pm2_home_monitored: cst.PM2_HOME + }); + } else { + return cb(null); + } + } + }); + return rpcServer; +}; + +/** + * Retrieve metadata about the system + */ +InteractorDaemon.prototype.getSystemMetadata = function () { + return { + MACHINE_NAME: this.opts.MACHINE_NAME, + PUBLIC_KEY: this.opts.PUBLIC_KEY, + RECYCLE: this.opts.RECYCLE || false, + PM2_VERSION: pkg.version, + MEMORY: os.totalmem() / 1000 / 1000, + HOSTNAME: os.hostname(), + CPUS: os.cpus() + }; +}; + +/** + * Is internet reachable via DNS + * @private + * @param {Function} cb invoked with [optional] + */ +InteractorDaemon.prototype._checkInternet = function (cb) { + var self = this; + dns.lookup('google.com', function (err) { + if (err && (err.code === 'ENOTFOUND' || err.code === 'EAI_AGAIN')) { + if (self._online) { + console.error('[CRITICAL] Internet is unreachable (via DNS lookup strategy)'); + } + self._online = false; + } else { + if (!self._online) { + console.log('[TENTATIVE] Internet is reachable again, re-connecting'); + // reconnect only if the connection is actually disconnected + if (!self.transport.isConnected()) { + self.transport.reconnect(); + } + } + self._online = true; + } + return typeof cb === 'function' ? cb(self._online) : 0; + }); +}; + +/** + * Ping root url to retrieve node info + * @private + * @param {Function} cb invoked with where Object is the response sended by the server + */ +InteractorDaemon.prototype._pingRoot = function (cb) { + var data = this.getSystemMetadata(); + data = Utility.Cipher.cipherMessage(JSON.stringify(data), this.opts.SECRET_KEY); + if (!data) return cb(new Error('Failed to retrieve/cipher system metadata')); + + this.httpClient.open({ + url: this.opts.ROOT_URL + '/api/node/verifyPM2', + method: 'POST', + data: { + public_id: this.opts.PUBLIC_KEY, + data: data + } + }, cb); +}; + +/** + * Ping root to verify retrieve and connect to the km endpoint + * @private + * @param {Function} cb invoked with + */ +InteractorDaemon.prototype._verifyEndpoint = function (cb) { + if (typeof cb !== 'function') cb = function () {}; + + var self = this; + this._pingRoot(function (err, data) { + if (err) return cb(err); + self.km_data = data; + + if (data.disabled === true || data.pending === true) { + return cb(new Error('Interactor disabled, contact us at contact@keymetrics.io for more informatios')); + } + if (data.active === false) return cb(null, false); + + if (!self.transport.isConnected()) { + self.transport.connect(data.endpoints.push, cb); + } else if (data.endpoints.push !== self.km_data.endpoints.push) { + self.transport.reconnect(cb); + } else { + return cb(null, true); + } + }); +}; + +/** + * Retrieve configuration from environnement + */ +InteractorDaemon.prototype.retrieveConf = function () { + var opts = {}; + + opts.MACHINE_NAME = process.env.PM2_MACHINE_NAME; + opts.PUBLIC_KEY = process.env.PM2_PUBLIC_KEY; + opts.SECRET_KEY = process.env.PM2_SECRET_KEY; + opts.RECYCLE = process.env.KM_RECYCLE ? JSON.parse(process.env.KM_RECYCLE) : false; + opts.PM2_VERSION = pkg.version; + + if (!opts.MACHINE_NAME) { + console.error('You must provide a PM2_MACHINE_NAME environment variable'); + process.exit(cst.ERROR_EXIT); + } else if (!opts.PUBLIC_KEY) { + console.error('You must provide a PM2_PUBLIC_KEY environment variable'); + process.exit(cst.ERROR_EXIT); + } else if (!opts.SECRET_KEY) { + console.error('You must provide a PM2_SECRET_KEY environment variable'); + process.exit(cst.ERROR_EXIT); + } + return opts; +}; + +/** + * Ping root url to retrieve node info + * @private + * @param {Function} cb invoked with [optional] + */ +InteractorDaemon.prototype.start = function (cb) { + var self = this; + this._ipm2 = new PM2Client(); + this.pm2 = require('../..'); + + this.pm2.connect(function (err) { + return err ? console.error(err) : console.log('Connected to PM2'); + }); + + this._rpc = this.startRPC(); + + this.opts.ROOT_URL = cst.KEYMETRICS_ROOT_URL; + if (cst.DEBUG) { + this.opts.ROOT_URL = 'http://127.0.0.1' + (process.env.NODE_ENV === 'test' ? ':3400' : ':3000'); + } + + if (Conf.getSync('pm2:passwd')) { + global._pm2_password_protected = true; + } + + this._verifyEndpoint(function (err, result) { + if (err) { + console.error('Error while trying to retrieve endpoints : ' + (err.message || err)); + process.send({ error: true, msg: err.message || err }); + return self.exit(); + } + if (result === false) return self.exit(); + + // send data over IPC for CLI feedback + process.send({ + error: false, + km_data: self.km_data, + online: true, + pid: process.pid, + machine_name: self.opts.MACHINE_NAME, + public_key: self.opts.PUBLIC_KEY, + secret_key: self.opts.SECRET_KEY, + reverse_interaction: true + }); + + // start workers + self._workerConnectivity = setInterval(self._checkInternet.bind(self), 10000); + self._workerEndpoint = setInterval(self._verifyEndpoint.bind(self), 60000 * 10); + // start interactors + self.push = new PushInteractor(self.opts, self._ipm2, self.transport); + self.reverse = new ReverseInteractor(self.opts, self.pm2, self.transport); + self.push.start(); + self.reverse.start(); + // TODO: start Watchdog + }); +}; + +// If its the entry file launch the daemon +// otherwise we just required it to use a function +if (require.main === module) { + process.title = 'PM2: KM Agent (' + process.env.PM2_HOME + ')'; + require('../Utility.js').overrideConsole(); + console.log('[Keymetrics.io] Launching agent'); + new InteractorDaemon().start(); +} + diff --git a/lib/Interactor/pm2-interface.js b/lib/Interactor/PM2Client.js similarity index 50% rename from lib/Interactor/pm2-interface.js rename to lib/Interactor/PM2Client.js index 17d40303a..3bae197c0 100644 --- a/lib/Interactor/pm2-interface.js +++ b/lib/Interactor/PM2Client.js @@ -4,80 +4,68 @@ * can be found in the LICENSE file. */ -/** - * Dependencies - */ +'use strict'; var axon = require('pm2-axon'); -var cst = require('../../constants.js'); -var util = require('util'); -var rpc = require('pm2-axon-rpc'); -var log = require('debug')('pm2:interface'); +var cst = require('../../constants.js'); +var util = require('util'); +var rpc = require('pm2-axon-rpc'); +var log = require('debug')('pm2:interface'); var EventEmitter = require('events').EventEmitter; /** - * Export with conf + * PM2 API Wrapper used to setup connection with the daemon + * @param {Object} opts options + * @param {String} opts.sub_port socket file of the PM2 bus [optionnal] + * @param {String} opts.rpc_port socket file of the PM2 RPC server [optionnal] */ -module.exports = function(opts){ - var sub_port = opts && opts.sub_port || cst.DAEMON_PUB_PORT; - var rpc_port = opts && opts.rpc_port || cst.DAEMON_RPC_PORT; - - return new IPM2(sub_port, rpc_port); -}; +var PM2Wrapper = function (opts) { + var subSocket = opts && opts.sub_port || cst.DAEMON_PUB_PORT; + var rpcSocket = opts && opts.rpc_port || cst.DAEMON_RPC_PORT; -/** - * IPM2, Pm2 Interface - */ - -var IPM2 = function(sub_port, rpc_port) { - if (!(this instanceof IPM2)) return new IPM2(sub_port, rpc_port); var self = this; EventEmitter.call(this); - this.sub_port = sub_port; - this.rpc_port = rpc_port; - - var sub = axon.socket('sub-emitter'); - var sub_sock = this.sub_sock = sub.connect(sub_port); - this.bus = sub; + this.sub_sock = sub.connect(subSocket); + this.bus = sub; var req = axon.socket('req'); - var rpc_sock = this.rpc_sock = req.connect(rpc_port); + this.rpc_sock = req.connect(rpcSocket); this.rpc_client = new rpc.Client(req); this.rpc = {}; - rpc_sock.on('connect', function() { - log('rpc_sock:ready'); + this.rpc_sock.on('connect', function () { + log('PM2 API Wrapper connected to PM2 Daemon via RPC'); self.emit('rpc_sock:ready'); - generateMethods(function() { + generateMethods(function () { self.emit('ready'); }); }); - rpc_sock.on('close', function() { + this.rpc_sock.on('close', function () { log('rpc_sock:closed'); self.emit('close'); }); - rpc_sock.on('reconnect attempt', function() { + this.rpc_sock.on('reconnect attempt', function () { log('rpc_sock:reconnecting'); self.emit('reconnecting'); }); - sub_sock.on('connect', function() { + this.sub_sock.on('connect', function () { log('sub_sock ready'); self.emit('sub_sock:ready'); }); - sub_sock.on('close', function() { + this.sub_sock.on('close', function () { log('sub_sock:closed'); self.emit('closed'); }); - sub_sock.on('reconnect attempt', function() { + this.sub_sock.on('reconnect attempt', function () { log('sub_sock:reconnecting'); self.emit('reconnecting'); }); @@ -95,28 +83,28 @@ var IPM2 = function(sub_port, rpc_port) { * Generate method by requesting exposed methods by PM2 * You can now control/interact with PM2 */ - var generateMethods = function(cb) { + var generateMethods = function (cb) { log('Requesting and generating RPC methods'); - self.rpc_client.methods(function(err, methods) { - Object.keys(methods).forEach(function(key) { - var method_signature, md; - method_signature = md = methods[key]; + self.rpc_client.methods(function (err, methods) { + if (err) return cb(err); + Object.keys(methods).forEach(function (key) { + var method = methods[key]; - log('+-- Creating %s method', md.name); + log('+-- Creating %s method', method.name); - (function(name) { - self.rpc[name] = function() { - log(name); + (function (name) { + self.rpc[name] = function () { var args = Array.prototype.slice.call(arguments); args.unshift(name); self.rpc_client.call.apply(self.rpc_client, args); }; - })(md.name); - + })(method.name); }); return cb(); }); }; }; -util.inherits(IPM2, EventEmitter); +util.inherits(PM2Wrapper, EventEmitter); + +module.exports = PM2Wrapper; diff --git a/lib/Interactor/PushInteractor.js b/lib/Interactor/PushInteractor.js index e12cf73bf..9b2a57b88 100644 --- a/lib/Interactor/PushInteractor.js +++ b/lib/Interactor/PushInteractor.js @@ -1,369 +1,187 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ -var axon = require('pm2-axon'); -var os = require('os'); -var debug = require('debug')('interface:push-interactor'); -var debugInfo = require('debug')('interface:push:delay'); -var util = require('util'); -var Url = require('url'); -var fs = require('fs'); -var path = require('path'); +'use strict'; -var pkg = require('../../package.json'); -var cst = require('../../constants.js'); -var Filter = require('./Filter.js'); -var Cipher = require('./Cipher.js'); -var Utility = require('../Utility.js'); -var InteractorUtility = require('./Utility.js'); +var debug = require('debug')('interactor:push-interactor'); +var fs = require('fs'); +var path = require('path'); +var cst = require('../../constants.js'); +var Filter = require('./Filter.js'); +var Utility = require('./Utility.js'); var Aggregator = require('./TransactionAggregator.js'); /** - * Instanciate a new axon connection + * PushInteractor is the class that handle pushing data to KM + * @param {Object} opts interactor options + * @param {PM2Client} ipm2 pm2 daemon client used to listen on bus + * @param {WebsocketTransport} transport websocket transport used to send data to KM */ -function setupConnection(host) { - var that = this; - - this._setup = function(host) { - console.log('[PUSH] Connecting %s:%s', host, cst.REMOTE_PORT_TCP); - - var client = this.client = axon.socket('pub'); - - this.host = host; - - client.on('connect', function() { - console.log('[PUSH] Connected'); - }); - - client.on('error', function(e) { - console.log('[PUSH] Client got error', e.message); - }); - - client.on('close', function(e) { - console.log('[PUSH] Connection closed'); - }); - - client.on('reconnect attempt', function(e) { - console.log('[PUSH] Reconnecting'); - }); - - client.connect(cst.REMOTE_PORT_TCP, host); - }; - - this.destroy = function() { - this.client.close(); - this.client.removeAllListeners(); - }; - - this.reconnect = function() { - this.destroy(); - this._setup(this.host); - }; - - this._setup(host); - - return this; -}; - -var PushInteractor = module.exports = { - /** - * Connect to target host or reconnect if null is passed - * the host param must be formated like (http://HOST:PORT) - */ - connectRemote : function(hostname) { - if (hostname) - hostname = Url.parse(hostname).hostname; - else if (this.socket && this.socket.host) - hostname = this.socket.host; - else - return console.error('NO HOST DEFINED'); - - if (this.socket) - this.socket.destroy(); - - this.socket = setupConnection(hostname); - }, - /** - * Start the PushInteractor Singleton - */ - start : function(p) { - if (!p.url) - throw new Error('missing endpoint url'); - if (!p.conf || !p.conf.ipm2) - throw new Error('ipm2 is not initialized'); - - var self = this; - - this.conf = p.conf; - this.ipm2 = p.conf.ipm2; - this.send_buffer = []; - this._reconnect_counter = 0; - - if (process.env.PM2_DEBUG) - cst.REMOTE_PORT_TCP = 3900; - if (process.env.NODE_ENV == 'local_test') - cst.REMOTE_PORT_TCP = 8080; - - this.resetPacket(); - - this.connectRemote(p.url); - - this.ipm2.on('ready', function() { - console.log('[PUSH] PM2 interface ready, listening to PM2'); - self.listenToPM2Events(); - }); - - self.startPoolingWorker(); - self.cache = new InteractorUtility.Cache({ - miss: function (key) { - try { - var content = fs.readFileSync(path.resolve(key)); - return content.toString().split(/\r?\n/); - } catch (err) { - debug('Error while trying to get file from FS : %s', err.message || err) - return undefined; - } - } - }) - self.stackParser = new InteractorUtility.StackTraceParser({ cache: self.cache, context: cst.CONTEXT_ON_ERROR}); - self.aggregator = new Aggregator(self); - }, - /** - * Send bufferized data at regular interval - */ - startPoolingWorker : function() { - var self = this; - - setInterval(function() { - debug('[PUSH] +---- Pooling: sending data ----+'); - PushInteractor.sendData(); - }, cst.SEND_INTERVAL); - }, - /** - * Send profiling file asynchronously - */ - sendFile : function(packet) { - var self = this; - var file = JSON.parse(JSON.stringify(packet.data.return.dump_file)); - - var meta = { - pm_id : packet.process.pm_id, - name : packet.process.name, - server_name : PushInteractor.conf.MACHINE_NAME, - public_key : self.conf.PUBLIC_KEY - }; - - if (packet.data.return.heapdump === true) - meta.heapdump = true; - if (packet.data.return.cpuprofile === true) - meta.cpuprofile = true; - - fs.readFile(file, function(err, data) { - if (err) return console.error(err.stack || err); - fs.unlink(file, function(e) { if (e) console.error(e.stack || e);}); - return self.socket.client.send(JSON.stringify(meta), data); - }); - }, - listenToPM2Events : function() { - var self = this; - - self.log_buffer = {}; - - this.ipm2.bus.on('*', function(event, packet) { - if (event == 'axm:action') return false; - - // keep log in a buffer - if (event.match(/^log:/)) { - if (!self.log_buffer[packet.process.pm_id]) - self.log_buffer[packet.process.pm_id] = []; - // push the log data - self.log_buffer[packet.process.pm_id].push(packet.data); - // delete the last one if too long - if (self.log_buffer[packet.process.pm_id].length >= cst.LOGS_BUFFER) - self.log_buffer[packet.process.pm_id].pop(); - - // don't buffer if not asked - if (!global._logs) return false; - } - - // attach additional info on exception - if (event === 'process:exception') { - packet.data.last_logs = self.log_buffer[packet.process.pm_id]; - - // try to parse stacktrace and attach callsite + context if available - if (typeof(packet.data.stackframes) === 'object') { - var result = self.stackParser.parse(packet.data.stackframes); - // no need to send it since there is already the stacktrace - delete packet.data.stackframes; - if (result) { - packet.data.callsite = result.callsite || undefined; - packet.data.context = result.context || undefined; - } - } - } - - /** - * This is a heapdump action - */ - if (event == 'axm:reply' && packet.data && packet.data.return && (packet.data.return.heapdump || packet.data.return.cpuprofile)) { - PushInteractor.sendFile(packet); - return false; - } - - if (event == 'human:event') { - packet.name = packet.data.__name; - delete packet.data.__name; +var PushInteractor = module.exports = function (opts, ipm2, transport) { + this._ipm2 = ipm2; + this.transport = transport; + this.opts = opts; + this.log_buffer = {}; + this.broadcast_logs = false; + + this._cacheFS = new Utility.Cache({ + miss: function (key) { + try { + var content = fs.readFileSync(path.resolve(key)); + return content.toString().split(/\r?\n/); + } catch (err) { + return debug('Error while trying to get file from FS : %s', err.message || err); } + } + }); + this._stackParser = new Utility.StackTraceParser({ cache: this._cacheFS, context: cst.CONTEXT_ON_ERROR }); + // start transaction aggregator + this.aggregator = new Aggregator(this); +}; - if (!packet.process) - return console.error('No process field [%s]', event); - - /** - * Process specific messages - * -- Reformat raw output of pm2-interface - */ - packet.process = { - pm_id : packet.process.pm_id, - name : packet.process.name, - rev : packet.process.rev || ((packet.process.versioning && packet.process.versioning.revision) ? packet.process.versioning.revision : null), - server: PushInteractor.conf.MACHINE_NAME - }; - - // agregate transaction data before sending them - if (event.indexOf('axm:trace') > -1) - return self.aggregator.aggregate(packet); - - if (event.match(/^log:/)) { - packet.log_type = event.split(':')[1]; - event = 'logs'; - } - return PushInteractor.bufferData(event, packet); - }); - }, - resetPacket : function() { - var self = this; - - this._packet = { - 'server_name' : self.conf.MACHINE_NAME, - 'status' : {}, - 'monitoring' : {} - }; - }, - bufferData : function(event, packet) { - var self = this; - var logs_limit_size = 1024 * 50; +/** + * Start the interactor by starting all workers and listeners + */ +PushInteractor.prototype.start = function () { + // stop old running task + if (this._worker_executor !== undefined) { + this.stop(); + } + this._worker(); + this._worker_executor = setInterval(this._worker.bind(this), cst.STATUS_INTERVAL); + this._ipm2.bus.on('*', this._onPM2Event.bind(this)); +}; - // if (Object.keys(self._packet).indexOf(event) == -1) { - // return console.error('SKIP unknown field name [%s]', event); - // } - debug('Buffering one more event %s', event); +/** + * Stop the interactor by removing all running worker and listeners + */ +PushInteractor.prototype.stop = function () { + if (this._worker_executor !== undefined) { + clearInterval(this._worker_executor); + this._worker_executor = null; + } +}; - if (!(event in self._packet)) - self._packet[event] = []; +/** + * Listener of pm2 bus + * @param {String} event channel + * @param {Object} packet data + */ +PushInteractor.prototype._onPM2Event = function (event, packet) { + if (event === 'axm:action') return false; - if (packet.process && !packet.server) { - if (event === 'logs' - && (JSON.stringify(self._packet[event]).length > logs_limit_size - || self._packet[event].length > 100)) - return console.error('Logs packet larger than 50KB limit'); + // Drop transitional state processes (_old_*) + if (packet && packet.process && packet.process.pm_id && typeof packet.process.pm_id === 'string' && + packet.process.pm_id.indexOf('_old') > -1) return false; - self._packet[event].push(packet); + // bufferize logs + if (event.match(/^log:/)) { + if (!this.log_buffer[packet.process.pm_id]) { + this.log_buffer[packet.process.pm_id] = []; } - else { - console.error('Got packet without any process'); + // push the log data + this.log_buffer[packet.process.pm_id].push(packet.data); + // delete the last one if too long + if (this.log_buffer[packet.process.pm_id].length >= cst.LOGS_BUFFER) { + this.log_buffer[packet.process.pm_id].pop(); } - return false; - }, - preparePacket : function(cb) { - var self = this; - this.ipm2.rpc.getMonitorData({}, function(err, processes) { - if (!processes) - return console.error('Cant access to getMonitorData RPC PM2 method'); - - var ret = null; + // don't send logs if not enabled + if (!this.broadcast_logs) return false; + } - if ((ret = Filter.monitoring(processes, PushInteractor.conf))) { - self._packet['monitoring'] = ret; + // attach additional info for exception + if (event === 'process:exception') { + packet.data.last_logs = this.log_buffer[packet.process.pm_id]; + + // try to parse stacktrace and attach callsite + context if available + if (typeof packet.data.stackframes === 'object') { + var result = this.stackParser.parse(packet.data.stackframes); + // no need to send it since there is already the stacktrace + delete packet.data.stackframes; + if (result) { + packet.data.callsite = result.callsite || undefined; + packet.data.context = result.context || undefined; } + } + } - if ((ret = Filter.status(processes, PushInteractor.conf))) { - self._packet['status'] = { - data : ret, - server_name : self.conf.MACHINE_NAME, - internal_ip : self.conf.internal_ip, - protected : global._pm2_password_protected, - rev_con : self.conf.rev_con - }; - } + if (event === 'axm:reply' && packet.data && packet.data.return && (packet.data.return.heapdump || packet.data.return.cpuprofile)) { + return this._sendFile(packet); + } - return cb ? cb(null, ret) : false; - }); - }, - /** - * Description - * @method send_data - * @return - */ - sendData : function() { - var self = this; + if (event === 'human:event') { + packet.name = packet.data.__name; + delete packet.data.__name; + } - if (self.socket.client && - self.socket.client.socks[0] && - self.socket.client.socks[0].bufferSize > 290000) { - self.resetPacket(); - self._reconnect_counter++; - console.log('Buffer size too high (%d), stopping buffering and sending', self.socket.client.socks[0].bufferSize); + if (!packet.process) return console.error('No process field [%s]', event); - if (self._reconnect_counter > 20) { - console.log('[PUSH] Forcing reconnection'); - self._reconnect_counter = 0; - self.socket.reconnect(); - } - return false; - } + // Normalize data + packet.process = { + pm_id: packet.process.pm_id, + name: packet.process.name, + rev: packet.process.rev || ((packet.process.versioning && packet.process.versioning.revision) ? packet.process.versioning.revision : null), + server: this.opts.MACHINE_NAME + }; - this.preparePacket(function() { - var data = {}; + // agregate transaction data before sending them + if (event.indexOf('axm:trace') > -1) return this.aggregator.aggregate(packet); - if (process.env.NODE_ENV && - (process.env.NODE_ENV == 'test' || process.env.NODE_ENV == 'local_test')) { - data = { - public_key : PushInteractor.conf.PUBLIC_KEY, - sent_at : Utility.getDate(), - data : self._packet - }; - } - else { - var cipheredData = Cipher.cipherMessage(JSON.stringify(self._packet), - PushInteractor.conf.SECRET_KEY); - data = { - public_key : self.conf.PUBLIC_KEY, - sent_at : Utility.getDate(), - data : cipheredData - }; - } + if (event.match(/^log:/)) { + packet.log_type = event.split(':')[1]; + event = 'logs'; + } - var str = JSON.stringify(data); - var t1 = new Date(); + return this.transport.send(event, packet); +}; - self.resetPacket(); +/** + * Worker function that will retrieve process metadata and send them to KM + */ +PushInteractor.prototype._worker = function () { + var self = this; + this._ipm2.rpc.getMonitorData({}, function (err, processes) { + if (err || !processes) { + return console.error(err || 'Cant access to getMonitorData RPC PM2 method'); + } - if (!self.socket) return false; + var monitoring = Filter.monitoring(processes, self.opts); + if (monitoring) { + self.transport.send('monitoring', monitoring); + } - self.socket.client.sendv2(str, function() { - var duration_sec = (new Date() - t1) / 1000; - debugInfo('Time to flush data %ds (buffer size %d)', duration_sec); + var status = Filter.status(processes, self.opts); + if (status) { + self.transport.send('status', { + data: status, + server_name: self.opts.MACHINE_NAME, + internal_ip: self.opts.internal_ip, + protected: global._pm2_password_protected, + rev_con: true + }); + } + }); +}; - if (duration_sec > 1) - console.info('[WARN] Time to send data over TCP took %dseconds!', duration_sec); +/** + * Handle packet containing file metadata to send to KM + */ +PushInteractor.prototype._sendFile = function (packet) { + var self = this; + var filePath = JSON.parse(JSON.stringify(packet.data.return.dump_file)); + var type = packet.data.return.heapdump ? 'heapdump' : 'cpuprofile'; + + packet = { + pm_id: packet.process.pm_id, + name: packet.process.name, + server_name: PushInteractor.conf.MACHINE_NAME, + public_key: self.conf.PUBLIC_KEY, + type: type + }; - data = null; - str = null; - }); - }); - } + fs.readFile(filePath, 'base64', function (err, data) { + if (err) return console.error(err); + fs.unlink(filePath, console.error); + packet.data = data; + return self.transport.send(type, packet); + }); }; diff --git a/lib/Interactor/RemoteActions/CustomActions.js b/lib/Interactor/RemoteActions/CustomActions.js deleted file mode 100644 index 697493971..000000000 --- a/lib/Interactor/RemoteActions/CustomActions.js +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ - -var debug = require('debug')('interface:driver'); -var Cipher = require('../Cipher.js'); - -var CustomActions = module.exports = { - /** - * Method to trigger custom actions (axm actions) - */ - axmCustomActions : function() { - var self = this; - - this.socket.data('trigger:action', function(raw_msg) { - var msg = {}; - - if (process.env.NODE_ENV && (process.env.NODE_ENV == 'test' || - process.env.NODE_ENV == 'local_test')) - msg = raw_msg; - else - msg = Cipher.decipherMessage(raw_msg, self.conf.SECRET_KEY); - - if (!msg) return console.error('Error while receiving message! #axmCustomActions'); - - console.log('New remote action %s triggered for process %s', msg.action_name, msg.process_id); - self.pm2_instance.msgProcess({ - id : msg.process_id, - msg : msg.action_name, - opts: msg.opts || null - }, function(err, data) { - if (err) { - return self.socket.send('trigger:action:failure', { - success : false, - err : err.message, - id : msg.process_id, - action_name : msg.action_name - }); - } - console.log('[REVERSE INTERACTOR] Message received from AXM for proc_id : %s and action name %s', - msg.process_id, msg.action_name); - - return self.socket.send('trigger:action:success', { - success : true, - id : msg.process_id, - action_name : msg.action_name - }); - }); - }); - - this.socket.data('trigger:scoped_action', function(raw_msg) { - var msg = {}; - - if (process.env.NODE_ENV && (process.env.NODE_ENV == 'test' || - process.env.NODE_ENV == 'local_test')) - msg = raw_msg; - else - msg = Cipher.decipherMessage(raw_msg, self.conf.SECRET_KEY); - - if (!msg) return console.error('Error while receiving message! #axmCustomActions'); - - console.log('New SCOPED action %s triggered for process %s', msg.action_name, msg.process.pm_id); - - self.pm2_instance.msgProcess({ - id : msg.process.pm_id, - action_name : msg.action_name, - msg : msg.action_name, - opts : msg.options || {}, - uuid : msg.uuid - }, function(err, data) { - if (err) { - return self.socket.send('trigger:action:failure', { - success : false, - err : err.message, - id : msg.process.pm_id, - action_name : msg.action_name - }); - } - console.log('[REVERSE INTERACTOR] Message received from AXM for proc_id : %s and action name %s', - msg.process_id, msg.action_name); - - return self.socket.send('trigger:action:success', { - success : true, - id : msg.process.pm_id, - action_name : msg.action_name - }); - }); - }); - } -}; diff --git a/lib/Interactor/RemoteActions/Pm2Actions.js b/lib/Interactor/RemoteActions/Pm2Actions.js deleted file mode 100644 index 3d62fd809..000000000 --- a/lib/Interactor/RemoteActions/Pm2Actions.js +++ /dev/null @@ -1,336 +0,0 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ - -var debug = require('debug')('interface:driver'); -var Url = require('url'); -var Cipher = require('../Cipher.js'); -var PushInteractor = require('../PushInteractor'); -var Conf = require('../../Configuration.js'); -var Password = require('../Password.js'); - -/** - * Allowed remote PM2 methods - * with options - * - password_required : force to pass a password in parameter - * - password_optional : if a password is set, force it - * - lock : enable the locking system (block parallel commands) - */ -var PM2_REMOTE_METHOD_ALLOWED = { - 'restart' : {}, - 'reload' : {}, - 'gracefulReload' : {}, - 'reset' : {}, - 'scale' : {}, - - 'install' : { password_required : true }, - 'uninstall' : { password_required : true }, - 'stop' : { password_required : true }, - 'delete' : { password_required : true }, - 'set' : {}, - 'multiset' : {}, - 'deepUpdate' : { password_required : true }, - - 'pullAndRestart' : { password_optional : true }, - 'forward' : { password_optional : true }, - 'backward' : { password_optional : true }, - - 'startLogging' : {}, - 'stopLogging' : {}, - - // This is just for testing purproses - 'ping' : { password_required : true } -}; - -var Pm2Actions = module.exports = { - /** - * Methods to trigger PM2 actions from remote - */ - pm2Actions : function() { - var self = this; - - function executionBox(msg, cb) { - /** - * Exemple - * msg = { - * method_name : 'restart', - * parameters : {} - * } - */ - console.log('PM2 action from remote triggered "pm2 %s %j"', - msg.method_name, - msg.parameters); - - var method_name = JSON.parse(JSON.stringify(msg.method_name)); - - var parameters = ''; - - try { - parameters = JSON.parse(JSON.stringify(msg.parameters)); - } - catch(e) { - console.error(e.stack || e); - parameters = msg.parameters; - } - - if (!method_name) { - console.error('no method name'); - return cb(new Error('no method name defined')); - } - - if (!PM2_REMOTE_METHOD_ALLOWED[method_name]) { - console.error('method %s not allowed', method_name); - return cb(new Error('method ' + method_name + ' not allowed')); - } - - if (method_name === 'startLogging') { - global._logs = true; - // Stop streaming logs automatically after timeout - setTimeout(function() { - global._logs = false; - }, 120000); - return cb(null, 'Log streaming enabled'); - } else if (method_name === 'stopLogging') { - global._logs = false; - return cb(null, 'Log streaming disabled'); - } - - self.pm2_instance.remote(method_name, parameters, cb); - return false; - } - - function sendBackResult(data) { - self.socket.send('trigger:pm2:result', data); - }; - - this.socket.data('trigger:pm2:action', function(raw_msg) { - var d = require('domain').create(); - - var msg = {}; - - /** - * Uncipher Data - */ - if (process.env.NODE_ENV && - (process.env.NODE_ENV == 'test' || - process.env.NODE_ENV == 'local_test')) - msg = raw_msg; - else - msg = Cipher.decipherMessage(raw_msg, self.conf.SECRET_KEY); - - d.on('error', function(e) { - console.error('Error caught in domain'); - console.error(e.stack || e); - - /** - * Send error back to - */ - sendBackResult({ - ret : { - err : e, - data : null - }, - meta : { - method_name : msg.method_name, - app_name : msg.parameters.name, - machine_name : self.conf.MACHINE_NAME, - public_key : self.conf.PUBLIC_KEY - } - }); - }); - - d.run(function() { - if (!msg) - throw new Error('Wrong SECRET KEY to uncipher package'); - - /** - * Execute command - */ - executionBox(msg, function(err, data) { - if (err) console.error(err.stack || JSON.stringify(err)); - - /** - * Send back the result - */ - sendBackResult({ - ret : { - err : err, - data : data || null - }, - meta : { - method_name : msg.method_name, - app_name : msg.parameters.name, - machine_name : self.conf.MACHINE_NAME, - public_key : self.conf.PUBLIC_KEY - } - }); - }); - }); - - }); - }, - - /**************************************************** - * - * - * Scoped PM2 Actions with streaming and multi args - * - * - ****************************************************/ - pm2ScopedActions : function() { - var self = this; - - this.socket.data('trigger:pm2:scoped:action', function(raw_msg) { - var msg = {}; - - if (process.env.NODE_ENV && (process.env.NODE_ENV == 'test' || - process.env.NODE_ENV == 'local_test')) - msg = raw_msg; - else { - /** - * Uncipher Data - */ - msg = Cipher.decipherMessage(raw_msg, self.conf.SECRET_KEY); - } - - if (!msg.uuid || - !msg.action_name) { - console.error('PM2 Scoped: Parameter missing!'); - return sendEvent('pm2:scoped:error', { - at : Date.now(), - out : 'Parameter missing', - msg : msg.uuid || null - }); - } - - sendEvent('pm2:scoped:stream', { - at : Date.now(), - out : 'Action ' + msg.action_name + ' received', - uuid : msg.uuid - }); - - executionBox(msg, function(err, data) { - if (err) { - console.error(err.stack || err); - return sendEvent('pm2:scoped:error', { - at : Date.now(), - out : err.stack || err, - uuid : msg.uuid - }); - } - return sendEvent('pm2:scoped:end', { - at : Date.now(), - out : data, - uuid : msg.uuid - }); - }); - }); - - /** - * Compact event in Push Interactor *pipe* - */ - function sendEvent(event, data) { - var packet = { - at : Date.now(), - data : { - data : data.out, - uuid : data.uuid - } - }; - - if (!PushInteractor._packet[event]) - PushInteractor._packet[event] = []; - - PushInteractor._packet[event].push(packet); - - if (process.env.NODE_ENV == 'local_test') - process.send({event : event, data : data}); - }; - - /** - * Processing - */ - function executionBox(msg, cb) { - var action_name = msg.action_name; - var opts = msg.options; - - if (!PM2_REMOTE_METHOD_ALLOWED[action_name]) { - console.error('method %s not allowed', action_name); - return cb(new Error('method ' + action_name + ' not allowed')); - } - - var action_conf = PM2_REMOTE_METHOD_ALLOWED[action_name]; - - /** - * Password checking - */ - if (action_conf.password_required === true) { - if (!msg.password) { - console.error('Missing password in query'); - return cb('Missing password in query'); - } - - var passwd = Conf.getSync('pm2:passwd'); - - if (passwd === null) { - console.error('Password at PM2 level is missing'); - return cb('Password at PM2 level is missing please set password via pm2 set pm2:passwd '); - } - - if (Password.verify(msg.password, passwd) != true) { - console.error('Password does not match'); - return cb('Password does not match'); - } - } - - if (action_conf.lock === false) - opts.lock = false; - - /** - * Fork the remote action in another process - * so we can catch the stdout/stderr and emit it - */ - var fork = require('child_process').fork; - - process.env.fork_params = JSON.stringify({ action : action_name, opts : opts}); - - console.log('Executing: pm2 %s %s', action_name, opts.args ? opts.args.join(' ') : ''); - - var app = fork(__dirname + '/ScopedExecution.js', [], { - silent : true - }); - - app.stdout.on('data', function(dt) { - console.log(dt.toString()); - sendEvent('pm2:scoped:stream', { - at : Date.now(), - out : dt.toString(), - uuid : msg.uuid - }); - }); - - app.once('error', function(dt) { - console.error('Error got?', dt); - sendEvent('pm2:scoped:error', { - at : Date.now(), - out : 'Shit happening ' + JSON.stringify(dt), - msg : msg.uuid - }); - }); - - app.on('message', function(dt) { - var ret = JSON.parse(dt); - if (ret.isFinished != true) return false; - - console.log('Action %s finished (err= %s)', - action_name, ret.err); - return cb(ret.err, ret.dt); - }); - - return false; - } - - } -}; diff --git a/lib/Interactor/RemoteActions/ScopedExecution.js b/lib/Interactor/RemoteActions/ScopedExecution.js deleted file mode 100644 index 08c962e36..000000000 --- a/lib/Interactor/RemoteActions/ScopedExecution.js +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. - */ - -var pm2 = require('../../..'); -var domain = require('domain'); -var Utility = require('../../Utility.js'); - -var d = domain.create(); - -d.once('error', function(err) { - process.send(JSON.stringify({err: err.stack, isFinished : true})); -}); - -d.run(function() { - var params = JSON.parse(process.env.fork_params); - - console.log('Executing: pm2 %s %s', - params.action, - params.opts.args ? params.opts.args.join(' ') : ''); - - pm2.connect(function() { - pm2.remoteV2(params.action, params.opts, function(err, dt) { - process.send(JSON.stringify(Utility.clone({ - err: err, - dt: dt, - isFinished : true - }))); - pm2.disconnect(process.exit); - }); - }); -}); diff --git a/lib/Interactor/ReverseInteractor.js b/lib/Interactor/ReverseInteractor.js index 288670c01..2b1e29b2f 100644 --- a/lib/Interactor/ReverseInteractor.js +++ b/lib/Interactor/ReverseInteractor.js @@ -1,124 +1,253 @@ + +'use strict'; + +var log = require('debug')('interactor:reverse-interactor'); +var path = require('path'); +var Conf = require('../Configuration.js'); +var Password = require('./Password.js'); +var fork = require('child_process').fork; + +var PM2_REMOTE_METHOD_ALLOWED = { + 'restart': false, + 'reload': false, + 'gracefulReload': false, + 'reset': false, + 'scale': false, + + 'install': true, + 'uninstall': true, + 'stop': true, + 'delete': true, + 'set': false, + 'multiset': false, + 'deepUpdate': true, + + 'pullAndRestart': true, + 'forward': true, + 'backward': true, + + 'startLogging': false, + 'stopLogging': false, + + // This is just for testing purproses + 'ping': true +}; + /** - * Copyright 2013 the PM2 project authors. All rights reserved. - * Use of this source code is governed by a license that - * can be found in the LICENSE file. + * ReverseInteractor is the class that handle receiving event from KM + * @param {Object} opts interactor options + * @param {PM2} pm2 pm2 api + * @param {WebsocketTransport} transport websocket transport used to receive data to KM */ +var ReverseInteractor = module.exports = function (opts, pm2, transport) { + this.pm2 = pm2; + this.transport = transport; + this.opts = opts; +}; -var debug = require('debug')('interface:driver'); -var nssocket = require('nssocket'); -var Url = require('url'); -var Cipher = require('./Cipher.js'); -var util = require('util'); - -var ReverseInteract = { - changeUrl : function(url) { - if (!this.connected) return; - console.log('[REV] Changing URL to %s', url); - - this.network = Url.parse(url); - this.socket.connect(parseInt(this.network.port), this.network.hostname); - this.socket.reconnect(); - }, - destroy : function() { - this.socket.destroy(); - }, - start : function(opts) { - var self = this; - - if (!opts.url) - throw new Error('url not declared'); - if (!opts.conf) - throw new Error('Conf not passed to ReverseInteractor'); - - this.connected = false; - this.conf = opts.conf; - this.network = Url.parse(opts.url); - this.pm2_instance = opts.conf.pm2_instance; - - this.socket = new nssocket.NsSocket({ - type : 'tcp4', - reconnect : true, - retryInterval : 100, - max : Infinity, - maxListeners : 50 - }); +ReverseInteractor.prototype.stop = function () { + this.transport.removeAllListeners('trigger:scoped_action'); + this.transport.removeAllListeners('trigger:actio'); + this.transport.removeAllListeners('trigger:pm2:action'); + this.transport.removeAllListeners('trigger:pm2:scoped:action'); +}; - this.socket.on('error', function(e) { - self.connected = false; - console.error('[REV] %s', e.message || e); - }); +ReverseInteractor.prototype.start = function () { + // action that trigger custom actions inside the code + this.transport.on('trigger:action', this._onCustomAction.bind(this)); + this.transport.on('trigger:scoped_action', this._onCustomAction.bind(this)); + // action that call pm2 api + this.transport.on('trigger:pm2:scoped:action', this._onPM2Action.bind(this)); + this.transport.on('trigger:pm2:action', this._onPM2ScopedAction.bind(this)); +}; - this.socket.on('close', function(dt) { - self.connected = false; +/** + * Listener for custom actions that can be triggered by KM, either scoped or normal + * @param {Object} data + * @param {Object} data.action_name name of the action triggered + * @param {Object} data.process_id id of the process where the action need to be run + * @param {Object} data.opts [optional] parameters used to call the method + * @param {Object} data.uuid [for scoped action] uuid used to recognized the scoped action + */ +ReverseInteractor.prototype._onCustomAction = function (data) { + var self = this; + var type = data.uuid ? 'SCOPED' : 'REMOTE'; + + console.log('[REVERSE] New %s action %s triggered for process %s', type, data.action_name, data.process_id); + // send the request to pmx via IPC + this.pm2.msgProcess({ + id: data.process_id, + msg: data.action_name, + opts: data.opts || data.options || null, + action_name: data.action_name, + uuid: data.uuid + }, function (err, data) { + if (err) { + return self.transport.send('trigger:action:failure', { + success: false, + err: err.message || err, + id: data.process_id, + action_name: data.action_name + }); + } + console.log('[REVERSE] Message received from AXM for proc_id : %s and action name %s', data.process_id, data.action_name); + + return self.transport.send('trigger:action:success', { + success: true, + id: data.process_id, + action_name: data.action_name }); + }); +}; - this.socket.on('start', function() { - self.connected = true; - opts.conf.rev_con = true; - console.log('[REV] Connected to %s:%s', self.network.hostname, self.network.port); +/** + * Handle when KM call a pm2 action + * @param {Object} data + * @param {Object} data.method_name the name of the pm2 method + * @param {Object} data.parameters optional parameters used to call the method + */ +ReverseInteractor.prototype._onPM2Action = function (data) { + var self = this; + // callback when the action has been executed + function callback (err, data) { + console.log('[REVERSE] PM2 action ended : pm2 %s (%s)', data.method_name, err ? 'no error' : err.message || err); + self.transport.send('trigger:pm2:result', { + ret: { err: err, data: data }, + meta: { + method_name: data.method_name, + app_name: data.parameters.name, + machine_name: self.conf.MACHINE_NAME, + public_key: self.conf.PUBLIC_KEY + } }); + } + + console.log('[REVERSE] New PM2 action triggered : pm2 %s %j', data.method_name, data.parameters); + + var method = JSON.parse(JSON.stringify(data.method_name)); + var parameters = data.parameters; + try { + parameters = JSON.parse(JSON.stringify(data.parameters)); + } catch (err) { + console.error(err); + } + + if (!method || PM2_REMOTE_METHOD_ALLOWED[method] === undefined) { + return callback(new Error(method ? 'Method not allowed' : 'invalid method')); + } + + // verify that if a password is required, they actually match + if (PM2_REMOTE_METHOD_ALLOWED[method] === true) { + var passwd = Conf.getSync('pm2:passwd'); + if (!passwd) return callback(new Error('Not password is configured for pm2, please set one : pm2 set pm2:passwd ')); + + var err = new Error('you need to use the configured password in order to use this method'); + if (!data.password) return callback(err); + if (Password.verify(data.password, passwd) !== true) return callback(err); + } + + if (method === 'startLogging') { + global._logs = true; + // Stop streaming logs automatically after timeout + setTimeout(function () { + global._logs = false; + }, 120000); + return callback(null, 'Log streaming enabled'); + } else if (method === 'stopLogging') { + global._logs = false; + return callback(null, 'Log streaming disabled'); + } - console.log('[REV] Connecting to %s:%s', this.network.hostname, this.network.port); - - this.socket.connect(parseInt(this.network.port), this.network.hostname); - this.onMessage(); - }, - /** - * Listening to remote events from Keymetrics - */ - onMessage : function() { - if (!this.socket) return console.error('Reverse interaction not initialized'); - - /** - * Identify this agent to Keymetrics - * via PUBLIC/PRIVATE key exchange - */ - ReverseInteract.introduceToKeymetrics(); - - ReverseInteract.axmCustomActions(); - - /** - * From Pm2Actions.js - */ - ReverseInteract.pm2Actions(); - - ReverseInteract.pm2ScopedActions(); - - return false; - }, - /** - * First method called to identify this agent - */ - introduceToKeymetrics : function() { - var self = this; - - this.socket.data('ask', function(raw_msg) { - if (process.env.NODE_ENV && process.env.NODE_ENV == 'test') { - // Dont cipher data in test environment - self.socket.send('ask:rep', { - success : true, - machine_name : self.conf.MACHINE_NAME, - public_key : self.conf.PUBLIC_KEY - }); - } else { - var ciphered_data = Cipher.cipherMessage(JSON.stringify({ - machine_name : self.conf.MACHINE_NAME - }), self.conf.SECRET_KEY); - - if (!ciphered_data) - return console.error('Got wrong ciphering data %s %s', self.conf.MACHINE_NAME, self.conf.SECRET_KEY); - - self.socket.send('ask:rep', { - data : ciphered_data, - public_key : self.conf.PUBLIC_KEY - }); + return self.pm2.remote(method, parameters, callback); +}; + +/** + * Listen for pm2 scoped action and run them + * @param {Object} data + * @param {Object} data.method_name the name of the pm2 method + * @param {Object} data.parameters optional parameters used to call the method + */ +ReverseInteractor.prototype._onPM2ScopedAction = function (data) { + var self = this; + // callback when the action has been executed + function callback (err, res) { + console.log('[REVERSE] PM2 scoped action ended (id: %s): pm2 %s (%s)', data.uuid, data.action_name, + err ? 'no error' : err.message || err); + self.transport.send('pm2:scoped:' + err ? 'error' : 'end', { + at: Date.now(), + data: { + out: err ? err.message || err : res, + uuid: data.uuid, + action_name: data.action_name, + machine_name: self.conf.MACHINE_NAME, + public_key: self.conf.PUBLIC_KEY } - return false; }); } -}; -util._extend(ReverseInteract, require('./RemoteActions/Pm2Actions.js')); -util._extend(ReverseInteract, require('./RemoteActions/CustomActions.js')); + console.log('[REVERSE] New PM2 scoped action triggered (id: %s) : pm2 %s ', data.uuid, data.action_name); + + var actionName = data.action_name; + var opts = data.options; + + if (!data.uuid || !actionName) { + return callback(new Error('Missing parameters')); + } + + if (!actionName || PM2_REMOTE_METHOD_ALLOWED[actionName] === undefined) { + return callback(new Error(actionName ? 'Method not allowed' : 'invalid method')); + } + + // verify that if a password is required, they actually match + if (PM2_REMOTE_METHOD_ALLOWED[actionName] === true) { + var passwd = Conf.getSync('pm2:passwd'); + if (!passwd) return callback(new Error('Not password is configured for pm2, please set one : pm2 set pm2:passwd ')); -module.exports = ReverseInteract; + var err = new Error('you need to use the configured password in order to use this method'); + if (!data.password) return callback(err); + if (Password.verify(data.password, passwd) !== true) return callback(err); + } + + // send that the action has begun + this.transport.send('pm2:scoped:stream', { + at: Date.now(), + data: { + out: 'Action ' + actionName + ' started', + uuid: data.uuid + } + }); + + process.env.fork_params = JSON.stringify({ action: actionName, opts: opts }); + var app = fork(path.resolve(__dirname, '/ScopedExecution.js'), [], { + silent: true + }); + app.once('error', callback); + + app.stdout.on('data', function (out) { + self.transport.send('pm2:scoped:stream', { + at: Date.now(), + data: { + type: 'out', + out: out instanceof Buffer ? out.toString() : out, + uuid: data.uuid + } + }); + }); + + app.stderr.on('data', function (err) { + self.transport.send('pm2:scoped:stream', { + at: Date.now(), + data: { + type: 'err', + out: err instanceof Buffer ? err.toString() : err, + uuid: data.uuid + } + }); + }); + + app.on('message', function (data) { + data = JSON.parse(data); + if (data.isFinished !== true) return false; + return callback(data.err, data.dt); + }); +}; diff --git a/lib/Interactor/ScopedExecution.js b/lib/Interactor/ScopedExecution.js new file mode 100644 index 000000000..2bb90bdfe --- /dev/null +++ b/lib/Interactor/ScopedExecution.js @@ -0,0 +1,31 @@ +/** + * Copyright 2013 the PM2 project authors. All rights reserved. + * Use of this source code is governed by a license that + * can be found in the LICENSE file. + */ + +var pm2 = require('../../..'); +var domain = require('domain'); +var Utility = require('../../Utility.js'); + +var d = domain.create(); + +d.once('error', function (err) { + process.send(JSON.stringify({ err: err.message || err, isFinished: true })); +}); + +d.run(function () { + var params = JSON.parse(process.env.fork_params); + console.log('Executing: pm2 %s %s', params.action, params.opts.args ? params.opts.args.join(' ') : ''); + + pm2.connect(function () { + pm2.remoteV2(params.action, params.opts, function (err, dt) { + process.send(JSON.stringify(Utility.clone({ + err: err, + dt: dt, + isFinished: true + }))); + pm2.disconnect(process.exit); + }); + }); +}); diff --git a/lib/Interactor/TransactionAggregator.js b/lib/Interactor/TransactionAggregator.js index da3a2fe11..975b78fd5 100644 --- a/lib/Interactor/TransactionAggregator.js +++ b/lib/Interactor/TransactionAggregator.js @@ -7,28 +7,24 @@ /** * Dependencies */ -var cst = require('../../constants.js'); -var log = require('debug')('pm2:aggregator'); -var async = require('async'); +var cst = require('../../constants.js'); +var log = require('debug')('pm2:aggregator'); var Utility = require('./Utility.js'); -var fclone = require('fclone'); -var fs = require('fs'); -var path = require('path'); +var fclone = require('fclone'); var FLUSH_INTERVAL = process.env.NODE_ENV === 'local_test' || process.env.PM2_DEBUG ? 1000 : 30000; var LABELS = { - "HTTP_RESPONSE_CODE_LABEL_KEY": 'http/status_code', - "HTTP_URL_LABEL_KEY": 'http/url', - "HTTP_METHOD_LABEL_KEY": 'http/method', - "HTTP_RESPONSE_SIZE_LABEL_KEY": 'http/response/size', - "STACK_TRACE_DETAILS_KEY": 'stacktrace', - "ERROR_DETAILS_NAME": 'error/name', - "ERROR_DETAILS_MESSAGE": 'error/message', - "HTTP_SOURCE_IP": 'http/source/ip', - "HTTP_PATH_LABEL_KEY": "http/path" -} - + 'HTTP_RESPONSE_CODE_LABEL_KEY': 'http/status_code', + 'HTTP_URL_LABEL_KEY': 'http/url', + 'HTTP_METHOD_LABEL_KEY': 'http/method', + 'HTTP_RESPONSE_SIZE_LABEL_KEY': 'http/response/size', + 'STACK_TRACE_DETAILS_KEY': 'stacktrace', + 'ERROR_DETAILS_NAME': 'error/name', + 'ERROR_DETAILS_MESSAGE': 'error/message', + 'HTTP_SOURCE_IP': 'http/source/ip', + 'HTTP_PATH_LABEL_KEY': 'http/path' +}; /** * @@ -70,19 +66,20 @@ var LABELS = { */ var TransactionAggregator = module.exports = function (pushInteractor) { - if (!(this instanceof TransactionAggregator)) + if (!(this instanceof TransactionAggregator)) { return new TransactionAggregator(pushInteractor); + } var self = this; this.processes = {}; - this.stackParser = pushInteractor.stackParser; + this.stackParser = pushInteractor._stackParser; - // clean aggregated data on restart + start a aggregation period where no data are send - if (pushInteractor.ipm2) { - pushInteractor.ipm2.bus.on('process:event', function (data) { - if (data.event !== 'exit') return ; - if (!self.processes[data.process.name]) return ; + // clean aggregated data on restart + start a aggregation period where no data are send + if (pushInteractor._pm2) { + pushInteractor._pm2.bus.on('process:event', function (data) { + if (data.event !== 'exit') return; + if (!self.processes[data.process.name]) return; log('Restart triggered a data clear for process %s', data.process.name); self.processes[data.process.name] = initializeRouteMeta({ @@ -101,7 +98,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) { self.processes[data.process.name].learning = true; self.processes[data.process.name].learning_timeout = setTimeout(function () { self.processes[data.process.name].learning = false; - }, cst.AGGREGATION_DURATION) + }, cst.AGGREGATION_DURATION); }); } @@ -110,20 +107,20 @@ var TransactionAggregator = module.exports = function (pushInteractor) { * * @param {Object} process process meta */ - function initializeRouteMeta(process) { + function initializeRouteMeta (process) { return { routes: {}, meta: { - trace_count : 0, - mean_latency : 0, - http_meter : new Utility.EWMA(), - db_meter : new Utility.EWMA() + trace_count: 0, + mean_latency: 0, + http_meter: new Utility.EWMA(), + db_meter: new Utility.EWMA() }, process: process }; } - this.getAggregation = function() { + this.getAggregation = function () { return this.processes; }; @@ -134,69 +131,60 @@ var TransactionAggregator = module.exports = function (pushInteractor) { * @param {Object} packet.process process metadata * @param {Object} packet.data trace */ - this.aggregate = function(packet) { - if (!packet) - return log('No any data passed'); - if (!packet.data) - return log('Got packet without trace: %s', JSON.stringify(Object.keys(packet))); - if (!packet.process) - return log('Got packet without process: %s', JSON.stringify(Object.keys(packet))); - - var new_trace = packet.data; - - if (!new_trace.spans || !new_trace.spans[0]) - return log('Trace without spans: %s', Object.keys(new_trace)); - if (!new_trace.spans[0].labels) - return log('Trace spans without labels: %s', Object.keys(new_trace.spans)); - - if (!self.processes[packet.process.name]) + this.aggregate = function (packet) { + if (!packet) return log('No any data passed'); + if (!packet.data) return log('Got packet without trace: %s', JSON.stringify(Object.keys(packet))); + if (!packet.process) return log('Got packet without process: %s', JSON.stringify(Object.keys(packet))); + + var newTrace = packet.data; + + if (!newTrace.spans || !newTrace.spans[0]) return log('Trace without spans: %s', Object.keys(newTrace)); + if (!newTrace.spans[0].labels) return log('Trace spans without labels: %s', Object.keys(newTrace.spans)); + + if (!self.processes[packet.process.name]) { self.processes[packet.process.name] = initializeRouteMeta(packet.process); + } var process = self.processes[packet.process.name]; // Get http path of current span - var path = new_trace.spans[0].labels[LABELS.HTTP_PATH_LABEL_KEY]; + var path = newTrace.spans[0].labels[LABELS.HTTP_PATH_LABEL_KEY]; // Cleanup spans - self.censorSpans(new_trace.spans); + self.censorSpans(newTrace.spans); // Update app meta (mean_latency, http_meter, db_meter, trace_count) - new_trace.spans.forEach(function (span) { - if (!span.name || !span.kind) + newTrace.spans.forEach(function (span) { + if (!span.name || !span.kind) { return false; - // update http latency/meter - else if (span.kind === 'RPC_SERVER') { + } else if (span.kind === 'RPC_SERVER') { var duration = Math.round(new Date(span.endTime) - new Date(span.startTime)); - process.meta.mean_latency = process.meta.trace_count > 0 ? - (duration + (process.meta.mean_latency * process.meta.trace_count)) / (process.meta.trace_count + 1) : duration; + process.meta.mean_latency = process.meta.trace_count > 0 + ? (duration + (process.meta.mean_latency * process.meta.trace_count)) / (process.meta.trace_count + 1) : duration; return process.meta.http_meter.update(); - } - // update db_meter - else if (span.name.indexOf('mongo') > -1 || - span.name.indexOf('redis') > -1 || - span.name.indexOf('sql') > -1) + } else if (span.name.indexOf('mongo') > -1 || span.name.indexOf('redis') > -1 || span.name.indexOf('sql') > -1) { return process.meta.db_meter.update(); - }) + } + }); process.meta.trace_count++; - // remove the last slash if exist - if (path[0] === '/' && path !== '/') - path = path.substr(1, path.length - 1) + if (path[0] === '/' && path !== '/') { + path = path.substr(1, path.length - 1); + } // Find var matched = self.matchPath(path, process.routes); if (!matched) { process.routes[path] = []; - log('Path %s isnt aggregated yet, creating new entry', path) - self.mergeTrace(process.routes[path], new_trace); - } - else { - log('Path %s already aggregated under %s', path, matched) - self.mergeTrace(process.routes[matched], new_trace); + log('Path %s isnt aggregated yet, creating new entry', path); + self.mergeTrace(process.routes[path], newTrace); + } else { + log('Path %s already aggregated under %s', path, matched); + self.mergeTrace(process.routes[matched], newTrace); } return self.processes; - } + }; /** * Merge new trace and compute mean, min, max, count @@ -207,38 +195,39 @@ var TransactionAggregator = module.exports = function (pushInteractor) { this.mergeTrace = function (aggregated, trace) { var self = this; - if (!aggregated || !trace) - return ; + if (!aggregated || !trace) return; // remove spans with startTime == endTime - trace.spans = trace.spans.filter(function(span) { + trace.spans = trace.spans.filter(function (span) { return span.endTime !== span.startTime; - }) + }); // if the trace doesn't any spans stop aggregation here - if (trace.spans.length == 0) - return ; + if (trace.spans.length === 0) return; // create data structure if needed - if (!aggregated.variances) + if (!aggregated.variances) { aggregated.variances = []; - if (!aggregated.meta) + } + if (!aggregated.meta) { aggregated.meta = { count: 0, min: 100000, max: 0 - } + }; + } // compute duration of child spans - trace.spans.forEach(function(span) { + trace.spans.forEach(function (span) { span.min = span.max = span.mean = Math.round(new Date(span.endTime) - new Date(span.startTime)); delete span.endTime; - }) + }); // Calculate/Update mean - if (aggregated.meta.count > 0) - aggregated.meta.mean = (trace.spans[0].mean + (aggregated.meta.mean * aggregated.meta.count)) / (aggregated.meta.count + 1) - else + if (aggregated.meta.count > 0) { + aggregated.meta.mean = (trace.spans[0].mean + (aggregated.meta.mean * aggregated.meta.count)) / (aggregated.meta.count + 1); + } else { aggregated.meta.mean = trace.spans[0].mean; + } // update min/max aggregated.meta.min = aggregated.meta.min > trace.spans[0].mean ? trace.spans[0].mean : aggregated.meta.min; @@ -260,13 +249,11 @@ var TransactionAggregator = module.exports = function (pushInteractor) { // parse strackrace self.parseStacktrace(trace.spans); aggregated.variances.push(trace); - } - // variance found, merge spans - else { + } else { // delete stacktrace before merging trace.spans.forEach(function (span) { delete span.labels.stacktrace; - }) + }); variance.min = variance.min > trace.spans[0].mean ? trace.spans[0].mean : variance.min; variance.max = variance.max < trace.spans[0].mean ? trace.spans[0].mean : variance.max; variance.mean = (trace.spans[0].mean + (variance.mean * variance.count)) / (variance.count + 1); @@ -277,29 +264,30 @@ var TransactionAggregator = module.exports = function (pushInteractor) { variance.meter.update(); variance.count++; } - } + }; // for every variance, check spans same variance for (var i = 0; i < aggregated.variances.length; i++) { - if (self.compareList(aggregated.variances[i].spans, trace.spans)) - return merge(aggregated.variances[i]) + if (self.compareList(aggregated.variances[i].spans, trace.spans)) { + return merge(aggregated.variances[i]); + } } // else its a new variance return merge(null); - } + }; /** * Parkour simultaneously both spans list to update value of the first one using value of the second one * The first should be variance already aggregated for which we want to merge the second one * The second one is a new trace, so we need to re-compute mean/min/max time for each spans */ - this.updateSpanDuration = function (ref_spans, spans, count) { - for (var i = 0, len = ref_spans.length; i < len; i++) { - ref_spans[i].mean = Math.round((spans[i].mean + (ref_spans[i].mean * count)) / (count + 1) * 100) / 100; - ref_spans[i].min = ref_spans[i].min > spans[i].mean ? spans[i].mean : ref_spans[i].min; - ref_spans[i].max = ref_spans[i].max < spans[i].mean ? spans[i].mean : ref_spans[i].max; + this.updateSpanDuration = function (aggregatedSpans, spans, count) { + for (var i = 0, len = aggregatedSpans.length; i < len; i++) { + aggregatedSpans[i].mean = Math.round((spans[i].mean + (aggregatedSpans[i].mean * count)) / (count + 1) * 100) / 100; + aggregatedSpans[i].min = aggregatedSpans[i].min > spans[i].mean ? spans[i].mean : aggregatedSpans[i].min; + aggregatedSpans[i].max = aggregatedSpans[i].max < spans[i].mean ? spans[i].mean : aggregatedSpans[i].max; } - } + }; /** * Compare two spans list by going down on each span and comparing child and attribute @@ -315,31 +303,31 @@ var TransactionAggregator = module.exports = function (pushInteractor) { if (one[i].labels.length !== two[i].labels.length) return false; } return true; - } + }; /** * Will return the route if we found an already matched route */ this.matchPath = function (path, routes) { // empty route is / without the fist slash - if (path === '/') - return routes[path] ? path : null; + if (path === '/') return routes[path] ? path : null; // remove the last slash if exist - if (path[path.length - 1] === '/') - path = path.substr(0, path.length - 1) + if (path[path.length - 1] === '/') { + path = path.substr(0, path.length - 1); + } // split to get array of segment path = path.split('/'); // if the path has only one segment, we just need to compare the key - if (path.length === 1) - return routes[path[0]] ? routes[path[0]] : null; + if (path.length === 1) return routes[path[0]] ? routes[path[0]] : null; // check in routes already stored for match var keys = Object.keys(routes); for (var i = 0, len = keys.length; i < len; i++) { - var route = keys[i], segments = route.split('/'); + var route = keys[i]; + var segments = route.split('/'); if (segments.length !== path.length) continue; @@ -347,27 +335,25 @@ var TransactionAggregator = module.exports = function (pushInteractor) { // different segment, try to find if new route or not if (path[j] !== segments[j]) { // if the aggregator already have matched that segment with a wildcard and the next segment is the same - if (self.isIdentifier(path[j]) && segments[j] === '*' && path[j - 1] === segments[j - 1]) + if (self.isIdentifier(path[j]) && segments[j] === '*' && path[j - 1] === segments[j - 1]) { return segments.join('/'); - // case a var in url match, so we continue because they must be other var in url - else if (path[j - 1] !== undefined && path[j - 1] === segments[j - 1] && self.isIdentifier(path[j]) && self.isIdentifier(segments[j])) { + } else if (path[j - 1] !== undefined && path[j - 1] === segments[j - 1] && self.isIdentifier(path[j]) && self.isIdentifier(segments[j])) { + // case a var in url match, so we continue because they must be other var in url segments[j] = '*'; // update routes in cache routes[segments.join('/')] = routes[route]; - delete routes[keys[i]] + delete routes[keys[i]]; return segments.join('/'); + } else { + break; } - else - break ; } // if finish to iterate over segment of path, we must be on the same route - if (j == 0) - return segments.join('/') + if (j === 0) return segments.join('/'); } - } - } + }; /** * Check if the string can be a id of some sort @@ -377,20 +363,11 @@ var TransactionAggregator = module.exports = function (pushInteractor) { this.isIdentifier = function (id) { id = typeof (id) !== 'string' ? id + '' : id; - // uuid v1/v4 with/without dash - if (id.match(/[0-9a-f]{8}-[0-9a-f]{4}-[14][0-9a-f]{3}-[0-9a-f]{4}-[0-9a-f]{12}|[0-9a-f]{12}[14][0-9a-f]{19}/i)) - return true; - // if number - else if (id.match(/\d+/)) - return true; - // if suit of nbr/letters - else if (id.match(/[0-9]+[a-z]+|[a-z]+[0-9]+/)) - return true; - else - return false; - } + return id.match(/[0-9a-f]{8}-[0-9a-f]{4}-[14][0-9a-f]{3}-[0-9a-f]{4}-[0-9a-f]{12}|[0-9a-f]{12}[14][0-9a-f]{19}/i) || + id.match(/\d+/) || id.match(/[0-9]+[a-z]+|[a-z]+[0-9]+/); + }; - var REGEX_JSON_CLEANUP = /":(?!\[|{)\\"[^"]*\\"|":(["'])(?:(?=(\\?))\2.)*?\1|":(?!\[|{)[^,}\]]*|":\[[^{]*]/g + var REGEX_JSON_CLEANUP = /":(?!\[|{)\\"[^"]*\\"|":(["'])(?:(?=(\\?))\2.)*?\1|":(?!\[|{)[^,}\]]*|":\[[^{]*]/g; /** * Cleanup trace data * - delete result(s) @@ -398,13 +375,11 @@ var TransactionAggregator = module.exports = function (pushInteractor) { * * @param {Object} spans list of span for a trace */ - this.censorSpans = function(spans) { - if (!spans) - return log('spans is null'); + this.censorSpans = function (spans) { + if (!spans) return log('spans is null'); - spans.forEach(function(span) { - if (!span.labels) - return; + spans.forEach(function (span) { + if (!span.labels) return; delete span.labels.results; delete span.labels.result; @@ -412,36 +387,35 @@ var TransactionAggregator = module.exports = function (pushInteractor) { delete span.parentSpanId; delete span.labels.values; - Object.keys(span.labels).forEach(function(key) { - if (typeof(span.labels[key]) === 'string' && key !== 'stacktrace') + Object.keys(span.labels).forEach(function (key) { + if (typeof (span.labels[key]) === 'string' && key !== 'stacktrace') { span.labels[key] = span.labels[key].replace(REGEX_JSON_CLEANUP, '\": \"?\"'); + } }); }); - } + }; /** * Parse stackrace of spans to extract and normalize data - * + * * @param {Object} spans list of span for a trace */ this.parseStacktrace = function (spans) { var self = this; - if (!spans) - return log('spans is null'); + if (!spans) return log('spans is null'); spans.forEach(function (span) { // if empty make sure that it doesnt exist - if (!span.labels.stacktrace || typeof(span.labels.stacktrace) !== 'string') - return; + if (!span.labels.stacktrace || typeof (span.labels.stacktrace) !== 'string') return; // you never know what come through that door try { span.labels.stacktrace = JSON.parse(span.labels.stacktrace); } catch (e) { - return ; + return; } - if (!span.labels.stacktrace || !(span.labels.stacktrace.stack_frame instanceof Array) ) return ; + if (!span.labels.stacktrace || !(span.labels.stacktrace.stack_frame instanceof Array)) return; // parse the stacktrace var result = self.stackParser.parse(span.labels.stacktrace.stack_frame); if (result) { @@ -452,61 +426,60 @@ var TransactionAggregator = module.exports = function (pushInteractor) { spans.forEach(function (span) { delete span.labels.stacktrace; - }) - } + }); + }; /** * Normalize aggregation * * @param {Function} cb callback */ - this.prepareAggregationforShipping = function(cb) { + this.prepareAggregationforShipping = function (cb) { var normalized = {}; // Iterate each applications - Object.keys(self.processes).forEach(function(app_name) { - var process = self.processes[app_name]; - var routes = process.routes; + Object.keys(self.processes).forEach(function (appName) { + var process = self.processes[appName]; + var routes = process.routes; - if (self.processes[app_name].learning === true) - return log('Process %s currently in aggregation mode, dont send any data for now.', app_name); + if (self.processes[appName].learning === true) { + return log('Process %s currently in aggregation mode, dont send any data for now.', appName); + } - normalized[app_name] = { + normalized[appName] = { data: { routes: [], meta: fclone({ - trace_count : process.meta.trace_count, - mean_latency : Math.round(process.meta.mean_latency * 100) / 100, - http_meter : Math.round(process.meta.http_meter.rate(1000) * 100) / 100, - db_meter : Math.round(process.meta.db_meter.rate(1000) * 100) / 100 + trace_count: process.meta.trace_count, + mean_latency: Math.round(process.meta.mean_latency * 100) / 100, + http_meter: Math.round(process.meta.http_meter.rate(1000) * 100) / 100, + db_meter: Math.round(process.meta.db_meter.rate(1000) * 100) / 100 }) }, process: process.process }; - Object.keys(routes).forEach(function(route_path) { - var data = routes[route_path]; + Object.keys(routes).forEach(function (routePath) { + var data = routes[routePath]; // hard check for invalid data - if (!data.variances || data.variances.length == 0) - return ; + if (!data.variances || data.variances.length === 0) return; // get top 5 variances of the same route - var variances = data.variances.sort(function(a, b) { + var variances = data.variances.sort(function (a, b) { return b.count - a.count; }).slice(0, 5); // create a copy without reference to stored one var routeCopy = { - path: route_path === '/' ? '/' : '/' + route_path, + path: routePath === '/' ? '/' : '/' + routePath, meta: fclone(data.meta), variances: [] - } + }; variances.forEach(function (variance) { // hard check for invalid data - if (!variance.spans || variance.spans.length == 0) - return ; + if (!variance.spans || variance.spans.length === 0) return; // deep copy of variances data var tmp = fclone({ @@ -520,16 +493,16 @@ var TransactionAggregator = module.exports = function (pushInteractor) { tmp.meter = Math.round(variance.meter.rate(1000) * 100) / 100; // push serialized into normalized data routeCopy.variances.push(tmp); - }) + }); // push the route into normalized data - normalized[app_name].data.routes.push(routeCopy); + normalized[appName].data.routes.push(routeCopy); }); }); return normalized; }; - this.launchWorker = function() { + this.launchWorker = function () { log('Worker launched'); setInterval(function () { var normalized = self.prepareAggregationforShipping(); @@ -538,7 +511,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) { pushInteractor.bufferData('axm:transaction', normalized[key]); }); }, FLUSH_INTERVAL); - } + }; this.launchWorker(); }; diff --git a/lib/Interactor/Utility.js b/lib/Interactor/Utility.js index b9cb6c7f5..074b00a85 100644 --- a/lib/Interactor/Utility.js +++ b/lib/Interactor/Utility.js @@ -1,38 +1,6 @@ var path = require('path'); var os = require('os'); -// EWMA = ExponentiallyWeightedMovingAverage from -// https://github.com/felixge/node-measured/blob/master/lib/util/ExponentiallyMovingWeightedAverage.js -// used to compute the nbr of time per minute that a variance is hit by a new trace -function EWMA () { - this._timePeriod = 60000; - this._tickInterval = 5000; - this._alpha = 1 - Math.exp(-this._tickInterval / this._timePeriod); - this._count = 0; - this._rate = 0; - - var self = this; - this._interval = setInterval(function () { - self.tick(); - }, this._tickInterval); - this._interval.unref(); -} - -EWMA.prototype.update = function (n) { - this._count += n || 1; -}; - -EWMA.prototype.tick = function () { - var instantRate = this._count / this._tickInterval; - this._count = 0; - - this._rate += (this._alpha * (instantRate - this._rate)); -}; - -EWMA.prototype.rate = function (timeUnit) { - return (this._rate || 0) * timeUnit; -}; - /** * Simple cache implementation * @@ -133,6 +101,265 @@ StackTraceParser.prototype.parse = function (stack) { return false; }; +var http = require('http'); +var https = require('https'); +var stream = require('stream'); +var url = require('url'); +var querystring = require('querystring'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; + +// https://github.com/dimik/node-handy-http +// Copyright Dmitry Poklonskiy under BSD-2-Clause + +/** + * Simple http client. + * @class + * @name HTTPClient + * @param {Object|Boolean} [agent] Controls Agent behavior. When an Agent is used request will default to Connection: keep-alive. + */ +var HTTPClient = function (agent) { + this._httpAgent = agent; +}; + +/** + * Open connection with server. + * @function + * @name HTTPClient.open + * @param {String|Object} connection uniform resource locator string or connection params object. + * if String: Alias for GET request, equivalent for the { url : connection } + * if Object: {Object} [connection.headers] Request headers addition. + * {Object} [conection.proxy] Remote proxy host and port. + * {Object[]} [conection.files] List of files. + * {String|Object|Buffer|Stream.Readable} [connection.data] In case of: + * - String or Buffer is sent as it is with installing properly Content-Length header + * - Stream.Readable is sent in chunks with Transfer-Encoding "chunked" header. + * - Object becomes a string according to querystring.stringify + * @see http://nodejs.org/api/querystring.html#querystring_querystring_stringify_obj_sep_eq + * if no connection.files or Content-Type header any but multipart/form-data. + * @param {Function} callback Called with null or error description and server answer. + * @returns {HTTPRequest} Useful for events listening. + */ +HTTPClient.prototype.open = function (connection, callback) { + var options = url.parse(connection.url || connection); + var data = connection.data; + var isBuffer = Buffer.isBuffer(data); + var isReadableStream = data instanceof stream.Readable; + var method = (connection.method || 'GET').toUpperCase(); + var headers = Object.keys(connection.headers || {}).reduce(function (headers, header) { + headers[header.toLowerCase()] = connection.headers[header]; + return headers; + }, {}); + var files = connection.files || []; + var proxy = connection.proxy; + + if (files.length) { + headers['content-type'] = 'multipart/form-data'; + } + + switch (headers['content-type'] || typeof data) { + case 'multipart/form-data': + var boundary = Date.now().toString(16); + var prefix = 'Content-Disposition: form-data;'; + var segments = []; + + headers['content-type'] += '; boundary=' + boundary; + + for (var key in data) { + segments.push(util.format('%s name="%s"\r\n\r\n%s\r\n', prefix, key, data[key])); + } + + files.forEach(function (file) { + segments.push(util.format('%s name="%s"; filename="%s"\r\nContent-Type: %s\r\n\r\n%s\r\n', + prefix, file.fieldname || file.name, file.name, file.type, file.value)); + }); + + data = util.format('--%s\r\n%s--%s--\r\n', boundary, segments.join('--' + boundary + '\r\n'), boundary); + break; + case 'application/x-www-form-urlencoded': + case 'object': { + if (isBuffer) { + headers['content-length'] = data.length; + break; + } else if (isReadableStream) { + headers['transfer-encoding'] = 'chunked'; + break; + } else { + headers['content-type'] = 'application/x-www-form-urlencoded'; + data = querystring.stringify(data); + + if (method === 'GET') { + options.pathname = options.path = url.format({ + pathname: options.pathname, + search: [options.search, data].filter(Boolean).join('&') + }); + break; + } + } + } + case 'string': // eslint-disable-line + headers['content-length'] = Buffer.byteLength(data); + break; + default: + data = ''; + } + + if (proxy) { + options.pathname = + options.path = options.protocol + '//' + options.hostname + options.pathname; + options.hostname = + options.host = proxy.host; + options.port = proxy.port; + } + + options.headers = headers; + options.method = method; + options.agent = this._httpAgent; + + var contentType; + var size = 0; + var result = []; + var onData = function (chunk) { + size += chunk.length; + result.push(chunk); + }; + var request = new HTTPRequest(options) + .once('request', function (request) { + if (isReadableStream) { + data.pipe(request); + } else { + method === 'GET' || request.write(data); + request.end(); + } + }) + .once('response', function (response) { + contentType = response.headers['content-type']; + }) + .on('data', onData) + .once('end', function () { + request.removeListener('data', onData); + result = Buffer.concat(result, size); + + if (contentType && ~contentType.search(/json/i)) { + try { + result = JSON.parse(result); + } catch (err) { + return callback(err.toString()); + } + } + callback(null, result); + }) + .once('error', function (err) { + callback(err.toString()); + }) + .open(); + + return request; +}; + +/** + * Wrapper above native NodeJS http.ClientRequest. + * @class + * @name HTTPRequest + * @param {Object} options Request params. + * @augments events.EventEmitter + * @borrows http.ClientRequest#event:response as this.event:response + * @borrows http.ClientRequest#event:data as this.event:data + * @borrows http.ClientRequest#event:end as this.event:end + * @borrows http.ClientRequest#event:error as this.event:error + */ +var HTTPRequest = function (options) { + EventEmitter.call(this); + + this._options = options; +}; +/** + * @augments events.EventEmitter + */ +util.inherits(HTTPRequest, EventEmitter); + +/** + * Open connection with server. + * @function + * @name HTTPRequest.open + * @returns {HTTPRequest} Useful for events listening. + */ +HTTPRequest.prototype.open = function () { + var self = this; + var onData = function (chunk) { + self.emit('data', chunk); + }; + + this._request = ~this._options.protocol.indexOf('https') + ? https.request(this._options) : http.request(this._options); + + this.emit('request', this._request); + + this._request + .once('socket', function (socket) { + self.emit('socket', socket); + }) + .once('response', function (response) { + self.emit('response', response); + response + .on('data', onData) + .once('end', function () { + response.removeListener('data', onData); + self.emit('end'); + }); + }) + .once('error', function (err) { + self.emit('error', err); + }); + + return this; +}; + +/** + * Close connection with server. + * @function + * @name HTTPRequest.close + * @returns {HTTPRequest} + */ +HTTPRequest.prototype.close = function () { + this._request.abort(); + this.emit('abort'); + + return this; +}; + +// EWMA = ExponentiallyWeightedMovingAverage from +// https://github.com/felixge/node-measured/blob/master/lib/util/ExponentiallyMovingWeightedAverage.js +// Copyright Felix Geisendörfer under MIT license +function EWMA () { + this._timePeriod = 60000; + this._tickInterval = 5000; + this._alpha = 1 - Math.exp(-this._tickInterval / this._timePeriod); + this._count = 0; + this._rate = 0; + + var self = this; + this._interval = setInterval(function () { + self.tick(); + }, this._tickInterval); + this._interval.unref(); +} + +EWMA.prototype.update = function (n) { + this._count += n || 1; +}; + +EWMA.prototype.tick = function () { + var instantRate = this._count / this._tickInterval; + this._count = 0; + + this._rate += (this._alpha * (instantRate - this._rate)); +}; + +EWMA.prototype.rate = function (timeUnit) { + return (this._rate || 0) * timeUnit; +}; + // the type of network interface and their default value var interfaceType = { v4: { @@ -161,10 +388,53 @@ function retrieveAddress (type) { } }); }); - return ret; } +var crypto = require('crypto'); +var CIPHER_ALGORITHM = 'aes256'; +var Cipher = {}; + +/** + * Decipher data using 256 bits key (AES) + * @param {Hex} data input data + * @param {String} key 256 bits key + * @return {Object} deciphered data parsed as json object + */ +Cipher.decipherMessage = function (msg, key) { + try { + var decipher = crypto.createDecipher(CIPHER_ALGORITHM, key); + var decipheredMessage = decipher.update(msg, 'hex', 'utf8'); + decipheredMessage += decipher.final('utf8'); + return JSON.parse(decipheredMessage); + } catch (err) { + console.error(err); + return null; + } +}; + +/** + * Cipher data using 256 bits key (AES) + * @param {String} data input data + * @param {String} key 256 bits key + * @return {Hex} ciphered data + */ +Cipher.cipherMessage = function (data, key) { + try { + // stringify if not already done (fail safe) + if (typeof data !== 'string') { + data = JSON.stringify(data); + } + + var cipher = crypto.createCipher(CIPHER_ALGORITHM, key); + var cipheredData = cipher.update(data, 'utf8', 'hex'); + cipheredData += cipher.final('hex'); + return cipheredData; + } catch (err) { + console.error(err); + } +}; + module.exports = { EWMA: EWMA, Cache: Cache, @@ -173,5 +443,8 @@ module.exports = { network: { v4: retrieveAddress('v4'), v6: retrieveAddress('v6') - } + }, + HTTPClient: HTTPClient, + Cipher: Cipher, + fclone: require('fclone') }; diff --git a/lib/Interactor/WebsocketTransport.js b/lib/Interactor/WebsocketTransport.js new file mode 100644 index 000000000..0f5a5a9d6 --- /dev/null +++ b/lib/Interactor/WebsocketTransport.js @@ -0,0 +1,178 @@ + +'use strict'; + +var WebSocket = require('ws'); +var EventEmitter2 = require('eventemitter2').EventEmitter2; +var util = require('util'); +var log = require('debug')('interactor:ws'); +var cst = require('../../constants.js'); +var Utility = require('./Utility.js'); + +/** + * Websocket Transport used to communicate with KM + * @param {Object} opts options + * @param {Daemon} daemon Interactor instance + */ +var WebsocketTransport = module.exports = function (opts, daemon) { + this.opts = opts; + this._daemon = daemon; + this._ws = null; + + // instanciate the eventemitter + EventEmitter2.call(this, { + wildcard: true, + delimiter: ':' + }); +}; + +util.inherits(WebsocketTransport, EventEmitter2); + +/** + * Connect the websocket client to a url + * @param {String} url where the client will connect + * @param {Function} cb invoked with + */ +WebsocketTransport.prototype.connect = function (url, cb) { + var self = this; + // cipher metadata to prove that we have the secret key + var data = this._daemon.getSystemMetadata(); + data = Utility.Cipher.cipherMessage(JSON.stringify(data), this.opts.SECRET_KEY); + + this._host = url; + this._ws = new WebSocket(url, { + perMessageDeflate: false, + headers: { + 'X-KM-PUBLIC': this.opts.PUBLIC_KEY, + 'X-KM-DATA': data, + 'X-KM-SERVER': this.opts.MACHINE_NAME, + 'X-PM2-VERSION': this.opts.PM2_VERSION + } + }); + + function onError (err) { + return cb(err); + } + this._ws.once('error', cb); + this._ws.once('open', cb); + + this._ws.on('close', this._onClose.bind(this)); + this._ws.on('error', this._onError.bind(this)); + this._ws.on('message', this._onMessage.bind(this)); +}; + +/** + * Disconnect the websocket client + */ +WebsocketTransport.prototype.disconnect = function () { + if (this.isConnected()) { + this._ws.close(1000, 'Disconnecting'); + } +}; + +/** + * Disconnect and connect to a url + * @param {String} url where the client will connect [optionnal] + * @param {Function} cb invoked with + */ +WebsocketTransport.prototype.reconnect = function (url, cb) { + if (typeof url === 'function') { + cb = url; + url = this._host; + } + + this.disconnect(); + this.connect(url, cb); +}; + +/** + * Is the websocket connection ready + * @return {Boolean} + */ +WebsocketTransport.prototype.isConnected = function () { + return this._ws && this._ws.readyState === 1; +}; + +// PRIVATE METHODS // + +/** + * Broadcast the close event from websocket connection + * @private + * @param {Integer} code + * @param {String} reason + */ +WebsocketTransport.prototype._onClose = function (code, reason) { + this.emit('close', code, reason); +}; + +/** + * Broadcast the error event from websocket connection + * and eventually close the connection if it isnt already + * @private + * @param {Error} err + */ +WebsocketTransport.prototype._onError = function (err) { + // close connection if needed + if (this.isConnected()) { + this._ws.close(400, err.message); + } + this.emit('error', err); +}; + +/** + * Broadcast the close event from websocket connection + * @private + * @param {Integer} code + * @param {String} reason + */ +WebsocketTransport.prototype._onMessage = function (data, flags) { + // ensure that all required field are present + if (!data || !data.version || !data.payload || !data.channel) { + return log('Received message without all necessary fields'); + } + this.emit(data.channel, data.payload); +}; + +/** + * Broadcast the close event from websocket connection + * @private + * @param {Integer} code + * @param {String} reason + */ +WebsocketTransport.prototype.ping = function (data) { + log('Sending ping request to remote'); + try { + this._ws.ping(JSON.stringify(data), true, false); + } catch (err) { + // connection is closed + this.emit('error', err); + } +}; + +/** + * Send data to ws endpoint + * @param {String} channel + * @param {Object} data + */ +WebsocketTransport.prototype.send = function (channel, data) { + if (!channel || !data) { + return log('Trying to send message without all necessary fields'); + } + if (!this.isConnected()) { + return log('Trying to send data while not connected'); + } + + log('Sending packet over for channel %s', channel); + var packet = { + version: cst.PROTOCOL_VERSION, + payload: data, + channel: channel + }; + this._ws.send(JSON.stringify(packet), { + compress: cst.COMPRESS_PROTOCOL || false + }, function (err) { + if (err) { + console.error(err); + } + // TODO: add fixed queue of packet to allow retry if network fail + }); +}; diff --git a/lib/Satan.js b/lib/Satan.js index e6df06ba9..7db70ad8b 100644 --- a/lib/Satan.js +++ b/lib/Satan.js @@ -329,7 +329,7 @@ Satan.launchDaemon = function launchDaemon(cb) { debug('Launching daemon'); var SatanJS = p.resolve(p.dirname(module.filename), 'Satan.js'); - var InteractorDaemonizer = require('./Interactor/InteractorDaemonizer.js'); + var InteractorClient = require('./Interactor/InteractorClient.js'); var node_args = []; @@ -385,7 +385,7 @@ Satan.launchDaemon = function launchDaemon(cb) { debug('PM2 daemon launched with return message: ', msg); child.removeListener('error', onError); child.disconnect(); - InteractorDaemonizer.launchAndInteract({}, function(err, data) { + InteractorClient.launchAndInteract({}, function(err, data) { if (data) debug('Interactor launched'); return cb ? cb(null, child) : false; diff --git a/test/interface/custom-actions.mocha.js b/test/interface/custom-actions.mocha.js index 5d6117ec3..bc5d00c1a 100644 --- a/test/interface/custom-actions.mocha.js +++ b/test/interface/custom-actions.mocha.js @@ -9,7 +9,7 @@ var util = require('util'); var Cipher = require('../../lib/Interactor/Cipher.js'); var cst = require('../../constants.js'); var Plan = require('../helpers/plan.js'); -var Interactor = require('../../lib/Interactor/InteractorDaemonizer.js'); +var Interactor = require('../../lib/Interactor/InteractorClient.js'); var server = new events.EventEmitter(); var pm2_bus; diff --git a/test/interface/interactor.daemonizer.mocha.js b/test/interface/interactor.daemonizer.mocha.js index 9c8fd1a10..7c3a437b8 100644 --- a/test/interface/interactor.daemonizer.mocha.js +++ b/test/interface/interactor.daemonizer.mocha.js @@ -3,7 +3,7 @@ var should = require('should'); var fs = require('fs'); var os = require('os'); var default_conf = require('../../constants'); -var interactorDaemonizer = require('../../lib/Interactor/InteractorDaemonizer'); +var InteractorClient = require('../../lib/Interactor/InteractorClient'); var json5 = require('../../lib/tools/json5.js'); describe('Daemonizer interactor', function() { @@ -20,7 +20,7 @@ describe('Daemonizer interactor', function() { describe('General tests', function() { it('should try get set keys but get error because nothing exposed', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, null, function(err, data) { + InteractorClient.getOrSetConf(default_conf, null, function(err, data) { err.should.not.be.null(); done(); }); @@ -33,7 +33,7 @@ describe('Daemonizer interactor', function() { }); it('should set right node by default', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, { + InteractorClient.getOrSetConf(default_conf, { secret_key : 'xxx', public_key : 'yyy', machine_name : null, @@ -50,7 +50,7 @@ describe('Daemonizer interactor', function() { }); it('should retrieve data from file without env variable', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, null, function(err, data) { + InteractorClient.getOrSetConf(default_conf, null, function(err, data) { should(err).be.null(); data.secret_key.should.eql('xxx'); data.public_key.should.eql('yyy'); @@ -64,7 +64,7 @@ describe('Daemonizer interactor', function() { }); it('should set new keys and write in configuration file', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, { + InteractorClient.getOrSetConf(default_conf, { secret_key : 'XXXS2', public_key : 'XXXP2', info_node : 'test2.url' @@ -88,7 +88,7 @@ describe('Daemonizer interactor', function() { }); it('should retrieve data from file without env variable', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, null, function(err, data) { + InteractorClient.getOrSetConf(default_conf, null, function(err, data) { should(err).be.null(); data.secret_key.should.eql('XXXS2'); data.public_key.should.eql('XXXP2'); @@ -113,7 +113,7 @@ describe('Daemonizer interactor', function() { it('should work with env variables and create file', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, { + InteractorClient.getOrSetConf(default_conf, { secret_key : null, public_key : null, machine_name : null, @@ -136,7 +136,7 @@ describe('Daemonizer interactor', function() { }); it('should retrieve data from file without env variable', function(done) { - interactorDaemonizer.getOrSetConf(default_conf, null, function(err, data) { + InteractorClient.getOrSetConf(default_conf, null, function(err, data) { should(err).be.null(); data.secret_key.should.eql('XXXS'); data.public_key.should.eql('XXXP'); diff --git a/test/interface/scoped_pm2_actions.mocha.js b/test/interface/scoped_pm2_actions.mocha.js index c8c0dc6af..c98849597 100644 --- a/test/interface/scoped_pm2_actions.mocha.js +++ b/test/interface/scoped_pm2_actions.mocha.js @@ -9,7 +9,7 @@ var cst = require('../../constants.js'); var Plan = require('../helpers/plan.js'); var Configuration = require('../../lib/Configuration.js'); var Helpers = require('../helpers/apps.js'); -var Interactor = require('../../lib/Interactor/InteractorDaemonizer.js'); +var Interactor = require('../../lib/Interactor/InteractorClient.js'); var gl_interactor_process; var send_cmd = new events.EventEmitter();