Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
Merge 2616dfd into 521e298
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-palmer committed May 22, 2019
2 parents 521e298 + 2616dfd commit 3d2648f
Show file tree
Hide file tree
Showing 16 changed files with 605 additions and 164 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ local-production.yml
.cache*/
Dockerfile*
.dockerignore
diagnostics/
.idea
20 changes: 18 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ This open-source repository is maintained separately from the Cache Server avail
* [Function](#function)
* [Configuration](#configuration)
* [Unity project Library Importer](#unity-project-library-importer)
* [Diagnostics](#diagnostics)
* [Contributors](#contributors)
* [License](#license)

Expand Down Expand Up @@ -52,7 +53,7 @@ npm install github:Unity-Technologies/unity-cache-server -g
unity-cache-server [arguments]
```

Command | Description
Option | Description
-------------------------------- | -----------
`-V`, `--version` | Show the version number of the Cache Server.
`-h`, `--host <address>` | The interface on which the Cache Server listens. The default is to listen on all interfaces.
Expand All @@ -61,6 +62,7 @@ Command | Description
`-P`, `--cache-path [path]` | The path of the cache directory.
`-l`, `--log-level <n>` | The level of log verbosity. Valid values are 0 (silent) through 4 (debug). The default is 3 (info).
`-w`, `--workers <n>` | The number of worker threads to spawn. The default is 0.
`--diag-client-recorder` | Record incoming client network stream to disk.
`-m`, `--mirror <host:port>` | Mirror transactions to another cache server. Repeat this option for multiple mirrors.
`-W`, `--putwhitelist <host:port>` | Only allow PUT transactions (uploads) from the specified client address. Repeat this option for multiple addresses.
`--dump-config` | Write the active configuration to the console.
Expand Down Expand Up @@ -169,7 +171,7 @@ Due to performance considerations, the `cache_fs` module shipped with Cache Serv
or
`node cleanup.js [options]`

Command | Description
Option | Description
-------------------------------- | -----------
-V, --version | Show the version number of cleanup script.
-c --cache-module [path] | The path to the cache module.
Expand Down Expand Up @@ -245,6 +247,20 @@ Tools are provided to quickly seed a Cache Server from a fully imported Unity pr
* The import process connects and uploads to the target host like any other Unity client, so it should be safe in a production environment.
* Files are skipped if any changes were detected between when the JSON data was exported and when the importer tool is executed.

## Diagnostics

### Client Recorder (--diag-client-recorder)

Starting up the Cache Server with the `--diag-client-recorder` option will write to disk raw data from all incoming client connections (by default to the `diagnostics/client-recordings` folder). Example tools and libraries for analysing recorded sessions can be found in the [ucs-diag-tools](https://github.com/Unity-Technologies/ucs-diag-utils) repository.

### Configuration

Option | Default | Description
------------------------------------------ | ------------ | -----------
Diagnostics.clientRecorder | false | Enable client network stream recording.
Diagnostics.clientRecorderOptions.bufferSize | 10000000 | Size of in-memory buffer for client network stream recording.
Diagnostics.clientRecorderOptions.saveDir | "diagnostics/client-recordings" | Directory where client network stream recordings will be saved. A relative directory will be relative to the server application startup directory.

## Contributors
Contributions are welcome! Before submitting pull requests please note the Submission of Contributions section of the Apache 2.0 license.

Expand Down
7 changes: 6 additions & 1 deletion config/default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,9 @@ Server:
options:
allowIpv6: false
Global:
logLevel: 3
logLevel: 3
Diagnostics:
clientRecorder: false
clientRecorderOptions:
bufferSize: 10000000
saveDir: "diagnostics/client-recordings"
22 changes: 13 additions & 9 deletions lib/client/server_stream_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class ServerStreamProcessor extends Transform {
}

if(this._errState !== null) {
helpers.log(consts.LOG_ERR, this._errState.msg);
helpers.log(consts.LOG_ERR, this._errState);
break;
}
}

Expand Down Expand Up @@ -97,25 +98,28 @@ class ServerStreamProcessor extends Transform {

// Read command
if (!this.readState.didReadCmd) {
this.readState.didReadCmd = true;

const cmd = this._headerBuf.slice(0, consts.CMD_SIZE).toString('ascii');

this.readState.headerData.cmd = cmd;
this.readState.doReadId = true;
this.readState.headerSize += consts.ID_SIZE;

switch (cmd[0]) {
case '+': // file found
this.readState.doReadSize = true;
this.readState.doReadId = true;
this.readState.headerSize += consts.SIZE_SIZE + consts.ID_SIZE;
this.readState.headerSize += consts.SIZE_SIZE;
break;
case '-': // file not found
this.readState.doReadId = true;
this.readState.headerSize += consts.ID_SIZE;
break;
default:
this._errState = new Error("Unrecognized command response, aborting!");
this._errState = new Error(`Unrecognized command response, aborting! (${cmd})`);
}
}

if(!fillBufferWithData())
break;
if(this._errState || !fillBufferWithData())
break;
}

let pos = consts.CMD_SIZE;

Expand Down
3 changes: 2 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const constants = {
MIRROR: "Mirror.addresses",
ALLOW_IP_V6: "Server.options.allowIpv6",
COMMAND_PROCESSOR: "Cache.options.processor",
PUTWHITELIST: "Cache.options.processor.putWhitelist"
PUTWHITELIST: "Cache.options.processor.putWhitelist",
CLIENT_RECORDER: "Diagnostics.clientRecorder"
}
};

Expand Down
2 changes: 2 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ const path = require('path');
const helpers = require('./helpers');
helpers.initConfigDir(path.dirname(__dirname));

exports.Constants = require('./constants');
exports.Helpers = require('./helpers');
exports.Server = require('./server/server');
exports.ClientStreamProcessor = require('./server/client_stream_processor');
exports.CommandProcessor = require('./server/command_processor');
Expand Down
2 changes: 2 additions & 0 deletions lib/server/client_stream_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ class ClientStreamProcessor extends Transform {
break;
}

this.emit('cmd', cmd);

if (!fillBufferWithData()) {
break;
}
Expand Down
91 changes: 91 additions & 0 deletions lib/server/client_stream_recorder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
const { Writable } = require('stream');
const fs = require('fs-extra');
const path = require('path');
const uuid = require('uuid');
const consts = require('../constants');
const helpers = require('../helpers');

const kRecordBufferSize = 1024 * 1024 * 10;

class ClientStreamRecorder extends Writable {
constructor(options) {
super(options);
this._sessionId = options.sessionId || uuid.v4();
this._saveDir = options.saveDir || '.';
this._bufferSize = options.bufferSize || kRecordBufferSize;
this._bufferPos = 0;
this._isRecording = false;
this._buffer = Buffer.allocUnsafe(this._bufferSize);

this.on('unpipe', () => {
this._finish_recording()
.then(() => this._flush_buffer())
.then(() => this.emit('flushed'));
});
}

/**
*
* @returns {string}
*/
get dataPath() {
return path.join(this._saveDir, this._sessionId);
}

_write(chunk, encoding, callback) {
this._record_chunk(chunk).then(() => {
callback();
});
}

async _record_chunk(chunk) {
this._isRecording = true;

let slice = chunk;
if(this._bufferPos + slice.length > this._bufferSize) {
slice = chunk.slice(0, this._bufferSize - this._bufferPos);
}

slice.copy(this._buffer, this._bufferPos, 0, slice.length);
this._bufferPos += slice.length;
if(this._bufferPos === this._bufferSize) {
await this._flush_buffer();
}

if(slice.length < chunk.length) {
await this._record_chunk(chunk.slice(slice.length));
}

this._isRecording = false;
this.emit('_record_chunk');
}

async _flush_buffer() {
await fs.ensureDir(this._saveDir);

// Normalize the version size so it will be correctly parsed when streamed to a server.
let zeroPad = 0;
if(!await fs.pathExists(this.dataPath)) {
for (let i = consts.PROTOCOL_VERSION_MIN_SIZE; i <= consts.VERSION_SIZE; i++) {
zeroPad = consts.VERSION_SIZE - i;
if (helpers.readUInt32(this._buffer.slice(0, i)) === consts.PROTOCOL_VERSION) break;
}
}

const fd = await fs.open(this.dataPath, 'a');
if(zeroPad > 0) await fs.write(fd, Buffer.alloc(zeroPad, '0', 'ascii'), 0);
await fs.write(fd, this._buffer, 0, this._bufferPos);
await fs.close(fd);
this._bufferPos = 0;
}

async _finish_recording() {
if(!this._isRecording) return;

return new Promise(resolve => {
this.once('_record_chunk', () => resolve());
});
}
}

module.exports = ClientStreamRecorder;
2 changes: 1 addition & 1 deletion lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ class CommandProcessor extends Duplex {
}

_registerEventListeners() {
this.once('finish', this._printReadStats);
this.on('pipe', src => {
this[kSource] = src;
this._clientAddress = src.clientAddress;
});

this.on('unpipe', () => {
this._printReadStats();
this[kSource] = null;
this[kSendFileQueue] = null;
this._writeHandler = this._writeHandlers.none;
Expand Down
32 changes: 22 additions & 10 deletions lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class CacheServer {
*/
constructor(cache, options) {
this._cache = cache;
this._errCallback = null;

this._port = options.port;
if (!options.port && options.port !== 0)
Expand All @@ -29,8 +30,16 @@ class CacheServer {
this._mirrors = options.mirror.map(m => new TransactionMirror(m, cache));
}

this.allowIpv6 = options.allowIpv6;
this._errCallback = null;
this._clientRecorder = options.clientRecorder || null;
this._allowIpv6 = options.allowIpv6;
}

/**
*
* @returns {boolean}
*/
get isRecordingClient() {
return this._clientRecorder !== null;
}

/**
Expand Down Expand Up @@ -106,16 +115,19 @@ class CacheServer {

socket.on('close', () => {
helpers.log(consts.LOG_INFO, `${socket.remoteAddress}:${socket.remotePort} closed connection.`);
socket.unpipe(streamProc);
streamProc.unpipe(cmdProc);
cmdProc.unpipe(socket);
socket.unpipe();
streamProc.unpipe();
cmdProc.unpipe();
}).on('error', err => {
helpers.log(consts.LOG_ERR, err);
helpers.log(consts.LOG_ERR, err.message);
});

socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files
if(this._clientRecorder)
socket.pipe(this._clientRecorder); // Record the incoming byte stream to disk

socket.pipe(streamProc) // Transform the incoming byte stream into commands and file data
.pipe(cmdProc) // Execute commands and interface with the cache module
.pipe(socket); // Connect back to socket to send files

socket['commandProcessor'] = cmdProc;
}).on('error', err => {
Expand All @@ -127,7 +139,7 @@ class CacheServer {
});

return new Promise( (resolve, reject) => {
if(this.allowIpv6 && this.host === consts.DEFAULT_HOST) {
if(this._allowIpv6 && this.host === consts.DEFAULT_HOST) {
// bind to all interfaces with IPV4 and IPV6 only when the default host value is specified AND
// the allowIPv6 flag is true. Omitting the host parameter in listen() enables IPV6.
this._server.listen(this.port, (err) => err ? reject(err) : resolve());
Expand Down
11 changes: 11 additions & 0 deletions lib/unity_cache_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const helpers = require('./helpers');
const yaml = require('js-yaml');
const path = require('path');
const fs = require('fs-extra');
const StreamRecorder = require('./server/client_stream_recorder');

class UnityCacheServer {
static getConfigVal(key, defVal) {
Expand Down Expand Up @@ -161,6 +162,16 @@ class UnityCacheServer {
allowIpv6: this.getConfigVal(consts.CLI_CONFIG_KEYS.ALLOW_IP_V6, false)
};

if(this.getConfigVal(consts.CLI_CONFIG_KEYS.CLIENT_RECORDER, false)) {
const crOpts = this.getConfigVal(`Diagnostics.clientRecorderOptions`, {});
serverOpts.clientRecorder = new StreamRecorder(crOpts);
serverOpts.clientRecorder.on('error', err => {
helpers.log(consts.LOG_ERR, err.message);
});

helpers.log(consts.LOG_WARN, "Warning: client recording enabled - performance may be impacted!");
}

// create Server
const server = new Server(cache, serverOpts);

Expand Down
5 changes: 5 additions & 0 deletions main.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ const optionMap = {
description: "Only allow PUT transactions (uploads) from the specified client address. Repeat this option for multiple addresses.",
validator: collect,
configKey: consts.CLI_CONFIG_KEYS.PUTWHITELIST
},
diagClientRecorder: {
flags: "--diag-client-recorder",
description: "Record incoming client network stream to disk.",
configKey: consts.CLI_CONFIG_KEYS.CLIENT_RECORDER
}
};

Expand Down
Loading

0 comments on commit 3d2648f

Please sign in to comment.