Skip to content

Commit

Permalink
Decoupling Monitor from the rest of the application.
Browse files Browse the repository at this point in the history
The monitor should be able to run on an independent process, even on an independent server, to avoid side effect of server load over resonse time measurement. To that extent, the Monitor class musn't deal with the ORM directly, but communicate only with the API over REST HTTP.
  • Loading branch information
fzaninotto committed Mar 8, 2012
1 parent 84d0adc commit d37957c
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 46 deletions.
5 changes: 1 addition & 4 deletions app.js
Expand Up @@ -14,10 +14,7 @@ mongoose.connect('mongodb://' + config.mongodb.user + ':' + config.mongodb.passw

// see if a check needs a new poll every 10 seconds
// and update the QoS score every minute
m = monitor.createMonitor(config.monitor.pollingInterval, config.monitor.updateInterval, config.monitor.qosAggregationInterval, config.monitor.timeout, config.monitor.pingHistory);
if (config.monitor.http_proxy) {
m.proxy = config.monitor.http_proxy;
}
m = monitor.createMonitor(config.monitor);
m.start();

var app = module.exports = express.createServer();
Expand Down
14 changes: 10 additions & 4 deletions app/api/routes/check.js
Expand Up @@ -14,28 +14,34 @@ var CheckMonthlyStat = require('../../../models/checkMonthlyStat');
module.exports = function(app) {

app.get('/check', function(req, res) {
Check.find({}).asc('isUp').desc('lastChanged').exclude('qosPerHour').run(function(err, checks) {
Check.find({}).asc('isUp').desc('lastChanged').run(function(err, checks) {
res.json(checks);
});
});

app.get('/check/needingPoll', function(req, res) {
Check.needingPoll().exclude('qos').run(function(err, checks) {
res.json(checks);
});
});

app.get('/check/tag/:name', function(req, res, next) {
Check.find({ tags: req.params.name }).asc('isUp').desc('lastChanged').exclude('qosPerHour').find(function(err, checks) {
Check.find({ tags: req.params.name }).asc('isUp').desc('lastChanged').find(function(err, checks) {
if (err) return next(err);
res.json(checks);
});
});

app.get('/check/:id', function(req, res, next) {
Check.find({ _id: req.params.id }).exclude('qosPerHour').findOne(function(err, check) {
Check.find({ _id: req.params.id }).exclude('qos').findOne(function(err, check) {
if (err) return next(err);
if (!check) return next(new Error('failed to load check ' + req.params.id));
res.json(check);
});
});

app.get('/check/:id/stats/:type/:page?', function(req, res) {
Check.find({ _id: req.params.id }).exclude('qosPerHour').findOne(function(err, check) {
Check.find({ _id: req.params.id }).exclude('qos').findOne(function(err, check) {
if (err) return next(err);
if (!check) return next(new Error('failed to load check ' + req.params.id));
check.getStatsForPeriod(req.params.type, req.params.page, function(stats) {
Expand Down
16 changes: 16 additions & 0 deletions app/api/routes/ping.js
Expand Up @@ -29,4 +29,20 @@ module.exports = function(app) {
});
});

app.put('/ping', function(req, res) {
Check.findById(req.body.checkId, function(err1, check) {
if (err1) {
res.send(err1.message, 500);
return;
};
Ping.createForCheck(check, req.body.status, req.body.time, req.body.error, function(err2, ping) {
if (err2) {
res.send(err2.message, 500);
return;
}
res.json(ping);
});
})
});

};
4 changes: 3 additions & 1 deletion config/default.yaml
Expand Up @@ -5,12 +5,14 @@ mongodb:
password:

monitor:
checkProviderUrl: 'http://localhost:8082/api/check/needingPoll'
pingCreatorUrl: 'http://localhost:8082/ping'
pollingInterval: 10000 # ten seconds
updateInterval: 60000 # one minute
qosAggregationInterval: 600000 # ten minutes
timeout: 5000 # five seconds
pingHistory: 8035200000 # three months
http_proxy:
proxy:

server:
port: 8082
101 changes: 68 additions & 33 deletions lib/monitor.js
@@ -1,7 +1,9 @@
/**
* Module dependencies.
*/
var poller = require('./poller'),
var http = require('http'),
url = require('url'),
poller = require('./poller'),
Check = require('../models/check'),
CheckEvent = require('../models/checkEvent'),
Ping = require('../models/ping');
Expand All @@ -16,13 +18,14 @@ var poller = require('./poller'),
* @param {Number} Oldest ping and checkEvent age to keep in milliseconds, defaults to 3 months
* @api public
*/
function Monitor(pollingInterval, updateInterval, qosAggregationInterval, timeout, oldestHistory) {
this.pollingInterval = (typeof pollingInterval == 'undefined') ? 10000 : pollingInterval;
this.updateInterval = (typeof updateInterval == 'undefined') ? 1000 * 60 : updateInterval;
this.qosAggregationInterval = (typeof qosAggregationInterval == 'undefined') ? 1000 * 60 * 60 : qosAggregationInterval;
this.timeout = (typeof timeout == 'undefined') ? 5000 : timeout;
this.oldestHistory = (typeof oldestHistory == 'undefined') ? 3 * 31 * 24 * 60 * 60 * 1000 : oldestHistory;
this.proxy = {};
function Monitor(config) {
config.pollingInterval = config.pollingInterval || 10 * 1000;
config.updateInterval = config.updateInterval || 60 * 1000;
config.qosAggregationInterval = config.qosAggregationInterval || 60 * 60 * 1000;
config.timeout = config.timeout || 5 * 1000;
config.oldestHistory = config.oldestHistory || 3 * 31 * 24 * 60 * 60 * 1000;
config.proxy = config.proxy || {};
this.config = config;
}

/**
Expand All @@ -36,10 +39,10 @@ Monitor.prototype.start = function() {
// start polling right away
this.pollChecksNeedingPoll();
// schedule future polls
this.intervalForPoll = setInterval(this.pollChecksNeedingPoll.bind(this), this.pollingInterval);
this.intervalForPoll = setInterval(this.pollChecksNeedingPoll.bind(this), this.config.pollingInterval);
// schedule updates
this.intervalForUpdate = setInterval(this.updateAllChecks.bind(this), this.updateInterval);
this.intervalForAggregation = setInterval(this.aggregateQos.bind(this), this.qosAggregationInterval);
this.intervalForUpdate = setInterval(this.updateAllChecks.bind(this), this.config.updateInterval);
this.intervalForAggregation = setInterval(this.aggregateQos.bind(this), this.config.qosAggregationInterval);
}

/**
Expand All @@ -48,9 +51,9 @@ Monitor.prototype.start = function() {
* @api public
*/
Monitor.prototype.stop = function() {
clearInterval(this.intervalForPoll);
clearInterval(this.intervalForUpdate);
clearInterval(this.intervalForAggregation);
clearInterval(this.config.intervalForPoll);
clearInterval(this.config.intervalForUpdate);
clearInterval(this.config.intervalForAggregation);
}

/**
Expand All @@ -61,29 +64,63 @@ Monitor.prototype.stop = function() {
* @param {Function} Callback function to be called with each Check
* @api private
*/
Monitor.prototype.pollChecksNeedingPoll = function() {
Check.callForChecksNeedingPoll(this.pollCheck.bind(this));
}
Monitor.prototype.pollChecksNeedingPoll = function(callback) {
var self = this;
this.findChecksNeedingPoll(function(err, checks) {
if (err) {
console.log(err);
callback(err);
}
checks.forEach(function(check) {
self.pollCheck(check);
});
});
};

Monitor.prototype.findChecksNeedingPoll = function(callback) {
var api = url.parse(this.config.checkProviderUrl);
http.get(api, function(res) {
if (res.statusCode == 200) {
var body = '';
res.on('data', function(chunk) {
body += chunk.toString();
});
res.on('end', function() {
callback(null, JSON.parse(body));
});
} else {
callback(new Error('Check provider service responded with error code: ' + res.statusCode));
}
}).on('error', function(e) {
callback(new Error('Cannot connect to the check provider service: ' + e.message));
});
};

/**
* Poll a given check, and create a ping according to the result.
*
* This method can be called by Mongoose streaming cursor interface, therefore check can be undefined.
*
* @param {Object} check is a simple JSON object returned by the API, NOT a Check object
* @api private
*/
Monitor.prototype.pollCheck = function (err, check) {
if (err || !check) return;
check.lastTested = new Date();
check.save();
Monitor.prototype.pollCheck = function(check) {
if (!check) return;
var api = url.parse(this.config.pingCreatorUrl);
api.method = 'PUT';
//check.lastTested = new Date();
//check.save();
p = poller.createPoller(check.url, function(time, error) {
var req = http.request(api, function(res) {

})
Ping.createForCheck(check, false, time, error);
//check.setLastTest(false).save();
}, function(time) {
Ping.createForCheck(check, true, time);
//check.setLastTest(true).save();
});
p.timeout = this.timeout;
if (this.proxy.host) {
p.proxy = this.proxy;
if (this.config.proxy.host) {
p.proxy = this.config.proxy;
}
//p.setDebug(true);
p.poll();
Expand Down Expand Up @@ -111,25 +148,23 @@ Monitor.prototype.aggregateQos = function() {
var TagHourlyStat = require('../models/tagHourlyStat');
TagHourlyStat.updateLastDayQos.apply(TagHourlyStat);
TagHourlyStat.updateLastMonthQos.apply(TagHourlyStat);
Ping.cleanup(this.oldestHistory);
CheckEvent.cleanup(this.oldestHistory);
Ping.cleanup(this.config.oldestHistory);
CheckEvent.cleanup(this.config.oldestHistory);
}

/**
* Create a monitor to poll all checks at a given interval.
*
* Example:
*
* m = monitor.createMonitor(60000);
* m = monitor.createMonitor({ pollingInterval: 60000});
* m.start();
* // the polling starts, every 60 seconds
* m.stop();
*
* @param {Number} Polling interval in milliseconds
* @param {Number} Update interval in milliseconds
* @param {Number} Request timeout in milliseconds
* @param {Object} Configuration object
* @api public
*/
exports.createMonitor = function(pollingInterval, updateInterval, qosAggregationInterval, timeout, oldestHistory) {
return new Monitor(pollingInterval, updateInterval, qosAggregationInterval, timeout, oldestHistory);
exports.createMonitor = function(config) {
return new Monitor(config);
}
8 changes: 6 additions & 2 deletions models/check.js
Expand Up @@ -155,9 +155,13 @@ Check.statics.convertTags = function(tags) {
* @api public
*/
Check.statics.callForChecksNeedingPoll = function(callback) {
this.find().$where(function() {
this.needingPoll().each(callback);
}

Check.statics.needingPoll = function() {
return this.$where(function() {
return !this.lastTested || (Date.now() - this.lastTested.getTime()) > (this.interval || 60000);
}).each(callback);
});
}

Check.statics.updateAllQos = function(callback) {
Expand Down
5 changes: 3 additions & 2 deletions models/ping.js
Expand Up @@ -22,7 +22,6 @@ Ping.methods.findCheck = function(callback) {
}

Ping.statics.createForCheck = function(check, status, time, error, callback) {
check.setLastTest(status).save();
ping = new this();
ping.check = check;
ping.tags = check.tags;
Expand All @@ -37,7 +36,9 @@ Ping.statics.createForCheck = function(check, status, time, error, callback) {
ping.downtime = check.interval || 60000;
ping.error = error;
};
ping.save(callback);
ping.save(function(err) {
callback(err, ping);
});
}

var mapCheckAndTags = function() {
Expand Down

0 comments on commit d37957c

Please sign in to comment.