diff --git a/lib/server.js b/lib/server.js index cde8f6d..de7cbb4 100644 --- a/lib/server.js +++ b/lib/server.js @@ -5,6 +5,8 @@ const fs = require('fs'); const consts = require('./constants').Constants; const helpers = require('./helpers'); +const StatsDClient = require('statsd-client'); + const CMD_QUIT = 'q'.charCodeAt(0); const CMD_GET = 'g'.charCodeAt(0); @@ -27,12 +29,18 @@ const OPT_FIX = 'f'.charCodeAt(0); class CacheServer { - constructor(cache, port) { + constructor(cache, opts) { this._cache = cache; - this._port = parseInt(port); - if (!port && port !== 0) + this._port = parseInt(opts.port); + if (!opts.port && opts.port !== 0) this._port = consts.DEFAULT_PORT; this._sever = null; + + this.statsd = new StatsDClient({ + prefix: 'cache-server', + tags: opts.statsdTags, + host: opts.statsdServer ? opts.statsdServer : '127.0.0.1' + }); } get port() { @@ -59,6 +67,8 @@ class CacheServer { _HandleData(socket, data) { var self = this; + var start = new Date(); + var trxStart = 0; // There is pending data, add it to the data buffer if (socket.pendingData != null) { @@ -81,6 +91,7 @@ class CacheServer { } socket.protocolVersion = helpers.readUInt32(data); + self.statsd.increment('protocol-version', 1, { protocol_version: socket.protocolVersion }); let buf = Buffer.allocUnsafe(consts.UINT32_SIZE); if (socket.protocolVersion == consts.PROTOCOL_VERSION) { helpers.log(consts.LOG_INFO, "Client protocol version " + socket.protocolVersion); @@ -147,12 +158,15 @@ class CacheServer { } if (data[idx] == CMD_QUIT) { + self.statsd.increment('cmd', 1, {cmd_Quit: 'quit'}); + self.statsd.timing('request-latency', start) socket.end(); socket.forceQuit = true; return false; } if (data[idx] == CMD_GET) { + self.statsd.increment('cmd', 1, {cmd: 'get'}); if (data.length < consts.CMD_SIZE + consts.ID_SIZE) { socket.pendingData = data; return true; @@ -209,6 +223,8 @@ class CacheServer { // handle a transaction else if (data[idx] == CMD_TRX) { + self.statsd.increment('cmd', 1, {cmd: 'trx'}); + trxStart = new Date(); if (data.length < consts.CMD_SIZE) { socket.pendingData = data; return true; @@ -242,6 +258,9 @@ class CacheServer { continue; } else if (data[idx] == TRX_END) { + self.statsd.increment('cmd', 1, {cmd: 'trx_end'}); + if (trxStart) { self.statsd.timing('trx-latency', trxStart) } + if (!socket.inTransaction) { helpers.log(consts.LOG_ERR, "Invalid transaction isolation"); socket.destroy(); @@ -305,9 +324,11 @@ class CacheServer { idx += 1; var reqType = data[idx]; + self.statsd.increment('cmd', 1, {cmd: 'put', type: reqType}); idx += 1; var size = helpers.readUInt64(data.slice(idx)); + self.statsd.histogram('put-size', size, { type: reqType }); if (reqType == TYPE_ASSET) { helpers.log(consts.LOG_TEST, "Put Asset Binary " + socket.currentGuid + "-" + socket.currentHash + " (size " + size + ")"); @@ -345,6 +366,7 @@ class CacheServer { // handle check integrity else if (data[idx] == CMD_INTEGRITY) { + self.statsd.increment('cmd', 1, {cmd: 'integrity'}); if (data.length < consts.CMD_SIZE + 1) { socket.pendingData = data; return true; @@ -411,6 +433,7 @@ class CacheServer { var buf = Buffer.allocUnsafe(consts.CMD_SIZE + consts.ID_SIZE); buf[0] = CMD_GETNOK; buf[1] = type; + self.statsd.increment('miss', 1, {type}); resbuf.copy(buf, consts.CMD_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE + consts.ID_SIZE); try { socket.write(buf); @@ -453,6 +476,8 @@ class CacheServer { else { resbuf[0] = CMD_GETOK; resbuf[1] = type; + self.statsd.increment('hit', 1, {type}); + self.statsd.histogram('get-size', stats.size, { type }); helpers.log(consts.LOG_TEST, "Found: " + next.cacheStream + " size:" + stats.size); resbuf.slice(consts.CMD_SIZE).write(helpers.encodeInt64(stats.size)); diff --git a/main.js b/main.js index 73aa79e..dd76db4 100755 --- a/main.js +++ b/main.js @@ -16,6 +16,15 @@ function atLeastOne(val) { return Math.max(1, val); } +function parseKeyValues(val) { + let obj = {}; + val.split(',').forEach(function (kv) { + let pair = kv.split(':'); + obj[pair[0]] = pair[1]; + }); + return obj; +} + program.description("Unity Cache Server") .version(consts.VERSION) .option('-s, --size ', 'Specify the maximum allowed size of the LRU cache. Files that have not been used recently will automatically be discarded when the cache size is exceeded. Default is 50Gb', myParseInt, consts.DEFAULT_CACHE_SIZE) @@ -25,6 +34,8 @@ program.description("Unity Cache Server") .option('-w, --workers ', 'Number of worker threads to spawn. Default is 1 for every 2 CPUs reported by the OS', atLeastOne, consts.DEFAULT_WORKERS) .option('-v, --verify', 'Verify the Cache Server integrity, without fixing errors') .option('-f, --fix', 'Fix errors found while verifying the Cache Server integrity') + .option('--statsd-server [host]', 'Send statsd metrics to this host') + .option('--statsd-tags [key:val,...]', 'Extra tags for statsd metrics', parseKeyValues) .option('-m, --monitor-parent-process ', 'Monitor a parent process and exit if it dies', myParseInt, 0) .parse(process.argv); @@ -76,7 +87,11 @@ var errHandler = function () { process.exit(1); }; -var server = new CacheServer(cache, program.port); +var server = new CacheServer(cache, { + port: program.port, + statsdTags: program.statsdTags, + statsdServer: program.statsdServer +}); if(cluster.isMaster) { helpers.log(consts.LOG_INFO, "Cache Server version " + consts.VERSION); diff --git a/package.json b/package.json index c2780d0..b210a47 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "mocha-lcov-reporter": "^1.3.0" }, "dependencies": { - "commander": "^2.11.0" + "commander": "^2.11.0", + "statsd-client": "^0.4.0" } } diff --git a/test/server.js b/test/server.js index 8b63987..2f3d2c1 100644 --- a/test/server.js +++ b/test/server.js @@ -15,7 +15,7 @@ const MAX_BLOB_SIZE = 2048; helpers.SetLogger(()=>{}); var cache = new CacheFS(helpers.generateTempDir(), CACHE_SIZE); -var server = new CacheServer(cache, 0); +var server = new CacheServer(cache, {port: 0}); var client; var cmd = {