Permalink
Browse files

initial commit; needs some cleaning

  • Loading branch information...
0 parents commit 35cff45705830f1eb7cffcb693aaa8e340891893 @scoates scoates committed Sep 20, 2012
Showing with 424 additions and 0 deletions.
  1. +1 −0 .gitignore
  2. +1 −0 README.md
  3. +67 −0 funnel.js
  4. +26 −0 package.json
  5. +37 −0 plugin/cloudwatch.js
  6. +68 −0 plugin/dbi.js
  7. +21 −0 plugin/json.js
  8. +67 −0 plugin/mongo.js
  9. +42 −0 plugin/munin.js
  10. +49 −0 plugin/nagios.js
  11. +44 −0 plugin/shared.js
  12. +1 −0 version.js
@@ -0,0 +1 @@
+node_modules/*
@@ -0,0 +1 @@
+Funnel metrics into StatsD
@@ -0,0 +1,67 @@
+var shared = require('./plugin/shared');
+
+var collect = function (sourcesdotdotdot) {
+
+ var fetchers = [];
+
+ // arguments is not a real array; no forEach
+ var len = arguments.length;
+ for (var i=0; i<len; i++) {
+ fetchers.push(arguments[i]);
+ }
+
+ var fixMetricName = function (name, preserveDot) {
+ var re = preserveDot ? /[^a-z0-9._-]/ig : /[^a-z0-9_-]/ig;
+ return name.replace(re, '-').replace(/-+/g, '-').toLowerCase();
+ };
+
+ var asMetricName = function (data, preserveDot) {
+ var name = ['funnel'];
+ name.push(data.funnel);
+ name.push(fixMetricName(data.nodeName));
+ if (data.serviceName) {
+ name.push(fixMetricName(data.serviceName));
+ }
+ name.push(fixMetricName(data.metricName, preserveDot));
+ return name.join('.');
+ };
+
+ var display = function () {
+ fetchers.forEach(function (fetcher) {
+ fetcher(function (data) {
+ console.log(asMetricName(data, data.preserveMetricNameDot), data.reading);
+ });
+ });
+ };
+
+ var toStatsD = function (host, port) {
+ port = port || 8125;
+ var SDC = require('statsd-client'),
+ sdc = new SDC({host: host, port: port, debug: true});
+ fetchers.forEach(function (fetcher) {
+ fetcher(function (data) {
+ sdc.gauge(asMetricName(data, data.preserveMetricNameDot), data.reading)
+ });
+ });
+ };
+
+ return {
+ toStatsD: toStatsD,
+ display: display
+ };
+};
+
+module.exports = {
+ collect: collect,
+ nagios: require('./plugin/nagios'),
+ mongo: require('./plugin/mongo'),
+ munin: require('./plugin/munin'),
+ json: require('./plugin/json'),
+ cloudwatch: require('./plugin/cloudwatch'),
+ dbi: require('./plugin/dbi'),
+
+ COUNT: shared.COUNT,
+ ALL: shared.ALL,
+ dbiSolo: shared.dbiSolo
+}
+
@@ -0,0 +1,26 @@
+{
+ "name": "metrics-funnel",
+ "description": "Funnel metrics from various sources into StatsD",
+ "version": "0.1.0",
+ "author": "Sean Coates <sean@seancoates.com>",
+ "contributors": [
+ { "name": "Sean Coates", "email": "sean@seancoates.com" }
+ ],
+ "keywords": ["funnel","statsd","graphite","nagios","munin","json","cloudwatch","mongodb", "dbi"],
+ "repository": {
+ "type": "git",
+ "url": "https://github.com/fictivekin/metrics-funnel.git"
+ },
+ "bin": {
+ "metrics_funnel": "funnel.js"
+ },
+ "files": ["funnel.js"],
+ "dependencies": {
+ "munin-client": ">=0.1.0",
+ "mongodb": ">=1.1.0",
+ "aws-lib": ">=0.1.2",
+ "node-dbi": ">=0.6.1",
+ "pg": ">=0.7.2"
+ },
+ "engines": { "node": ">=0.8.0" }
+}
@@ -0,0 +1,37 @@
+module.exports = function (service) {
+ var aws = require ('aws-lib');
+ cw = aws.createCWClient(service.from.id, service.from.secret);
+ var now = new Date,
+ start = new Date(now.getTime() - 1 * 60 * 1000); // 1 min ago
+
+ return function(funneler) {
+ for (var serviceName in service.services) {
+ var thisService = service.services[serviceName];
+ (function (serviceName, thisService) {
+ cw.call(
+ "GetMetricStatistics",
+ {
+ EndTime: now.toISOString(),
+ StartTime: start.toISOString(),
+ Namespace: thisService.namespace,
+ MetricName: thisService.metric,
+ 'Statistics.member.1': thisService.type,
+ 'Dimensions.member.1.Name': thisService.name,
+ 'Dimensions.member.1.Value': thisService.value,
+ Unit: thisService.unit,
+ Period: 60,
+ },
+ function(err, result) {
+ funneler({
+ 'funnel': 'cloudwatch',
+ 'nodeName': thisService.value,
+ 'metricName': serviceName,
+ 'reading': result.GetMetricStatisticsResult.Datapoints.member[thisService.type]
+ });
+ }
+ );
+ })(serviceName, thisService);
+ }
+ }
+};
+
@@ -0,0 +1,68 @@
+var shared = require('./shared');
+
+module.exports = function (service) {
+ var pending = 0;
+
+ var DBWrapper = require('node-dbi').DBWrapper;
+ var DBExpr = require('node-dbi').DBExpr;
+
+ var dbWrapper = new DBWrapper(service.from.adapter, service.from);
+
+ return function(funneler) {
+
+ dbWrapper.connect();
+
+ for (var serviceName in service.services) {
+
+ (function (serviceName, thisService) {
+ pending++;
+ dbWrapper.fetchAll(thisService.query, null, function(err, result) {
+ pending--;
+ if (err) {
+ console.log("Error in dbi:", err);
+ if (0 == pending) {
+ dbWrapper.close(shared.nocb);
+ }
+ return;
+ }
+
+ var reading = thisService.callback(result);
+
+ if ("object" == typeof reading) {
+ if (serviceName.indexOf('%') === -1) {
+ throw "Can't emit multiple values if metric name does not contain %";
+ }
+ for (var k in reading) {
+ var val = reading[k];
+ var metricName = serviceName.replace('%', k);
+ funneler({
+ 'funnel': 'dbi',
+ 'nodeName': thisService.name || service.from.adapter + '-' + service.from.host,
+ 'metricName': metricName,
+ 'reading': val,
+ 'preserveMetricNameDot': true
+ });
+
+ }
+
+ } else { // scalar
+ funneler({
+ 'funnel': 'dbi',
+ 'nodeName': thisService.name || service.from.adapter + '-' + service.from.host,
+ 'metricName': serviceName,
+ 'reading': reading,
+ });
+
+ }
+
+ if (0 == pending) {
+ dbWrapper.close(shared.nocb);
+ }
+ });
+ })(serviceName, service.services[serviceName]);
+
+ };
+
+ };
+};
+
@@ -0,0 +1,21 @@
+var shared = require('./shared');
+
+module.exports = function (service) {
+ var vm = require('vm');
+ return function(funneler) {
+ shared.fromJsonUrl(service.from, function (body, urlParts) {
+ for (var serviceName in service.services) {
+ var thisService = service.services[serviceName];
+ var reading = vm.runInNewContext(thisService, body);
+ funneler({
+ 'funnel': 'json',
+ 'nodeName': service.name || urlParts.host,
+ 'metricName': serviceName,
+ 'reading': reading
+ });
+ }
+ });
+ }
+
+};
+
@@ -0,0 +1,67 @@
+var shared = require('./shared');
+
+module.exports = function (service) {
+ var pending = 0;
+
+ var doCount = function (conn, from, funneler, collection) {
+ conn.collection(collection, function (err, coll) {
+ pending++;
+ coll.count(function (err, count) {
+ pending--;
+ funneler({
+ 'funnel': 'mongo',
+ 'nodeName': from.replace(/^mongodb:\/\//, ''),
+ 'serviceName': collection,
+ 'metricName': 'count',
+ 'reading': count
+ });
+ if (pending == 0) {
+ conn.close();
+ }
+ });
+ });
+ };
+
+ var doQuery = function (conn, from, funneler, serviceName, collection, query) {
+ conn.collection(collection, function (err, coll) {
+ pending++;
+ coll.count(query, function (err, count) {
+ pending--;
+ funneler({
+ 'funnel': 'mongo',
+ 'nodeName': from.replace(/^mongodb:\/\//, ''),
+ 'serviceName': serviceName,
+ 'metricName': 'query',
+ 'reading': count
+ });
+ if (pending == 0) {
+ conn.close();
+ }
+ });
+ });
+ };
+
+ return function(funneler) {
+ if (typeof service.from == 'string') {
+ service.from = [service.from];
+ }
+ var mongodb = require('mongodb');
+ service.from.forEach(function (from) {
+ mongodb.connect(from, function(err, conn) {
+
+ for (var serviceName in service.services) {
+ var collectionName;
+ if (service.services[serviceName] === shared.COUNT) {
+ doCount(conn, from, funneler, serviceName);
+ } else {
+ doQuery(conn, from, funneler, serviceName, service.services[serviceName].collection, service.services[serviceName].query);
+ }
+ }
+
+ });
+
+ });
+
+ };
+};
+
@@ -0,0 +1,42 @@
+var shared = require('./shared');
+
+module.exports = function (service) {
+ var Munin = require('munin-client');
+
+ return function (funneler) {
+ var from = service.from;
+ // cast to array
+ if (typeof from == 'string') {
+ from = [from];
+ }
+ from.forEach(function (host) {
+ var munin = new Munin(host);
+ for (var sName in service.services) {
+ (function (serviceName) { // yum! delicious scope!
+ munin.fetch(serviceName, function(metrics) {
+ for (var metricName in metrics) {
+
+ var capture = false;
+ if (shared.ALL == service.services[serviceName]) {
+ capture = true;
+ } else if (service.services[serviceName].indexOf(metricName) !== -1) {
+ capture = true;
+ }
+ if (capture) {
+ funneler({
+ 'funnel': 'munin',
+ 'nodeName': host,
+ 'serviceName': serviceName,
+ 'metricName': metricName,
+ 'reading': metrics[metricName]
+ });
+ }
+ }
+ });
+ })(sName);
+ }
+ munin.quit();
+ });
+ }
+};
+
@@ -0,0 +1,49 @@
+var shared = require('./shared');
+
+module.exports = function (service) {
+ // TODO: validation
+ return function(funneler) {
+ shared.fromJsonUrl(service.source, function (body) {
+ service.from.forEach(function (from) {
+ var node;
+ if (body.content[from]) {
+ // long name
+ node = body.content[from];
+ } else if (body.content[from.split('.')[0]]) {
+ // short name
+ node = body.content[from.split('.')[0]];
+ } else {
+ console.log("Could not find server", from);
+ return;
+ }
+
+ for (var serviceName in service.services) {
+ if (serviceName in node.services) {
+ var perfDataNames;
+ if (shared.ALL == service.services[serviceName]) {
+ perfDataNames = Object.keys(node.services[serviceName].performance_data);
+ } else {
+ perfDataNames = service.services[serviceName];
+ if (typeof perfDataNames == 'string') {
+ perfDataNames = [perfDataNames];
+ }
+ }
+
+ perfDataNames.forEach(function (perfName) {
+ if (undefined !== node.services[serviceName].performance_data[perfName]) {
+ funneler({
+ 'funnel': 'nagios',
+ 'nodeName': from,
+ 'serviceName': serviceName,
+ 'metricName': perfName,
+ 'reading': node.services[serviceName].performance_data[perfName]
+ });
+ }
+ });
+ }
+ }
+
+ });
+ })
+ }
+};
Oops, something went wrong.

0 comments on commit 35cff45

Please sign in to comment.