Permalink
Browse files

servers cleanup

  • Loading branch information...
apocas committed Dec 1, 2014
1 parent 64b8190 commit a2f8f7d9c8eb6f01678c628fc1bd0af68ccbca95
Showing with 130 additions and 75 deletions.
  1. +53 −30 lib/outkept.js
  2. +64 −40 lib/sensor.js
  3. +7 −0 lib/server.js
  4. +4 −4 main.js
  5. +2 −1 package.json
@@ -4,92 +4,115 @@ var Server = require('./server'),
outils = require('./utils'),
async = require('async');
var Outkept = function (id, passphrase, key, ids) {
var Outkept = function(id, passphrase, key, ids) {
this.servers = [];
this.queue = [];
this.passphrase = passphrase;
this.key = key;
this.ids = ids;
this.id = id;
this.loadGardeners();
};
Outkept.prototype.loadGardeners = function () {
Outkept.prototype.loadGardeners = function() {
var self = this;
setInterval(function() {
self.servers.forEach(function (server) {
if (server.connected === false && server.started) {
server.conf.privateKey = self.key;
server.conf.passphrase = self.passphrase;
console.log(self.id + ' - ' + server.conf.host + ' retrying...');
server.connect();
} else if(server.connected === true) {
vendors.mongo(function(db) {
vendors.mongo(function(db) {
async.mapSeries(self.servers, function(server, callback) {
if (server.connected === false && server.started) {
server.conf.privateKey = self.key;
server.conf.passphrase = self.passphrase;
console.log(self.id + ' - ' + server.conf.host + ' retrying...');
server.connect();
} else if (server.connected === true) {
var aux = server.extract();
aux.time = new Date().getTime() / 1000;
aux.time = parseInt(new Date().getTime() / 1000);
db.collection('readings').insert(aux, function(err, docs) {
if(err) console.log(self.id + ' - Mongo error in outkept->save');
if (err) console.log(self.id + ' - Mongo error in outkept->save');
});
}
callback();
}, function(err, results) {});
async.mapSeries(self.ids, function(id, callback) {
db.collection('servers').findOne({
'id': id
}, function(err, server) {
var now = parseInt(new Date().getTime() / 1000);
if (server && (!server.pinged || now - server.pinged > 345600)) {
db.collection('servers').remove({
'id': server.id
}, function(err, removed) {
if (removed) {
console.log('Server ' + server.id + ' removed after 4 days.');
outils.sendMessage('message', 'Server ' + server.id + ' removed after 4 days.');
}
});
}
callback();
});
}
}, function(err, results) {});
});
}, 120000);
};
Outkept.prototype.createServer = function (hostname, sshport) {
Outkept.prototype.createServer = function(hostname, sshport) {
var self = this;
var s = new Server(hostname, sshport, config.crawler_user, this.passphrase, this.key, config.timer);
s.on('ready', function () {
s.on('ready', function() {
this.loadSensors();
outils.sendMessage('message', 'Server ' + this.conf.host + ' connected.');
});
s.on('available', function (sensor) {
s.on('available', function(sensor) {
outils.sendMessage('message', 'New sensor detected at ' + this.conf.host + ', ' + sensor.conf.name + '.');
this.save();
});
s.on('closed', function (was) {
if(was) {
s.on('closed', function(was) {
if (was) {
this.save();
outils.sendMessage('message', 'Server disconnected: ' + this.conf.host);
}
});
s.on('alarmed', function (sensor) {
s.on('alarmed', function(sensor) {
console.log(self.id + ' - ' + this.hostname + ' ALARM!');
outils.sendTrigger('alarmed', sensor.conf.name, sensor.value, s);
});
s.on('warned', function (sensor) {
s.on('warned', function(sensor) {
console.log(self.id + ' - ' + this.hostname + ' WARNING!');
outils.sendTrigger('warned', sensor.conf.name, sensor.value, s);
});
s.on('fired', function (sensor) {
s.on('fired', function(sensor) {
console.log(self.id + ' - ' + this.conf.host + ' sensor ' + sensor.conf.name + ' fired with value ' + sensor.value);
outils.sendTrigger('fired', sensor.conf.name, sensor.value, s);
});
return s;
};
Outkept.prototype.start = function () {
Outkept.prototype.start = function() {
this.loadServers();
this.loadGardeners();
};
Outkept.prototype.loadServers = function() {
var self = this;
vendors.mongo(function(db) {
async.mapLimit(self.ids, 5, function(id, callback) {
db.collection('servers').findOne({id: id}, function(err, server) {
if(server && server.address !== undefined && self.findServer(server.id) === undefined) {
db.collection('servers').findOne({
id: id
}, function(err, server) {
if (server && server.address !== undefined && self.findServer(server.id) === undefined) {
console.log(self.id + ' - Loading server: ' + server.id);
var s = self.createServer(server.address, server.port);
s.id = server.id;
@@ -119,12 +142,12 @@ Outkept.prototype.loadServers = function() {
setTimeout(function() {
self.loadServers();
}, 30000);
}, 60000);
});
});
};
Outkept.prototype.findServer = function (id, cb) {
Outkept.prototype.findServer = function(id, cb) {
for (var i = 0; i < this.servers.length; i++) {
if (this.servers[i].id === id) {
return this.servers[i];
@@ -135,11 +158,11 @@ Outkept.prototype.findServer = function (id, cb) {
var main;
process.on('message', function(m) {
if(m.boot) {
if (m.boot) {
console.log(m.id + ' - Starting...');
main = new Outkept(m.id, m.boot, m.key, m.ids);
main.start();
} else if(m.server) {
} else if (m.server) {
main.ids.push(m.server);
}
});
@@ -1,8 +1,9 @@
var sys = require('sys'),
events = require('events'),
util = require('util');
util = require('util'),
gauss = require('gauss');
var Sensor = function (connection, conf, timing) {
var Sensor = function(connection, conf, timing) {
this.connection = connection;
this.conf = conf;
this.alarmed = false;
@@ -11,11 +12,12 @@ var Sensor = function (connection, conf, timing) {
this.timing = this.conf.timer || timing;
this.latest = new Date().getTime() / 1000;
this.added = false;
this.values = [];
};
sys.inherits(Sensor, events.EventEmitter);
Sensor.prototype.extract = function () {
Sensor.prototype.extract = function() {
var aux = {};
aux.name = this.conf.name;
aux.value = this.value;
@@ -25,13 +27,13 @@ Sensor.prototype.extract = function () {
return aux;
};
Sensor.prototype.check = function () {
Sensor.prototype.check = function() {
var self = this;
if (!this.conf.verifier || this.conf.verifier === '') {
this.emit('available');
} else {
this.connection.send(self.conf.verifier, function (err, data) {
this.connection.send(self.conf.verifier, function(err, data) {
if (data && data.toString('utf-8').indexOf("yes") !== -1) {
self.emit('available');
} else {
@@ -42,22 +44,22 @@ Sensor.prototype.check = function () {
};
Sensor.prototype.getStatus = function() {
if(this.alarmed === true) {
if (this.alarmed === true) {
return "alarmed";
} else if(this.warned === true) {
} else if (this.warned === true) {
return "warned";
} else if(this.fired === true) {
} else if (this.fired === true) {
return "fired";
} else {
return "normal";
}
};
Sensor.prototype.start = function () {
Sensor.prototype.start = function() {
var self = this;
var handler = function(callback) {
self.timer = setTimeout(function () {
self.timer = setTimeout(function() {
self.work(function() {
handler();
});
@@ -69,49 +71,71 @@ Sensor.prototype.start = function () {
});
};
Sensor.prototype.stop = function () {
Sensor.prototype.stop = function() {
clearTimeout(this.timer);
};
Sensor.prototype.work = function (cb) {
Sensor.prototype.work = function(cb) {
var self = this;
this.connection.send(this.conf.cmd, function (err, data) {
if(err) return self.emit('error', err);
this.connection.send(this.conf.cmd, function(err, data) {
if (err) {
if (cb) cb();
return self.emit('error', err);
}
if(self.conf.warning === undefined && self.conf.alarm === undefined) {
self.value = data.toString('utf-8').replace(/^\s+|\s+$/g, "");
if (self.conf.warning === undefined && self.conf.alarm === undefined) {
self.value = data.toString('utf-8').replace(/^\s+|\s+$/g, "");
self.emit('data');
} else {
self.value = parseFloat(data.toString('utf-8').replace(/^\s+|\s+$/g, ""));
if ((self.value >= self.conf.alarm && self.conf.inverted === false) || (self.value <= self.conf.alarm && self.conf.inverted)) {
if (self.conf.reactive !== undefined && self.conf.reactive.length > 0 && ((self.value > self.conf.alarm && self.conf.inverted === false) || (self.value < self.conf.alarm && self.conf.inverted))) {
if (self.fired === false) {
self.connection.send(self.conf.reactive);
self.emit('fired');
self.fired = true;
}
}
if (self.alarmed === false) {
self.emit('alarmed');
var val = parseFloat(data.toString('utf-8').replace(/^\s+|\s+$/g, ""));
var aux = new gauss.Vector(self.values);
var max = aux.max();
var min = aux.min();
if (self.values.length < 10 || max === 0 || val < max * 10000) {
self.values.push(val);
if (self.values.length > 10) {
self.values.shift();
}
self.alarmed = true;
self.warned = false;
} else if ((self.value >= self.conf.warning && self.conf.inverted === false) || (self.value <= self.conf.warning && self.conf.inverted)) {
if (self.warned === false) {
self.emit('warned');
self.value = self.values[self.values.length - 1];
if ((self.value >= self.conf.alarm && self.conf.inverted === false) || (self.value <= self.conf.alarm && self.conf.inverted)) {
if (self.conf.reactive !== undefined && self.conf.reactive.length > 0 && ((self.value > self.conf.alarm && self.conf.inverted === false) || (self.value < self.conf.alarm && self.conf.inverted))) {
if (self.fired === false) {
self.connection.send(self.conf.reactive);
self.emit('fired');
self.fired = true;
}
}
if (self.alarmed === false) {
self.emit('alarmed');
}
self.alarmed = true;
self.warned = false;
} else if ((self.value >= self.conf.warning && self.conf.inverted === false) || (self.value <= self.conf.warning && self.conf.inverted)) {
if (self.warned === false) {
self.emit('warned');
}
self.warned = true;
self.alarmed = false;
self.fired = false;
} else {
self.alarmed = false;
self.warned = false;
self.fired = false;
}
self.warned = true;
self.alarmed = false;
self.fired = false;
self.emit('data');
} else {
self.alarmed = false;
self.warned = false;
self.fired = false;
console.log('IGNORING ' + data.toString('utf-8').replace(/^\s+|\s+$/g, "") + ' at sensor ' + self.conf.name);
console.log(self.values);
}
}
self.emit('data');
if(cb) cb();
if (cb) cb();
});
};
@@ -57,6 +57,8 @@ var Server = function (hostname, sshport, user, password, key, timing) {
self.id = crypto.createHash('md5').update(self.mac).digest('hex');
}
self.pinged = parseInt(new Date().getTime() / 1000);
self.emit('ready');
});
});
@@ -96,6 +98,8 @@ Server.prototype.extract = function () {
//aux.ips = this.ips; //overwriting crawler
aux.connected = this.connected;
aux.pinged = this.pinged;
aux.sensors = [];
this.sensors.forEach(function (sensor) {
var sens = {
@@ -156,6 +160,9 @@ Server.prototype.loadSensors = function () {
self.save();
this.latest = now;
}
self.pinged = parseInt(new Date().getTime() / 1000);
self.emit('data', this);
});
@@ -45,13 +45,13 @@ function loadMain(passphrase, key, ids) {
function loadFeeds() {
var feeds = new Feeds();
feeds.on('alert', function (feed, data) {
feeds.on('alert', function(feed, data) {
console.log('Feed ' + feed.template.name + ' reported ' + data);
outils.sendFeed(feed.template.name, data);
});
}
prompt.get(schema, function (err, result) {
prompt.get(schema, function(err, result) {
if (err) return console.log(err);
var passphrase = result.passphrase;
@@ -66,8 +66,8 @@ prompt.get(schema, function (err, result) {
var ids = [];
for (var i = 0; i < replies.length; i++) {
if(loaded.indexOf(replies[i].id) === -1) {
if(l.ids.length < 50) {
if (loaded.indexOf(replies[i].id) === -1) {
if (l.ids.length < 50) {
l.loadServer(replies[i].id);
loaded.push(replies[i].id);
} else {
Oops, something went wrong.

0 comments on commit a2f8f7d

Please sign in to comment.