From d65b767f740414cb3d8a3fad12aa8499354c8dfc Mon Sep 17 00:00:00 2001 From: Morten Siebuhr Date: Sun, 10 Dec 2017 21:41:38 +0100 Subject: [PATCH 1/4] Add in basic statsd-measuing points --- lib/server.js | 20 ++++++++++++++++++++ package.json | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/lib/server.js b/lib/server.js index cde8f6d..cdffcbb 100644 --- a/lib/server.js +++ b/lib/server.js @@ -5,6 +5,9 @@ const fs = require('fs'); const consts = require('./constants').Constants; const helpers = require('./helpers'); +const StatsDClient = require('statsd-client'); +const statsd = new StatsDClient({ prefix: 'cache-server' }); + const CMD_QUIT = 'q'.charCodeAt(0); const CMD_GET = 'g'.charCodeAt(0); @@ -59,6 +62,8 @@ class CacheServer { _HandleData(socket, data) { var self = this; + var start = new Date(); + var trxStart = 0; // There is pending data, add it to the data buffer if (socket.pendingData != null) { @@ -81,6 +86,7 @@ class CacheServer { } socket.protocolVersion = helpers.readUInt32(data); + statsd.increment('protocol-version', 1, { protocol_version: socket.protocolVersion }); let buf = Buffer.allocUnsafe(consts.UINT32_SIZE); if (socket.protocolVersion == consts.PROTOCOL_VERSION) { helpers.log(consts.LOG_INFO, "Client protocol version " + socket.protocolVersion); @@ -147,12 +153,15 @@ class CacheServer { } if (data[idx] == CMD_QUIT) { + statsd.increment('cmd', 1, {cmd_Quit: 'quit'}); + statsd.timing('request-latency', start) socket.end(); socket.forceQuit = true; return false; } if (data[idx] == CMD_GET) { + statsd.increment('cmd', 1, {cmd: 'get'}); if (data.length < consts.CMD_SIZE + consts.ID_SIZE) { socket.pendingData = data; return true; @@ -209,6 +218,8 @@ class CacheServer { // handle a transaction else if (data[idx] == CMD_TRX) { + statsd.increment('cmd', 1, {cmd: 'trx'}); + trxStart = new Date(); if (data.length < consts.CMD_SIZE) { socket.pendingData = data; return true; @@ -242,6 +253,9 @@ class CacheServer { continue; } else if (data[idx] == TRX_END) { + statsd.increment('cmd', 1, {cmd: 'trx_end'}); + if (trxStart) { statsd.timing('trx-latency', trxStart) } + if (!socket.inTransaction) { helpers.log(consts.LOG_ERR, "Invalid transaction isolation"); socket.destroy(); @@ -305,9 +319,11 @@ class CacheServer { idx += 1; var reqType = data[idx]; + statsd.increment('cmd', 1, {cmd: 'put', type: reqType}); idx += 1; var size = helpers.readUInt64(data.slice(idx)); + statsd.histogram('put-size', size, { type: reqType }); if (reqType == TYPE_ASSET) { helpers.log(consts.LOG_TEST, "Put Asset Binary " + socket.currentGuid + "-" + socket.currentHash + " (size " + size + ")"); @@ -345,6 +361,7 @@ class CacheServer { // handle check integrity else if (data[idx] == CMD_INTEGRITY) { + statsd.increment('cmd', 1, {cmd: 'integrity'}); if (data.length < consts.CMD_SIZE + 1) { socket.pendingData = data; return true; @@ -411,6 +428,7 @@ class CacheServer { var buf = Buffer.allocUnsafe(consts.CMD_SIZE + consts.ID_SIZE); buf[0] = CMD_GETNOK; buf[1] = type; + statsd.increment('miss', 1, {type}); resbuf.copy(buf, consts.CMD_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE + consts.ID_SIZE); try { socket.write(buf); @@ -453,6 +471,8 @@ class CacheServer { else { resbuf[0] = CMD_GETOK; resbuf[1] = type; + statsd.increment('hit', 1, {type}); + statsd.histogram('get-size', stats.size, { type }); helpers.log(consts.LOG_TEST, "Found: " + next.cacheStream + " size:" + stats.size); resbuf.slice(consts.CMD_SIZE).write(helpers.encodeInt64(stats.size)); diff --git a/package.json b/package.json index c2780d0..b210a47 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "mocha-lcov-reporter": "^1.3.0" }, "dependencies": { - "commander": "^2.11.0" + "commander": "^2.11.0", + "statsd-client": "^0.4.0" } } From 4f098e341bd8abf9b66d730572fbc736cee7c7d8 Mon Sep 17 00:00:00 2001 From: Morten Siebuhr Date: Mon, 11 Dec 2017 08:55:54 +0100 Subject: [PATCH 2/4] Metrics: Attach statsd to CacheServer instance --- lib/server.js | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/lib/server.js b/lib/server.js index cdffcbb..4b5316e 100644 --- a/lib/server.js +++ b/lib/server.js @@ -6,7 +6,6 @@ const consts = require('./constants').Constants; const helpers = require('./helpers'); const StatsDClient = require('statsd-client'); -const statsd = new StatsDClient({ prefix: 'cache-server' }); const CMD_QUIT = 'q'.charCodeAt(0); @@ -36,6 +35,10 @@ class CacheServer { if (!port && port !== 0) this._port = consts.DEFAULT_PORT; this._sever = null; + + this.statsd = new StatsDClient({ + prefix: 'cache-server' + }); } get port() { @@ -86,7 +89,7 @@ class CacheServer { } socket.protocolVersion = helpers.readUInt32(data); - statsd.increment('protocol-version', 1, { protocol_version: socket.protocolVersion }); + self.statsd.increment('protocol-version', 1, { protocol_version: socket.protocolVersion }); let buf = Buffer.allocUnsafe(consts.UINT32_SIZE); if (socket.protocolVersion == consts.PROTOCOL_VERSION) { helpers.log(consts.LOG_INFO, "Client protocol version " + socket.protocolVersion); @@ -153,15 +156,15 @@ class CacheServer { } if (data[idx] == CMD_QUIT) { - statsd.increment('cmd', 1, {cmd_Quit: 'quit'}); - statsd.timing('request-latency', start) + self.statsd.increment('cmd', 1, {cmd_Quit: 'quit'}); + self.statsd.timing('request-latency', start) socket.end(); socket.forceQuit = true; return false; } if (data[idx] == CMD_GET) { - statsd.increment('cmd', 1, {cmd: 'get'}); + self.statsd.increment('cmd', 1, {cmd: 'get'}); if (data.length < consts.CMD_SIZE + consts.ID_SIZE) { socket.pendingData = data; return true; @@ -218,7 +221,7 @@ class CacheServer { // handle a transaction else if (data[idx] == CMD_TRX) { - statsd.increment('cmd', 1, {cmd: 'trx'}); + self.statsd.increment('cmd', 1, {cmd: 'trx'}); trxStart = new Date(); if (data.length < consts.CMD_SIZE) { socket.pendingData = data; @@ -253,8 +256,8 @@ class CacheServer { continue; } else if (data[idx] == TRX_END) { - statsd.increment('cmd', 1, {cmd: 'trx_end'}); - if (trxStart) { statsd.timing('trx-latency', trxStart) } + self.statsd.increment('cmd', 1, {cmd: 'trx_end'}); + if (trxStart) { self.statsd.timing('trx-latency', trxStart) } if (!socket.inTransaction) { helpers.log(consts.LOG_ERR, "Invalid transaction isolation"); @@ -319,11 +322,11 @@ class CacheServer { idx += 1; var reqType = data[idx]; - statsd.increment('cmd', 1, {cmd: 'put', type: reqType}); + self.statsd.increment('cmd', 1, {cmd: 'put', type: reqType}); idx += 1; var size = helpers.readUInt64(data.slice(idx)); - statsd.histogram('put-size', size, { type: reqType }); + self.statsd.histogram('put-size', size, { type: reqType }); if (reqType == TYPE_ASSET) { helpers.log(consts.LOG_TEST, "Put Asset Binary " + socket.currentGuid + "-" + socket.currentHash + " (size " + size + ")"); @@ -361,7 +364,7 @@ class CacheServer { // handle check integrity else if (data[idx] == CMD_INTEGRITY) { - statsd.increment('cmd', 1, {cmd: 'integrity'}); + self.statsd.increment('cmd', 1, {cmd: 'integrity'}); if (data.length < consts.CMD_SIZE + 1) { socket.pendingData = data; return true; @@ -428,7 +431,7 @@ class CacheServer { var buf = Buffer.allocUnsafe(consts.CMD_SIZE + consts.ID_SIZE); buf[0] = CMD_GETNOK; buf[1] = type; - statsd.increment('miss', 1, {type}); + self.statsd.increment('miss', 1, {type}); resbuf.copy(buf, consts.CMD_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE + consts.ID_SIZE); try { socket.write(buf); @@ -471,8 +474,8 @@ class CacheServer { else { resbuf[0] = CMD_GETOK; resbuf[1] = type; - statsd.increment('hit', 1, {type}); - statsd.histogram('get-size', stats.size, { type }); + self.statsd.increment('hit', 1, {type}); + self.statsd.histogram('get-size', stats.size, { type }); helpers.log(consts.LOG_TEST, "Found: " + next.cacheStream + " size:" + stats.size); resbuf.slice(consts.CMD_SIZE).write(helpers.encodeInt64(stats.size)); From a7417a0c04f7d110b3c1f141a563c3eaeb4e1de5 Mon Sep 17 00:00:00 2001 From: Morten Siebuhr Date: Mon, 11 Dec 2017 08:59:02 +0100 Subject: [PATCH 3/4] Metrics: Allow setting upstream statsd-server --- lib/server.js | 9 +++++---- main.js | 6 +++++- test/server.js | 2 +- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/server.js b/lib/server.js index 4b5316e..49d9136 100644 --- a/lib/server.js +++ b/lib/server.js @@ -29,15 +29,16 @@ const OPT_FIX = 'f'.charCodeAt(0); class CacheServer { - constructor(cache, port) { + constructor(cache, opts) { this._cache = cache; - this._port = parseInt(port); - if (!port && port !== 0) + this._port = parseInt(opts.port); + if (!opts.port && opts.port !== 0) this._port = consts.DEFAULT_PORT; this._sever = null; this.statsd = new StatsDClient({ - prefix: 'cache-server' + prefix: 'cache-server', + host: opts.statsdServer ? opts.statsdServer : '127.0.0.1' }); } diff --git a/main.js b/main.js index 73aa79e..7082560 100755 --- a/main.js +++ b/main.js @@ -25,6 +25,7 @@ program.description("Unity Cache Server") .option('-w, --workers ', 'Number of worker threads to spawn. Default is 1 for every 2 CPUs reported by the OS', atLeastOne, consts.DEFAULT_WORKERS) .option('-v, --verify', 'Verify the Cache Server integrity, without fixing errors') .option('-f, --fix', 'Fix errors found while verifying the Cache Server integrity') + .option('--statsd-server [host]', 'Send statsd-metrics to this host') .option('-m, --monitor-parent-process ', 'Monitor a parent process and exit if it dies', myParseInt, 0) .parse(process.argv); @@ -76,7 +77,10 @@ var errHandler = function () { process.exit(1); }; -var server = new CacheServer(cache, program.port); +var server = new CacheServer(cache, { + port: program.port, + statsdServer: program.statsdServer +}); if(cluster.isMaster) { helpers.log(consts.LOG_INFO, "Cache Server version " + consts.VERSION); diff --git a/test/server.js b/test/server.js index 8b63987..2f3d2c1 100644 --- a/test/server.js +++ b/test/server.js @@ -15,7 +15,7 @@ const MAX_BLOB_SIZE = 2048; helpers.SetLogger(()=>{}); var cache = new CacheFS(helpers.generateTempDir(), CACHE_SIZE); -var server = new CacheServer(cache, 0); +var server = new CacheServer(cache, {port: 0}); var client; var cmd = { From fee7dc5d07262a7b6c94eab55f77e4a1f414eec2 Mon Sep 17 00:00:00 2001 From: Morten Siebuhr Date: Mon, 11 Dec 2017 09:10:33 +0100 Subject: [PATCH 4/4] Metrics: Allow setting custom key-values on all metrics data --- lib/server.js | 1 + main.js | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/server.js b/lib/server.js index 49d9136..de7cbb4 100644 --- a/lib/server.js +++ b/lib/server.js @@ -38,6 +38,7 @@ class CacheServer { this.statsd = new StatsDClient({ prefix: 'cache-server', + tags: opts.statsdTags, host: opts.statsdServer ? opts.statsdServer : '127.0.0.1' }); } diff --git a/main.js b/main.js index 7082560..dd76db4 100755 --- a/main.js +++ b/main.js @@ -16,6 +16,15 @@ function atLeastOne(val) { return Math.max(1, val); } +function parseKeyValues(val) { + let obj = {}; + val.split(',').forEach(function (kv) { + let pair = kv.split(':'); + obj[pair[0]] = pair[1]; + }); + return obj; +} + program.description("Unity Cache Server") .version(consts.VERSION) .option('-s, --size ', 'Specify the maximum allowed size of the LRU cache. Files that have not been used recently will automatically be discarded when the cache size is exceeded. Default is 50Gb', myParseInt, consts.DEFAULT_CACHE_SIZE) @@ -25,7 +34,8 @@ program.description("Unity Cache Server") .option('-w, --workers ', 'Number of worker threads to spawn. Default is 1 for every 2 CPUs reported by the OS', atLeastOne, consts.DEFAULT_WORKERS) .option('-v, --verify', 'Verify the Cache Server integrity, without fixing errors') .option('-f, --fix', 'Fix errors found while verifying the Cache Server integrity') - .option('--statsd-server [host]', 'Send statsd-metrics to this host') + .option('--statsd-server [host]', 'Send statsd metrics to this host') + .option('--statsd-tags [key:val,...]', 'Extra tags for statsd metrics', parseKeyValues) .option('-m, --monitor-parent-process ', 'Monitor a parent process and exit if it dies', myParseInt, 0) .parse(process.argv); @@ -79,6 +89,7 @@ var errHandler = function () { var server = new CacheServer(cache, { port: program.port, + statsdTags: program.statsdTags, statsdServer: program.statsdServer });