Browse files

Refactored agents, added simple test

  • Loading branch information...
1 parent 3bcadd3 commit 4372fe7e736128baa31d3c8029a0a24254a4a72e @DamonOehlman committed Aug 31, 2011
Showing with 220 additions and 190 deletions.
  1. +2 −1 .gitignore
  2. +12 −0 lib/agent.js
  3. +42 −52 lib/agents/cpu.js
  4. +16 −24 lib/agents/mem.js
  5. +0 −52 lib/agents/net.js
  6. +50 −6 lib/agents/proc.js
  7. +75 −46 lib/collector.js
  8. +11 −9 lib/sampledb.js
  9. +1 −0 package.json
  10. +11 −0 test/takesample.js
View
3 .gitignore
@@ -1 +1,2 @@
-node_modules
+node_modules
+samples
View
12 lib/agent.js
@@ -0,0 +1,12 @@
+var DEFAULT_AMOUNT = Math.pow(10, 5);
+
+// define the base agent
+var BaseAgent = exports.BaseAgent = function() {
+};
+
+BaseAgent.prototype.roundVal = function(value, decimals) {
+ // initialise the amount, avoid math if we can
+ var amount = decimals ? Math.pow(10, decimals) : DEFAULT_AMOUNT;
+
+ return Math.round(value * amount) / amount;
+};
View
94 lib/agents/cpu.js
@@ -1,61 +1,51 @@
-var os = require('os');
-
-module.exports = (function() {
- var cpuCores = os.cpus().length,
- cpuOld;
- /* internals */
+var os = require('os'),
+ util = require('util'),
+ BaseAgent = require('../agent').BaseAgent,
+ coreCount = os.cpus().length;
-
- function collect() {
- var raw = os.cpus(),
- cpu = [];
-
- for (var ii=0; ii < cpuCores; ii++) {
- var times = raw[ii].times,
- total = 0;
-
- for (var key in times) {
- total += times[key];
- } // for
+function readCpu() {
+ var raw = os.cpus(),
+ cpu = [];
+
+ for (var ii = coreCount; ii--; ) {
+ var times = raw[ii].times,
+ total = 0;
- cpu[ii] = {
- total: total,
- idle: times.idle
- };
+ for (var key in times) {
+ total += times[key];
} // for
- return cpu;
- } // collect
-
- function calcLoad(cpuNew) {
- var idleDelta,
- tickDelta,
- totalTick,
- load = [];
+ cpu[ii] = {
+ total: total,
+ idle: times.idle
+ };
+ } // for
+
+ return cpu;
+};
- for (var ii=0; ii < cpuCores; ii++) {
- tickDelta = cpuNew[ii].total - cpuOld[ii].total;
- idleDelta = cpuNew[ii].idle - cpuOld[ii].idle;
-
- load[ii] = 1 - idleDelta / tickDelta;
- cpuOld[ii] = cpuNew[ii];
- } // for
+var Agent = exports.Agent = function() {
+ // set the default frequency to 100ms
+ this.frequency = 100;
+ this.lastReading = readCpu();
+};
- return load;
- } // calcLoad
+util.inherits(Agent, BaseAgent);
- cpuOld = collect();
+Agent.prototype.run = function(callback) {
+ var idleDelta, tickDelta, load = [],
+ reading = readCpu();
- function run(callback) {
- callback(calcLoad(collect()));
- } // callback
-
- /* exports */
+ for (var ii = coreCount; ii--; ) {
+ tickDelta = reading[ii].total - this.lastReading[ii].total;
+ idleDelta = reading[ii].idle - this.lastReading[ii].idle;
+
+ // calculate the load for the core
+ load[ii] = this.roundVal(1 - (tickDelta ? idleDelta / tickDelta : 1));
+
+ // update the last reading
+ this.lastReading[ii] = reading[ii];
+ } // for
- return {
- // define the frequency for
- frequency: 100, // '* * * * * *',
- run: run
- };
-
-})();
+ callback(load);
+};
View
40 lib/agents/mem.js
@@ -1,27 +1,19 @@
-var os = require('os');
-
-module.exports = (function() {
- var data = {
- "total" : os.totalmem()
- };
-
- /* internals */
- function collect() {
- data.free = os.freemem();
-
- return data;
- }
+var os = require('os'),
+ util = require('util'),
+ BaseAgent = require('../agent').BaseAgent,
+ totalMem = os.totalmem();
+
+var Agent = exports.Agent = function() {
+};
+util.inherits(Agent, BaseAgent);
- function run(callback) {
- callback(collect());
- } // callback
-
- /* exports */
+Agent.prototype.run = function(callback) {
+ // read the free memory
+ var free = os.freemem();
- return {
- // define the frequency for
- frequency: '* * * * * *',
- run: run
- };
-})();
+ callback(this.roundVal((totalMem - free) / totalMem), {
+ free: free,
+ total: totalMem
+ });
+};
View
52 lib/agents/net.js
@@ -1,52 +0,0 @@
-var os = require('os'),
- exec = require('child_process').exec;
-
-module.exports = (function() {
- /* internals */
- var reNetworkAct = /(\wX).*?\:(\d+)/g,
- oldData = {};
-
- function collect(callback) {
- var netTraffic = {};
- var adapter = exec('/sbin/ifconfig eth0 | grep "RX bytes"', function(err, data, stderr){
- var match;
- // get the first match
- match = reNetworkAct.exec(data);
-
- while (match) {
- netTraffic[match[1]] = match[2];
- match = reNetworkAct.exec(data);
- } // while
-
- netTraffic.tickCount = new Date().getTime();
- callback(netTraffic);
- });
- } // collect
-
- collect(function(traffic) {
- oldData = traffic;
- });
-
- function run(callback) {
- function compare(newData) {
- var timeElapsed = newData.tickCount - oldData.tickCount,
- netIO = {
- transmit : Math.floor((newData.TX - oldData.TX) * (1000 / timeElapsed)),
- receive : Math.floor((newData.RX - oldData.RX) * (1000 / timeElapsed))
- };
- // reset the Data for the next pass
- oldData = newData;
- //console.log(netIO);
- callback(netIO);
- } // compare
- collect(compare);
- } // run
-
- /* exports */
-
- return {
- // define the frequency for
- frequency: '* * * * * *',
- run: run
- };
-})();
View
56 lib/agents/proc.js
@@ -1,13 +1,57 @@
var os = require('os'),
- exec = require('child_process').exec;
+ util = require('util'),
+ _ = require('underscore'),
+ exec = require('child_process').exec,
+ BaseAgent = require('../agent').BaseAgent;
+
+function readProcesses(command, callback) {
+ var ps = exec(command, function(err, stdout, stderr) {
+ var lines = (stdout || '').split('\n').slice(1),
+ processes = [],
+ sortedProcesses;
+
+ lines.forEach(function(line) {
+ var data = line.split(/\s+/).slice(1),
+ psData = {
+ cpu: parseFloat(data[0]),
+ mem: parseFloat(data[1]),
+ process: data.slice(2).join(' ')
+ };
+
+ if (psData.cpu >= 0.5 || psData.mem >= 0.5) {
+ processes.push(psData);
+ } // if
+ });
+
+ callback(null, processes.sort(function(a, b) {
+ return b.cpu - a.cpu;
+ }));
+ });
+};
+
+var Agent = exports.Agent = function() {
+ this.frequency = '*/2 * * * * *';
+};
+
+util.inherits(Agent, BaseAgent);
+
+Agent.prototype.run = function(callback) {
+ var command = this.opts.command || 'ps -A -o %cpu,%mem,fname,command';
+
+ readProcesses(command, function(err, processes) {
+ if (! err) {
+ callback(processes.slice(0, 5), processes.slice(5));
+ } // if
+ });
+};
-module.exports = (function() {
- /* internals */
+/*
+module.exports = (function() {
function run(callback) {
// The following code is taken from https://github.com/codejoust/node-sysmon/blob/master/core.js line 36 - 50
- var ps = exec('ps -e --sort=%cpu -o %cpu,%mem,pid,user,fname | tail --lines 5', function(err, stdout, stderr) {
+ var ps = exec('ps -A -o %cpu,%mem,pid,user,fname,command', function(err, stdout, stderr) {
var ps_raw = (' ' + stdout).split("\n").slice(1, 5).reverse();
var processes = [];
@@ -31,11 +75,11 @@ module.exports = (function() {
}); // ps
} // callback
- /* exports */
-
return {
// define the frequency for
frequency: '* * * * * *',
run: run
};
})();
+
+*/
View
121 lib/collector.js
@@ -7,12 +7,46 @@ var fs = require('fs'),
agents = [];
-function loadAgents(collector, agents) {
- agents.forEach(function(agentPath) {
+function loadAgents(collector, agentPaths, callback) {
+ agentPaths.forEach(function(agentPath) {
// iterate through the agents in the
fs.readdir(agentPath, function(err, files) {
if (! err) {
- files.forEach(collector.loadAgent);
+ files.forEach(function(filename) {
+ var agentId = path.basename(filename, '.js');
+ console.log('Attempting to load agent: ' + agentId);
+
+ try {
+ var Agent = require('./agents/' + agentId).Agent;
+
+ // if the Agent class has been defined, then create an instance
+ if (Agent) {
+ // create the agent
+ var agent = new Agent();
+
+ // initialise
+ agent.id = agentId;
+ agent.opts = collector.opts[agentId] || {};
+
+ // initialise the frequency
+ // priority = options first, default frequency (if set) 2nd, cron pattern for every second last
+ agent.frequency = agent.opts.frequency || agent.frequency || '* * * * * *';
+
+ // add the list of active agents
+ collector.agents.push(agent);
+ } // if
+ else {
+ console.log('Agent= no run handler for agent "' + agentId + '", not registering');
+ } // if..else
+ }
+ catch (e) {
+ console.log('agent "' + filename + '" is invalid: ' + e.message);
+ } // try..catch
+ });
+
+ if (callback) {
+ callback();
+ } // if
} // if
});
});
@@ -26,72 +60,67 @@ var Collector = exports.Collector = function(opts) {
// add the core agents to the begin of the requested agents
opts.agents.unshift(path.join(__dirname, 'agents'));
+ // save the options to the collector
+ this.opts = opts;
+
// initialise members
this.agents = [];
-
- // load the agents
- loadAgents(this, opts.agents);
}; // Collector
util.inherits(Collector, events.EventEmitter);
-Collector.prototype.loadAgent = function(filename) {
- var agentId = path.basename(filename, '.js');
- console.log('Attempting to load agent: ' + agentId);
-
- try {
- var agent = require('./agents/' + agentId);
-
- if (agent.run) {
- if (typeof agent.frequency == 'string') {
- // check the pattern is a valid cron pattern, and then register the job
- new cron.CronTime(agent.frequency);
- console.log('pattern ok, registering agent: ' + agentId);
+Collector.prototype.loadAgents = function(callback) {
+ // load the agents
+ loadAgents(this, this.opts.agents, callback);
+};
- agent.cron = new cron.CronJob(agent.frequency, function() {
- this.runAgent(agentId, agent);
- });
- }
- else {
- console.log('agent specified interval, scheduling');
+Collector.prototype.scheduleAgents = function() {
+ var collector = this;
+
+ this.agents.forEach(function(agent) {
+ if (typeof agent.frequency == 'string') {
+ // check the pattern is a valid cron pattern, and then register the job
+ new cron.CronTime(agent.frequency);
+ console.log('pattern ok, registering agent');
- setInterval(function() {
- this.runAgent(agentId, agent);
- }, agent.frequency);
- } // if..else
-
- // add the agent
- this.agents.push(agent);
+ agent.cron = new cron.CronJob(agent.frequency, function() {
+ collector.runAgent(agent);
+ });
}
else {
- console.log('no run handler for agent "' + agentId + '", not registering');
+ console.log('agent specified interval, scheduling');
+
+ setInterval(function() {
+ collector.runAgent(agent);
+ }, agent.frequency);
} // if..else
- }
- catch (e) {
- console.log('agent "' + filename + '" is invalid: ' + e.message);
- } // try..catch
-}; // loadAgent
+ });
+}; // schedule
+
+Collector.prototype.runAgent = function(agent) {
+ var collector = this;
-Collector.prototype.runAgent = function(agentId, agent) {
- agent.run(function(data) {
- this.emit('data', agentId, data);
+ agent.run(function(data, details) {
+ collector.emit('data', agent.id, data, details);
});
}; // runAgent
-exports.collectToDB = function(opts, path) {
+exports.collectToDB = function(path, opts) {
var db, collector;
// create the sample db
db = new SampleDB(path);
// create the collector and save samples to the db
- collector = new Collector(opts).on('data', function(agentId, data) {
- db.save({
- type: agentId,
- data: data
- });
+ collector = new Collector(opts).on('data', function(agentId, data, details) {
+ db.save(agentId, data, details);
+ });
+
+ collector.loadAgents(function() {
+ collector.scheduleAgents();
});
+
return {
collector: collector,
db: db,
View
20 lib/sampledb.js
@@ -5,15 +5,17 @@ var SampleDB = exports.SampleDB = function(path) {
this.storage.open({ create_if_missing: true }, path || 'samples');
};
-SampleDB.prototype.save = function(entry) {
- // initialise defaults
- entry.time = entry.time || new Date().getTime();
+SampleDB.prototype.close = function() {
+ this.storage.close();
+}; // close
- // if we have an entry type and entry data then save
- if (entry.type && entry.value) {
- var serializedValue = JSON.stringify(entry),
- entryKey = entry.time + '_' + entry.type;
+SampleDB.prototype.save = function(type, data, details) {
+ var tick = new Date().getTime(),
+ serialized = {
+ data: JSON.stringify(data),
+ details: details ? JSON.stringify(details) : undefined
+ },
+ entryKey = tick + '_' + type;
- storage.put({}, new Buffer(entryKey), new Buffer(serializedValue));
- } // if
+ this.storage.put({}, new Buffer(entryKey), new Buffer(serialized));
};
View
1 package.json
@@ -9,6 +9,7 @@
"dependencies": {
"cron": "0.1.x",
+ "underscore": "1.x.x",
"node-leveldb": "https://github.com/DamonOehlman/node-leveldb/tarball/master"
},
View
11 test/takesample.js
@@ -0,0 +1,11 @@
+var collector = require('../lib/collector'),
+ collectionProcess;
+
+console.log('Collecting 5 seconds worth of data');
+
+collectionProcess = collector.collectToDB();
+collectionProcess.collector.on('data', function(agentId, data, details) {
+ console.log(agentId, data);
+});
+
+setTimeout(collectionProcess.stop, 5000);

0 comments on commit 4372fe7

Please sign in to comment.