Permalink
Browse files

First new working version

  • Loading branch information...
felixge committed Nov 18, 2011
1 parent fec0c4d commit 2f61b334ff0efab41d40cf271986bd0512a7eadb
Showing with 277 additions and 2 deletions.
  1. +1 −0 index.js
  2. +34 −0 lib/CarbonClient.js
  3. +44 −0 lib/GraphiteClient.js
  4. +2 −1 package.json
  5. +3 −1 test/common.js
  6. +53 −0 test/helper/CarbonServer.js
  7. +42 −0 test/integration/test-send.js
  8. +98 −0 test/unit/test-GraphiteClient.js
View
@@ -0,0 +1 @@
+module.exports = require('./lib/GraphiteClient');
View
@@ -0,0 +1,34 @@
+var LazySocket = require('lazy-socket');
+var url = require('url');
+
+module.exports = CarbonClient;
+function CarbonClient(properties) {
+ this._dsn = properties.dsn;
+ this._socket = null;
+}
+
+CarbonClient.prototype.write = function(metrics, timestamp, cb) {
+ this._lazyConnect();
+
+ var lines = '';
+ for (var path in metrics) {
+ var value = metrics[path];
+ lines += [path, value, timestamp].join(' ') + '\n';
+ }
+
+ this._socket.write(lines, 'utf-8', cb);
+};
+
+CarbonClient.prototype._lazyConnect = function() {
+ if (this._socket) return;
+
+ var dsn = url.parse(this._dsn);
+ var port = parseInt(dsn.port, 10) || 2003;
+ var host = dsn.hostname;
+
+ this._socket = LazySocket.createConnection(port, host);
+};
+
+CarbonClient.prototype.end = function() {
+ this._socket.end();
+};
View
@@ -0,0 +1,44 @@
+var CarbonClient = require('./CarbonClient');
+
+module.exports = GraphiteClient;
+function GraphiteClient(properties) {
+ this._carbon = properties.carbon;
+}
+
+GraphiteClient.createClient = function(carbonDsn) {
+ var client = new this({
+ carbon: new CarbonClient({dsn: carbonDsn}),
+ });
+ return client;
+};
+
+GraphiteClient.flatten = function(obj, flat, prefix) {
+ flat = flat || {};
+ prefix = prefix || '';
+
+ for (var key in obj) {
+ var value = obj[key];
+ if (typeof value === 'object') {
+ this.flatten(value, flat, prefix + key + '.');
+ } else {
+ flat[prefix + key] = value;
+ }
+ }
+
+ return flat;
+};
+
+GraphiteClient.prototype.write = function(metrics, timestamp, cb) {
+ if (typeof timestamp === 'function') {
+ cb = timestamp;
+ timestamp = null;
+ }
+
+ timestamp = timestamp || Date.now();
+
+ this._carbon.write(GraphiteClient.flatten(metrics), timestamp, cb);
+};
+
+GraphiteClient.prototype.end = function() {
+ this._carbon.end();
+};
View
@@ -13,7 +13,8 @@
"node": "*"
},
"dependencies": {
- "request": "2.1.1"
+ "request": "2.1.1",
+ "lazy-socket": "0.0.3"
},
"devDependencies": {
"urun": "0.0.3",
View
@@ -5,4 +5,6 @@ common.dir = {}
common.dir.root = path.dirname(__dirname);
common.dir.lib = path.join(common.dir.root, 'lib');
-common.graphite = require(common.dir.root);
+common.graphite = require(common.dir.root);
+common.port = 12523;
+common.carbonDsn = 'plaintext://localhost:' + common.port + '/';
@@ -0,0 +1,53 @@
+var net = require('net');
+
+module.exports = CarbonServer;
+function CarbonServer() {
+ this.metrics = [];
+ this._server = null;
+}
+
+CarbonServer.prototype.listen = function(port, cb) {
+ if (!this._server) {
+ this._server = net.createServer(this._handleSocket.bind(this));
+ }
+
+ this._server.listen(port, cb);
+};
+
+CarbonServer.prototype.close = function() {
+ this._server.close();
+};
+
+CarbonServer.prototype._handleSocket = function(socket) {
+ var self = this;
+ var buffer = '';
+
+ socket.setEncoding('utf8');
+ socket
+ .on('data', function(chunk) {
+ buffer += chunk;
+
+ while (buffer.length) {
+ var index = buffer.indexOf('\n');
+
+ if (index === -1) break;
+
+ var line = buffer.substr(0, index);
+ buffer = buffer.substr(index + 1);
+
+ self._parseLine(line);
+ }
+ })
+ .on('end', function() {
+ self.close();
+ });
+};
+
+CarbonServer.prototype._parseLine = function(line) {
+ var parts = line.split(/ /g);
+ this.metrics.push({
+ path : parts[0],
+ value : parts[1],
+ timestamp : parseInt(parts[2], 10),
+ });
+};
@@ -0,0 +1,42 @@
+var common = require('../common');
+var assert = require('assert');
+var graphite = common.graphite;
+var CarbonServer = require('../helper/CarbonServer');
+
+var server = new CarbonServer();
+server.listen(common.port, function() {
+ var client = graphite.createClient(common.carbonDsn);
+ var metrics = {
+ foo: 1,
+ deep: {
+ down: {
+ a: 2,
+ b: 3,
+ }
+ }
+ };
+
+ client.write(metrics, function(err) {
+ assert.ok(!err);
+
+ client.end();
+ });
+});
+
+process.on('exit', function() {
+ var metric = server.metrics.shift();
+ assert.equal(metric.path, 'foo');
+ assert.equal(metric.value, 1);
+ assert.ok(metric.timestamp + 1000 >= Date.now());
+
+ metric = server.metrics.shift();
+ assert.equal(metric.path, 'deep.down.a');
+ assert.equal(metric.value, 2);
+ assert.ok(metric.timestamp + 1000 >= Date.now());
+
+ metric = server.metrics.shift();
+ assert.equal(metric.path, 'deep.down.b');
+ assert.equal(metric.value, 3);
+ assert.ok(metric.timestamp + 1000 >= Date.now());
+});
+
@@ -0,0 +1,98 @@
+var common = require('../common');
+var test = require('utest');
+var assert = require('assert');
+var sinon = require('sinon');
+var graphite = common.graphite;
+var GraphiteClient = graphite;
+
+test('graphite.createClient', {
+ 'returns a new GraphiteClient': function() {
+ var client = graphite.createClient();
+ assert.ok(client instanceof GraphiteClient);
+ },
+
+ 'takes carbon dsn first and creates lazy socket': function() {
+ var client = graphite.createClient('plaintext://example.org:8080/');
+ },
+});
+
+test('graphite.flatten', {
+ 'returns an already flat object as is': function() {
+ var obj = {foo: 'bar'};
+ assert.deepEqual(graphite.flatten(obj), {foo: 'bar'});
+ },
+
+ 'returns a copy of the object': function() {
+ var obj = {foo: 'bar'};
+ var flat = graphite.flatten(obj);
+
+ assert.notStrictEqual(obj, flat);
+ },
+
+ 'flattens a deep object': function() {
+ var obj = {
+ a: 1,
+ deep: {
+ we: {
+ go: {
+ b: 2,
+ c: 3,
+ }
+ }
+ },
+ d: 4,
+ };
+ var flat = graphite.flatten(obj);
+
+ assert.deepEqual(flat, {
+ 'a' : 1,
+ 'deep.we.go.b' : 2,
+ 'deep.we.go.c' : 3,
+ 'd' : 4,
+ });
+ },
+});
+
+var client;
+var carbon;
+test('GraphiteClient', {
+ before: function() {
+ carbon = sinon.stub({
+ write: function() {},
+ });
+ client = new GraphiteClient({carbon: carbon});
+ },
+
+ '#write flattens metrics before passing to carbon': function() {
+ var metrics = {foo: {bar: 1}};
+ client.write(metrics);
+
+ assert.ok(carbon.write.calledWith({'foo.bar': 1}));
+ },
+
+ '#write passes the current time to carbon': function() {
+ client.write({});
+
+ assert.ok(carbon.write.getCall(0).args[1] >= Date.now());
+ },
+
+ '#write lets you pass a timestamp to carbon': function() {
+ client.write({}, 23);
+
+ assert.equal(carbon.write.getCall(0).args[1], 23);
+ },
+
+ '#write passes a callback to carbon': function() {
+ var cb = function() {};
+ client.write({}, null, cb);
+
+ assert.equal(carbon.write.getCall(0).args[2], cb);
+ },
+
+ '#write takes callback as second argument as well': function() {
+ var cb = function() {};
+ client.write({}, cb);
+
+ assert.equal(carbon.write.getCall(0).args[2], cb);
+ },
+});

0 comments on commit 2f61b33

Please sign in to comment.