Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ unity-cache-server [arguments]
-P, --cache-path [path] Specify the path of the cache directory.
-l, --log-level <n> Specify the level of log verbosity. Valid values are 0 (silent) through 5 (debug). Default is 3
-w, --workers <n> Number of worker threads to spawn. Default is 0
--statsd-server [host] Send statsd metrics to this host
--statsd-tags [key:val,...] Extra tags for statsd metrics
-m --mirror [host:port] Mirror transactions to another cache server. Can be repeated for multiple mirrors.
-m, --monitor-parent-process <n> Monitor a parent process and exit if it dies
-h, --help output usage information
Expand Down
23 changes: 22 additions & 1 deletion lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@ const kReadStateCommand = Symbol("readStateCommand");
const kReadStatePutStream = Symbol("readStatePutStream");
const kReadStateNone = Symbol("readStateNone");

const StatsDClient = require('statsd-client');

class CommandProcessor extends Duplex {

/**
*
* @param {CacheBase} cache
*/
constructor(cache) {
constructor(cache, statsd) {
super();
this[kCache] = cache;
this[kSendFileQueue] = [];
this._setWriteHandler(kReadStateVersion);

this.statsd = statsd || new StatsDClient();
this.transactionTimer;

/**
*
* @type {PutTransaction}
Expand Down Expand Up @@ -239,22 +244,27 @@ class CommandProcessor extends Duplex {

switch(cmd) {
case 'q':
this.statsd.increment('cmd', 1, {cmd: 'quit'});
await this._quit();
break;
case 'ga':
case 'gi':
case 'gr':
this.statsd.increment('cmd', 1, {cmd: 'get', type});
await this._onGet(type, guid, hash);
break;
case 'ts':
this.statsd.increment('cmd', 1, {cmd: 'trx'});
await this._onTransactionStart(guid, hash);
break;
case 'te':
this.statsd.increment('cmd', 1, {cmd: 'trx_end'});
await this._onTransactionEnd();
break;
case 'pa':
case 'pi':
case 'pr':
this.statsd.increment('cmd', 1, {cmd: 'put', type});
await this._onPut(type, size);
break;
default:
Expand Down Expand Up @@ -286,13 +296,18 @@ class CommandProcessor extends Duplex {

this._sendFileQueueCount++;
helpers.log(consts.LOG_DBG, `Adding file to send queue, size ${info.size}`);

this.statsd.increment('hit', 1, {type});
this.statsd.histogram('get-size', info.size, {type})
}
catch(err) {
const resp = Buffer.from(`-${type}`, 'ascii');
this[kSendFileQueue].push({
exists: false,
header: Buffer.concat([resp, guid, hash], 34)
});

this.statsd.increment('miss', 1, {type});
}
finally {
if(this[kSendFileQueue].length === 1) {
Expand All @@ -313,6 +328,7 @@ class CommandProcessor extends Duplex {
helpers.log(consts.LOG_DBG, "Cancel previous transaction");
this._trx = null;
}
this.transactionTimer = new Date();

this._trx = await this[kCache].createPutTransaction(guid, hash);
helpers.log(consts.LOG_DBG, `Start transaction for GUID: ${helpers.GUIDBufferToString(guid)} Hash: ${hash.toString('hex')}`);
Expand All @@ -328,6 +344,11 @@ class CommandProcessor extends Duplex {
throw new Error("Invalid transaction isolation");
}

if (this.transactionTimer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be moved after the 'await' on endPutTransaction below.

this.statsd.timing('trx-latency', this.transactionTimer);
this.transactionTimer = null;
}

await this[kCache].endPutTransaction(this._trx);
this.emit('onTransactionEnd', this._trx);
helpers.log(consts.LOG_DBG, `End transaction for GUID: ${helpers.GUIDBufferToString(this._trx.guid)} Hash: ${this._trx.hash.toString('hex')}`);
Expand Down
6 changes: 5 additions & 1 deletion lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const ClientStreamProcessor = require('./client_stream_processor');
const CommandProcessor = require('./command_processor');
const TransactionMirror = require('./transaction_mirror');
const ip = require('ip');
const StatsDClient = require('statsd-client');

class CacheServer {
/**
Expand All @@ -27,6 +28,9 @@ class CacheServer {
options.mirror = [].concat(options.mirror);
this._mirrors = options.mirror.map(m => new TransactionMirror(m, cache));
}

// TODO: Add some keys and what not...
this.statsd = options.statsd || new StatsDClient({});
}

/**
Expand Down Expand Up @@ -59,7 +63,7 @@ class CacheServer {
this._server = net.createServer(socket => {
helpers.log(consts.LOG_TEST, `${socket.remoteAddress}:${socket.remotePort} connected.`);

const cmdProc = new CommandProcessor(self.cache);
const cmdProc = new CommandProcessor(self.cache, self.statsd);

// TODO: Prune mirror list to exclude the incoming socket address, to prevent looping transactions
const mirrors = self._mirrors;
Expand Down
24 changes: 23 additions & 1 deletion main.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const prompt = require('prompt');
const dns = require('dns');
const ip = require('ip');
const VERSION = require('./package.json').version;
const StatsDClient = require('statsd-client');

function myParseInt(val, def) {
val = parseInt(val);
Expand All @@ -25,6 +26,16 @@ function collect(val, memo) {
return memo;
}


function parseKeyValues(val) {
let obj = {};
val.split(',').forEach(function (kv) {
let pair = kv.split(':');
obj[pair[0]] = pair[1];
});
return obj;
}

const defaultCacheModule = config.get("Cache.defaultModule");

program.description("Unity Cache Server")
Expand All @@ -34,6 +45,8 @@ program.description("Unity Cache Server")
.option('-P, --cache-path [path]', `Specify the path of the cache directory`)
.option('-l, --log-level <n>', `Specify the level of log verbosity. Valid values are 0 (silent) through 5 (debug). Default is ${consts.DEFAULT_LOG_LEVEL}`, myParseInt, consts.DEFAULT_LOG_LEVEL)
.option('-w, --workers <n>', `Number of worker threads to spawn. Default is ${consts.DEFAULT_WORKERS}`, zeroOrMore, consts.DEFAULT_WORKERS)
.option('--statsd-server [host]', 'Send statsd metrics to this host', '127.0.0.1')
.option('--statsd-tags [key:val,...]', 'Extra tags for statsd metrics', parseKeyValues)
.option('-m --mirror [host:port]', `Mirror transactions to another cache server. Can be repeated for multiple mirrors`, collect, [])
.option('-m, --monitor-parent-process <n>', 'Monitor a parent process and exit if it dies', myParseInt, 0);

Expand All @@ -42,6 +55,12 @@ program.parse(process.argv);
helpers.setLogLevel(program.logLevel);
helpers.setLogger(program.workers > 0 ? helpers.defaultClusterLogger : helpers.defaultLogger);

const statsd = new StatsDClient({
prefix: 'cache-server',
host: program.statsdHost,
tags: program.statsdTags
});

if (program.monitorParentProcess > 0) {
function monitor() {
function is_running(pid) {
Expand Down Expand Up @@ -78,7 +97,9 @@ if(program.workers > 0 && !CacheModule.properties.clustering) {

let server = null;

let cacheOpts = {};
let cacheOpts = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pass the statsd object to the cache init() if it isn't used in the cache module?

statsd: statsd
};
if(program.cachePath !== null) {
cacheOpts.cachePath = program.cachePath;
}
Expand Down Expand Up @@ -158,6 +179,7 @@ function startPrompt() {
switch(result.command) {
case 'q':
helpers.log(consts.LOG_INFO, "Shutting down ...");
this.statsd.close();
Cache.shutdown().then(() => {
server.stop();
process.exit(0);
Expand Down
21 changes: 14 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"lodash": "^4.17.4",
"lokijs": "^1.5.1",
"prompt": "^1.0.0",
"statsd-client": "^0.4.0",
"uuid": "^3.1.0"
}
}