diff --git a/README.md b/README.md index 058b493..636c36b 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,8 @@ unity-cache-server [arguments] -P, --cache-path [path] Specify the path of the cache directory. -l, --log-level Specify the level of log verbosity. Valid values are 0 (silent) through 5 (debug). Default is 3 -w, --workers Number of worker threads to spawn. Default is 0 + --statsd-server [host] Send statsd metrics to this host + --statsd-tags [key:val,...] Extra tags for statsd metrics -m --mirror [host:port] Mirror transactions to another cache server. Can be repeated for multiple mirrors. -m, --monitor-parent-process Monitor a parent process and exit if it dies -h, --help output usage information diff --git a/lib/server/command_processor.js b/lib/server/command_processor.js index cb67e44..fbddc00 100644 --- a/lib/server/command_processor.js +++ b/lib/server/command_processor.js @@ -12,18 +12,23 @@ const kReadStateCommand = Symbol("readStateCommand"); const kReadStatePutStream = Symbol("readStatePutStream"); const kReadStateNone = Symbol("readStateNone"); +const StatsDClient = require('statsd-client'); + class CommandProcessor extends Duplex { /** * * @param {CacheBase} cache */ - constructor(cache) { + constructor(cache, statsd) { super(); this[kCache] = cache; this[kSendFileQueue] = []; this._setWriteHandler(kReadStateVersion); + this.statsd = statsd || new StatsDClient(); + this.transactionTimer; + /** * * @type {PutTransaction} @@ -239,22 +244,27 @@ class CommandProcessor extends Duplex { switch(cmd) { case 'q': + this.statsd.increment('cmd', 1, {cmd: 'quit'}); await this._quit(); break; case 'ga': case 'gi': case 'gr': + this.statsd.increment('cmd', 1, {cmd: 'get', type}); await this._onGet(type, guid, hash); break; case 'ts': + this.statsd.increment('cmd', 1, {cmd: 'trx'}); await this._onTransactionStart(guid, hash); break; case 'te': + this.statsd.increment('cmd', 1, {cmd: 'trx_end'}); await this._onTransactionEnd(); break; case 'pa': case 'pi': case 'pr': + this.statsd.increment('cmd', 1, {cmd: 'put', type}); await this._onPut(type, size); break; default: @@ -286,6 +296,9 @@ class CommandProcessor extends Duplex { this._sendFileQueueCount++; helpers.log(consts.LOG_DBG, `Adding file to send queue, size ${info.size}`); + + this.statsd.increment('hit', 1, {type}); + this.statsd.histogram('get-size', info.size, {type}) } catch(err) { const resp = Buffer.from(`-${type}`, 'ascii'); @@ -293,6 +306,8 @@ class CommandProcessor extends Duplex { exists: false, header: Buffer.concat([resp, guid, hash], 34) }); + + this.statsd.increment('miss', 1, {type}); } finally { if(this[kSendFileQueue].length === 1) { @@ -313,6 +328,7 @@ class CommandProcessor extends Duplex { helpers.log(consts.LOG_DBG, "Cancel previous transaction"); this._trx = null; } + this.transactionTimer = new Date(); this._trx = await this[kCache].createPutTransaction(guid, hash); helpers.log(consts.LOG_DBG, `Start transaction for GUID: ${helpers.GUIDBufferToString(guid)} Hash: ${hash.toString('hex')}`); @@ -328,6 +344,11 @@ class CommandProcessor extends Duplex { throw new Error("Invalid transaction isolation"); } + if (this.transactionTimer) { + this.statsd.timing('trx-latency', this.transactionTimer); + this.transactionTimer = null; + } + await this[kCache].endPutTransaction(this._trx); this.emit('onTransactionEnd', this._trx); helpers.log(consts.LOG_DBG, `End transaction for GUID: ${helpers.GUIDBufferToString(this._trx.guid)} Hash: ${this._trx.hash.toString('hex')}`); diff --git a/lib/server/server.js b/lib/server/server.js index 192aaa2..00e4e58 100644 --- a/lib/server/server.js +++ b/lib/server/server.js @@ -6,6 +6,7 @@ const ClientStreamProcessor = require('./client_stream_processor'); const CommandProcessor = require('./command_processor'); const TransactionMirror = require('./transaction_mirror'); const ip = require('ip'); +const StatsDClient = require('statsd-client'); class CacheServer { /** @@ -27,6 +28,9 @@ class CacheServer { options.mirror = [].concat(options.mirror); this._mirrors = options.mirror.map(m => new TransactionMirror(m, cache)); } + + // TODO: Add some keys and what not... + this.statsd = options.statsd || new StatsDClient({}); } /** @@ -59,7 +63,7 @@ class CacheServer { this._server = net.createServer(socket => { helpers.log(consts.LOG_TEST, `${socket.remoteAddress}:${socket.remotePort} connected.`); - const cmdProc = new CommandProcessor(self.cache); + const cmdProc = new CommandProcessor(self.cache, self.statsd); // TODO: Prune mirror list to exclude the incoming socket address, to prevent looping transactions const mirrors = self._mirrors; diff --git a/main.js b/main.js index 1fc1b58..4bfc570 100755 --- a/main.js +++ b/main.js @@ -10,6 +10,7 @@ const prompt = require('prompt'); const dns = require('dns'); const ip = require('ip'); const VERSION = require('./package.json').version; +const StatsDClient = require('statsd-client'); function myParseInt(val, def) { val = parseInt(val); @@ -25,6 +26,16 @@ function collect(val, memo) { return memo; } + +function parseKeyValues(val) { + let obj = {}; + val.split(',').forEach(function (kv) { + let pair = kv.split(':'); + obj[pair[0]] = pair[1]; + }); + return obj; +} + const defaultCacheModule = config.get("Cache.defaultModule"); program.description("Unity Cache Server") @@ -34,6 +45,8 @@ program.description("Unity Cache Server") .option('-P, --cache-path [path]', `Specify the path of the cache directory`) .option('-l, --log-level ', `Specify the level of log verbosity. Valid values are 0 (silent) through 5 (debug). Default is ${consts.DEFAULT_LOG_LEVEL}`, myParseInt, consts.DEFAULT_LOG_LEVEL) .option('-w, --workers ', `Number of worker threads to spawn. Default is ${consts.DEFAULT_WORKERS}`, zeroOrMore, consts.DEFAULT_WORKERS) + .option('--statsd-server [host]', 'Send statsd metrics to this host', '127.0.0.1') + .option('--statsd-tags [key:val,...]', 'Extra tags for statsd metrics', parseKeyValues) .option('-m --mirror [host:port]', `Mirror transactions to another cache server. Can be repeated for multiple mirrors`, collect, []) .option('-m, --monitor-parent-process ', 'Monitor a parent process and exit if it dies', myParseInt, 0); @@ -42,6 +55,12 @@ program.parse(process.argv); helpers.setLogLevel(program.logLevel); helpers.setLogger(program.workers > 0 ? helpers.defaultClusterLogger : helpers.defaultLogger); +const statsd = new StatsDClient({ + prefix: 'cache-server', + host: program.statsdHost, + tags: program.statsdTags +}); + if (program.monitorParentProcess > 0) { function monitor() { function is_running(pid) { @@ -78,7 +97,9 @@ if(program.workers > 0 && !CacheModule.properties.clustering) { let server = null; -let cacheOpts = {}; +let cacheOpts = { + statsd: statsd +}; if(program.cachePath !== null) { cacheOpts.cachePath = program.cachePath; } @@ -158,6 +179,7 @@ function startPrompt() { switch(result.command) { case 'q': helpers.log(consts.LOG_INFO, "Shutting down ..."); + this.statsd.close(); Cache.shutdown().then(() => { server.stop(); process.exit(0); diff --git a/package-lock.json b/package-lock.json index 2f9ef99..2522555 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,16 +35,18 @@ "integrity": "sha1-FopHAXVran9RoSzgyXv6KMCE7WM=" }, "commander": { - "version": "https://registry.npmjs.org/commander/-/commander-2.11.0.tgz", - "integrity": "sha1-FXFS/R56bI2YpbcVzzdt+SgARWM=" + "version": "2.13.0", + "resolved": "https://registry.npmjs.org/commander/-/commander-2.13.0.tgz", + "integrity": "sha512-MVuS359B+YzaWqjCL/c+22gfryv+mCBPHAv3zyVI2GN8EY6IRP8VwtasXn8jyyhvvq84R4ImN1OKRtcbIasjYA==" }, "concat-map": { "version": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=" }, "config": { - "version": "https://registry.npmjs.org/config/-/config-1.27.0.tgz", - "integrity": "sha1-OrMNAID/dvQHwvR6wTJq39kIr18=", + "version": "1.29.2", + "resolved": "https://registry.npmjs.org/config/-/config-1.29.2.tgz", + "integrity": "sha1-Lr3JJjnrnQb//TAvHuMuKtDpThE=", "requires": { "json5": "https://registry.npmjs.org/json5/-/json5-0.4.0.tgz", "os-homedir": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz" @@ -53,7 +55,7 @@ "coveralls": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/coveralls/-/coveralls-3.0.0.tgz", - "integrity": "sha1-Iu9zAzBTgIDSm4wVHckUav3oipk=", + "integrity": "sha512-ZppXR9y5PraUOrf/DzHJY6gzNUhXYE3b9D43xEXs4QYZ7/Oe0Gy0CS+IPKWFfvQFXB3RG9QduaQUFehzSpGAFw==", "dev": true, "requires": { "js-yaml": "3.10.0", @@ -829,7 +831,7 @@ }, "minimatch": { "version": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz", - "integrity": "sha1-UWbihkV/AzBgZL5Ul+jbsMPTIIM=", + "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", "requires": { "brace-expansion": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.8.tgz" } @@ -847,7 +849,7 @@ }, "mocha": { "version": "https://registry.npmjs.org/mocha/-/mocha-3.5.3.tgz", - "integrity": "sha1-HgSA/jbS2lhY0etqzDhBiybqog0=", + "integrity": "sha512-/6na001MJWEtYxHOV1WLfsmR4YIynkUEhBwzsb+fk2qmQ3iqsi258l/Q2MWHJMImAcNpZ8DEdYAK72NHoIQ9Eg==", "dev": true, "requires": { "browser-stdout": "https://registry.npmjs.org/browser-stdout/-/browser-stdout-1.3.0.tgz", @@ -2628,6 +2630,11 @@ "resolved": "https://registry.npmjs.org/stack-trace/-/stack-trace-0.0.10.tgz", "integrity": "sha1-VHxws0fo0ytOEI6hoqFZ5f3eGcA=" }, + "statsd-client": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/statsd-client/-/statsd-client-0.4.0.tgz", + "integrity": "sha1-Y14EoBvREQQsg8Xl2D6JCG2HNmA=" + }, "supports-color": { "version": "https://registry.npmjs.org/supports-color/-/supports-color-3.1.2.tgz", "integrity": "sha1-cqJiiU2dQIuVbKBf83su2KbiotU=", diff --git a/package.json b/package.json index 77f5820..a5b9cd9 100644 --- a/package.json +++ b/package.json @@ -47,6 +47,7 @@ "lodash": "^4.17.4", "lokijs": "^1.5.1", "prompt": "^1.0.0", + "statsd-client": "^0.4.0", "uuid": "^3.1.0" } }