Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const fs = require('fs');
const consts = require('./constants').Constants;
const helpers = require('./helpers');

const StatsDClient = require('statsd-client');

const CMD_QUIT = 'q'.charCodeAt(0);

const CMD_GET = 'g'.charCodeAt(0);
Expand All @@ -27,12 +29,18 @@ const OPT_FIX = 'f'.charCodeAt(0);


class CacheServer {
constructor(cache, port) {
constructor(cache, opts) {
this._cache = cache;
this._port = parseInt(port);
if (!port && port !== 0)
this._port = parseInt(opts.port);
if (!opts.port && opts.port !== 0)
this._port = consts.DEFAULT_PORT;
this._sever = null;

this.statsd = new StatsDClient({
prefix: 'cache-server',
tags: opts.statsdTags,
host: opts.statsdServer ? opts.statsdServer : '127.0.0.1'
});
}

get port() {
Expand All @@ -59,6 +67,8 @@ class CacheServer {

_HandleData(socket, data) {
var self = this;
var start = new Date();
var trxStart = 0;

// There is pending data, add it to the data buffer
if (socket.pendingData != null) {
Expand All @@ -81,6 +91,7 @@ class CacheServer {
}

socket.protocolVersion = helpers.readUInt32(data);
self.statsd.increment('protocol-version', 1, { protocol_version: socket.protocolVersion });
let buf = Buffer.allocUnsafe(consts.UINT32_SIZE);
if (socket.protocolVersion == consts.PROTOCOL_VERSION) {
helpers.log(consts.LOG_INFO, "Client protocol version " + socket.protocolVersion);
Expand Down Expand Up @@ -147,12 +158,15 @@ class CacheServer {
}

if (data[idx] == CMD_QUIT) {
self.statsd.increment('cmd', 1, {cmd_Quit: 'quit'});
self.statsd.timing('request-latency', start)
socket.end();
socket.forceQuit = true;
return false;
}

if (data[idx] == CMD_GET) {
self.statsd.increment('cmd', 1, {cmd: 'get'});
if (data.length < consts.CMD_SIZE + consts.ID_SIZE) {
socket.pendingData = data;
return true;
Expand Down Expand Up @@ -209,6 +223,8 @@ class CacheServer {

// handle a transaction
else if (data[idx] == CMD_TRX) {
self.statsd.increment('cmd', 1, {cmd: 'trx'});
trxStart = new Date();
if (data.length < consts.CMD_SIZE) {
socket.pendingData = data;
return true;
Expand Down Expand Up @@ -242,6 +258,9 @@ class CacheServer {
continue;
}
else if (data[idx] == TRX_END) {
self.statsd.increment('cmd', 1, {cmd: 'trx_end'});
if (trxStart) { self.statsd.timing('trx-latency', trxStart) }

if (!socket.inTransaction) {
helpers.log(consts.LOG_ERR, "Invalid transaction isolation");
socket.destroy();
Expand Down Expand Up @@ -305,9 +324,11 @@ class CacheServer {
idx += 1;

var reqType = data[idx];
self.statsd.increment('cmd', 1, {cmd: 'put', type: reqType});

idx += 1;
var size = helpers.readUInt64(data.slice(idx));
self.statsd.histogram('put-size', size, { type: reqType });

if (reqType == TYPE_ASSET) {
helpers.log(consts.LOG_TEST, "Put Asset Binary " + socket.currentGuid + "-" + socket.currentHash + " (size " + size + ")");
Expand Down Expand Up @@ -345,6 +366,7 @@ class CacheServer {

// handle check integrity
else if (data[idx] == CMD_INTEGRITY) {
self.statsd.increment('cmd', 1, {cmd: 'integrity'});
if (data.length < consts.CMD_SIZE + 1) {
socket.pendingData = data;
return true;
Expand Down Expand Up @@ -411,6 +433,7 @@ class CacheServer {
var buf = Buffer.allocUnsafe(consts.CMD_SIZE + consts.ID_SIZE);
buf[0] = CMD_GETNOK;
buf[1] = type;
self.statsd.increment('miss', 1, {type});
resbuf.copy(buf, consts.CMD_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE, consts.CMD_SIZE + consts.UINT64_SIZE + consts.ID_SIZE);
try {
socket.write(buf);
Expand Down Expand Up @@ -453,6 +476,8 @@ class CacheServer {
else {
resbuf[0] = CMD_GETOK;
resbuf[1] = type;
self.statsd.increment('hit', 1, {type});
self.statsd.histogram('get-size', stats.size, { type });

helpers.log(consts.LOG_TEST, "Found: " + next.cacheStream + " size:" + stats.size);
resbuf.slice(consts.CMD_SIZE).write(helpers.encodeInt64(stats.size));
Expand Down
17 changes: 16 additions & 1 deletion main.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ function atLeastOne(val) {
return Math.max(1, val);
}

function parseKeyValues(val) {
let obj = {};
val.split(',').forEach(function (kv) {
let pair = kv.split(':');
obj[pair[0]] = pair[1];
});
return obj;
}

program.description("Unity Cache Server")
.version(consts.VERSION)
.option('-s, --size <n>', 'Specify the maximum allowed size of the LRU cache. Files that have not been used recently will automatically be discarded when the cache size is exceeded. Default is 50Gb', myParseInt, consts.DEFAULT_CACHE_SIZE)
Expand All @@ -25,6 +34,8 @@ program.description("Unity Cache Server")
.option('-w, --workers <n>', 'Number of worker threads to spawn. Default is 1 for every 2 CPUs reported by the OS', atLeastOne, consts.DEFAULT_WORKERS)
.option('-v, --verify', 'Verify the Cache Server integrity, without fixing errors')
.option('-f, --fix', 'Fix errors found while verifying the Cache Server integrity')
.option('--statsd-server [host]', 'Send statsd metrics to this host')
.option('--statsd-tags [key:val,...]', 'Extra tags for statsd metrics', parseKeyValues)
.option('-m, --monitor-parent-process <n>', 'Monitor a parent process and exit if it dies', myParseInt, 0)
.parse(process.argv);

Expand Down Expand Up @@ -76,7 +87,11 @@ var errHandler = function () {
process.exit(1);
};

var server = new CacheServer(cache, program.port);
var server = new CacheServer(cache, {
port: program.port,
statsdTags: program.statsdTags,
statsdServer: program.statsdServer
});

if(cluster.isMaster) {
helpers.log(consts.LOG_INFO, "Cache Server version " + consts.VERSION);
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"mocha-lcov-reporter": "^1.3.0"
},
"dependencies": {
"commander": "^2.11.0"
"commander": "^2.11.0",
"statsd-client": "^0.4.0"
}
}
2 changes: 1 addition & 1 deletion test/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const MAX_BLOB_SIZE = 2048;

helpers.SetLogger(()=>{});
var cache = new CacheFS(helpers.generateTempDir(), CACHE_SIZE);
var server = new CacheServer(cache, 0);
var server = new CacheServer(cache, {port: 0});
var client;

var cmd = {
Expand Down