Permalink
Browse files

pub/sub rebuilt

  • Loading branch information...
apocas committed Mar 26, 2015
1 parent e851b1c commit eaf28f2722538676c6f80f7878dbd44b27f5d491
Showing with 71 additions and 58 deletions.
  1. +1 −1 .gitignore
  2. +3 −3 lib/utils.js
  3. +17 −7 package.json
  4. +43 −40 plugins/csf.js.example
  5. +7 −7 vendors.js
View
@@ -14,4 +14,4 @@ tests/unlock.js
plugins/csf.js
plugins/csf.js
plugins/suspend.js
View
@@ -55,7 +55,7 @@ module.exports = {
aux.date = Math.round(new Date().getTime());
//console.log(aux);
vendors.mongopubsub.publish('messages', aux);
vendors.redis.publisher.publish('messages', JSON.stringify(aux));
},
sendTrigger: function (level, sensor, value, s) {
@@ -69,7 +69,7 @@ module.exports = {
date: Math.round(new Date().getTime()/1000.0)
};
vendors.mongopubsub.publish('events', aux);
vendors.redis.publisher.publish('events', JSON.stringify(aux));
vendors.mongo(function(db) {
db.collection('triggers').insert(aux, function(err, docs) {
@@ -84,7 +84,7 @@ module.exports = {
feed: feed,
url: url
};
vendors.mongopubsub.publish('events', aux);
vendors.redis.publisher.publish('events', JSON.stringify(aux));
},
secureDelete: function (path) {
View
@@ -1,15 +1,25 @@
{
"name": "outkept",
"version": "2.2.4",
"version": "2.2.5",
"description": "http//outke.pt",
"author": "Pedro Dias <petermdias@gmail.com>",
"maintainers": [
"apocas <petermdias@gmail.com>"
],
"repository": {
"type": "git",
"url": "http://github.com/apocas/outkept.git"
},
"dependencies": {
"ssh2": "0.3.x",
"ssh2": "0.4.x",
"prompt": "0.2.x",
"process": "0.7.x",
"mongodb": "1.4.x",
"feedparser": "0.18.x",
"request": "2.39.x",
"process": "0.10.x",
"mongodb": "2.0.x",
"feedparser": "1.0.x",
"request": "2.54.x",
"ssh2-multiplexer": "0.0.x",
"mubsub": "1.0.x",
"hiredis": "0.2.x",
"redis": "0.12.x",
"async": "0.9.x",
"gauss": "0.2.x"
}
View
@@ -20,51 +20,54 @@ CSF.prototype.start = function() {
});
}
vendors.mongo(function(db) {
vendors.mongopubsub.subscribe('csf', function (event) {
var ev = {
type: 'csf',
message: event.hostname + ' -> CSF operation failed.'
};
var ip = event.ip.trim().replace(/;/g, '');
var subscriber = vendors.redis.createClient();
subscriber.subscribe('csf');
if(event.hostname) {
console.log(event);
var server = new Server(event.hostname, config.crawler_port, config.crawler_user, self.passphrase, self.key, config.timer);
subscriber.on('message', function (channel, event) {
event = JSON.parse(event);
var ev = {
type: 'plugins',
message: event.hostname + ' -> CSF operation failed.'
};
var ip = event.ip.trim().replace(/;/g, '');
ip = event.ip.trim().replace(/|/g, '');
if(event.hostname) {
console.log(event);
var server = new Server(event.hostname, config.crawler_port, config.crawler_user, self.passphrase, self.key, config.timer);
server.on('ready', function () {
var selfs = this;
if(event.type == 'unlock') {
this.connectionPool.send('csf -g "' + ip + '"', function (err, data) {
if (data && data.toString('utf-8').indexOf('No matches found for ' + ip + ' in iptables') === -1) {
var aux = data.toString('utf-8');
aux = aux.trim().split('\n');
var motive = aux[aux.length - 1];
unlock(selfs, event);
ev.message = event.hostname + ' -> ' + motive;
} else {
ev.message = event.hostname + ' -> IP not blocked.';
server.disconnect();
}
});
} else if(event.type == 'lock') {
var reason = event.reason.trim().replace(/;/g, '');
this.connectionPool.send('csf -d ' + ip + ' ' + reason, function (err, data) {
ev.message = event.hostname + ' -> IP blocked.';
server.on('ready', function () {
var selfs = this;
if(event.type == 'unlock') {
this.connectionPool.send('csf -g "' + ip + '"', function (err, data) {
if (data && data.toString('utf-8').indexOf('No matches found for ' + ip + ' in iptables') === -1) {
var aux = data.toString('utf-8');
aux = aux.trim().split('\n');
var motive = aux[aux.length - 1];
unlock(selfs, event);
ev.message = event.hostname + ' -> ' + motive;
} else {
ev.message = event.hostname + ' -> IP not blocked.';
server.disconnect();
});
}
});
}
});
} else if(event.type == 'lock') {
var reason = event.reason.trim().replace(/;/g, '');
this.connectionPool.send('csf -d ' + ip + ' ' + reason, function (err, data) {
ev.message = event.hostname + ' -> IP blocked.';
server.disconnect();
});
}
});
server.on('closed', function(connected) {
vendors.mongopubsub.publish('events', ev);
});
server.on('closed', function(connected) {
vendors.redis.publisher.publish('events', JSON.stringify(ev));
});
server.connect();
} else {
vendors.mongopubsub.publish('events', ev);
}
});
server.connect();
} else {
vendors.redis.publisher.publish('events', JSON.stringify(ev));
}
});
};
View
@@ -1,13 +1,12 @@
var mongoClient = require('mongodb').MongoClient,
mubsub = require('mubsub'),
redis = require('redis'),
config = require('./conf/config');
var db = null;
exports.mongo = function(cb){
if(db){
cb(db);
return;
return cb(db);
}
mongoClient.connect('mongodb://' + config.mongo_host + ':' + config.mongo_port + '/' + config.mongo_database, function(err, conn) {
@@ -16,10 +15,11 @@ exports.mongo = function(cb){
throw new Error(err);
} else {
db = conn;
var channel = mubsub(db).channel('pubsub');
channel.on('error', console.error);
exports.mongopubsub = channel;
cb(db);
return cb(db);
}
});
};
exports.redis = redis;
exports.redis.publisher = redis.createClient();

0 comments on commit eaf28f2

Please sign in to comment.