Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion config/default.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Cache:
defaultModule: "cache_fs"
options:
processor:
putWhitelist: []
cache_ram:
cachePath: ".cache_ram"
pageSize: 100000000
Expand Down Expand Up @@ -34,4 +36,7 @@ Cache:
Mirror:
options:
queueProcessDelay: 2000
connectionIdleTimeout: 10000
connectionIdleTimeout: 10000
Server:
options:
allowIpv6: false
26 changes: 22 additions & 4 deletions lib/server/command_processor.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const helpers = require('./../helpers');
const config = require('config');
const filesize = require('filesize');
const consts = require('./../constants');
const Duplex = require('stream').Duplex;
Expand Down Expand Up @@ -35,6 +36,10 @@ class CommandProcessor extends Duplex {
*/
this._trx = null;

this._options = config.get("Cache.options.processor");
this._putWhitelist = this._options.putWhitelist;
this._whitelistEmpty = (!Array.isArray(this._putWhitelist) || !this._putWhitelist.length);

this._putStream = null;
this._putSize = 0;
this._putSent = 0;
Expand Down Expand Up @@ -77,6 +82,13 @@ class CommandProcessor extends Duplex {
Promise.resolve().then(() => this._read_internal());
}

/**
* @private
*/
_isWhitelisted(ip) {
return this._whitelistEmpty || this._putWhitelist.includes(ip);
}

/**
* @private
*/
Expand Down Expand Up @@ -330,10 +342,16 @@ class CommandProcessor extends Duplex {
throw new Error("Not in a transaction");
}

this._putStream = await this._trx.getWriteStream(type, size);
this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream);
this._putSize = size;
this._writeHandler = this._writeHandlers.putStream;
if (this._isWhitelisted(this._trx.clientAddress)) {
this._putStream = await this._trx.getWriteStream(type, size);
this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream);
this._putSize = size;
this._writeHandler = this._writeHandlers.putStream;
}
else {
this._writeHandler = this._writeHandlers.none;
helpers.log(consts.LOG_DBG, `PUT rejected from non-whitelisted IP: ${this._trx.clientAddress}`);
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class CacheServer {
options.mirror = [].concat(options.mirror);
this._mirrors = options.mirror.map(m => new TransactionMirror(m, cache));
}

this.allowIpv6 = options.allowIpv6;
}

/**
Expand Down Expand Up @@ -87,7 +89,13 @@ class CacheServer {
});

return new Promise(resolve => {
this._server.listen(this.port, () => resolve());
if(this.allowIpv6) {
this._server.listen(this.port, () => resolve());
}
else {
this._server.listen(this.port, "0.0.0.0", () => resolve());
}

});
};

Expand Down
8 changes: 7 additions & 1 deletion main.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ function collect(val, memo) {

const defaultCacheModule = config.get("Cache.defaultModule");

const processorOptions = config.get("Cache.options.processor");
if(Array.isArray(processorOptions.putWhitelist) && processorOptions.putWhitelist.length){
helpers.log(consts.LOG_INFO, `PUT whitelist: ${processorOptions.putWhitelist}`);
};

program.description("Unity Cache Server")
.version(VERSION)
.allowUnknownOption(true)
Expand Down Expand Up @@ -114,7 +119,8 @@ Cache.init(cacheOpts)
.then(mirrors => {
const opts = {
port: program.port,
mirror: mirrors
mirror: mirrors,
allowIpv6: config.has("Server.options.allowIpv6") ? config.get("Server.options.allowIpv6") : false
};

server = new Server(Cache, opts);
Expand Down
75 changes: 75 additions & 0 deletions test/command_processor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
const assert = require('assert');
const sinon = require('sinon');
const { CommandProcessor, CacheBase , PutTransaction } = require('../lib');

describe("CommandProcessor", () => {
describe("PUT Whitelist", () => {
beforeEach(() => {
cmdProc = new CommandProcessor(new CacheBase());
});

it("should implement PUT when whitelisted", async () => {
cmdProc._whitelistEmpty = false;
cmdProc._putWhitelist = ["127.0.0.1"];

cmdProc._trx = new PutTransaction();
cmdProc._trx.clientAddress = "127.0.0.1";
spy = sinon.spy(cmdProc._trx, "getWriteStream");

p = cmdProc._onPut("a", 999)
p.catch(function () {});

assert(spy.called)
});

it("should implement PUT when whitelisted (multiple)", async () => {
cmdProc._whitelistEmpty = false;
cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"];

cmdProc._trx = new PutTransaction();
cmdProc._trx.clientAddress = "127.0.0.1";
spy = sinon.spy(cmdProc._trx, "getWriteStream");

p = cmdProc._onPut("a", 999)
p.catch(function () {});

assert(spy.called)
});

it("should implement PUT when whitelist empty", async () => {
cmdProc._whitelistEmpty = true;
cmdProc._putWhitelist = [];

cmdProc._trx = new PutTransaction();
cmdProc._trx.clientAddress = "127.0.0.1";
spy = sinon.spy(cmdProc._trx, "getWriteStream");

p = cmdProc._onPut("a", 999)
p.catch(function () {});

assert(spy.called)
});

it("should not implement PUT when not whitelisted", async () => {
cmdProc._whitelistEmpty = false
cmdProc._putWhitelist = ["127.0.0.1"]

cmdProc._trx = new PutTransaction();
cmdProc._trx.clientAddress = "127.0.0.2";

await cmdProc._onPut("a", 999)
assert.strictEqual(cmdProc._writeHandler, cmdProc._writeHandlers.none);
});

it("should not implement PUT when not whitelisted (multiple)", async () => {
cmdProc._whitelistEmpty = false
cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"]

cmdProc._trx = new PutTransaction();
cmdProc._trx.clientAddress = "127.0.0.2";

await cmdProc._onPut("a", 999)
assert.strictEqual(cmdProc._writeHandler, cmdProc._writeHandlers.none);
});
});
});
57 changes: 55 additions & 2 deletions test/server.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const assert = require('assert');
const net = require('net');
const os = require('os');
const helpers = require('../lib/helpers');
const consts = require('../lib/constants');
const CacheServer = require('../lib/server/server');
Expand Down Expand Up @@ -67,6 +68,57 @@ describe("Server common", function() {
});
});

describe("Ipv6", function() {
const ipv6Server = new CacheServer(cache, {port: 0, allowIpv6: true});

before(function () {
var interfaces = os.networkInterfaces();
var ipv6Available = false;
Object.keys(interfaces).forEach(function (interfaceName){
interfaces[interfaceName].forEach(function (address){
if(address.family === "IPv6"){
ipv6Available = true;
}
});
});

if(!ipv6Available){
console.log("Skipping IPv6 tests because IPv6 is not available on this machine");
this.skip();
}

return ipv6Server.start(err => assert(!err, `Cache Server reported error! ${err}`));
});

after(function() {
ipv6Server.stop();
});

it("should bind to ipv6 when allowed", function(done) {
var serverAddress = ipv6Server._server.address();
assert.strictEqual(serverAddress.family, "IPv6");
done();
});

});
describe("Ipv4", function() {
const ipv4Server = new CacheServer(cache, {port: 0, allowIpv6: false});

before(function () {
return ipv4Server.start(err => assert(!err, `Cache Server reported error! ${err}`));
});

after(function() {
ipv4Server.stop();
});

it("should bind to ipv4 when ipv6 not allowed", function(done) {
var serverAddress = ipv4Server._server.address();
assert.strictEqual(serverAddress.family, "IPv4");
done();
});
});

describe("Other", function() {
it("should force close the socket when a quit (q) command is received", function(done) {
client = net.connect({port: server.port}, function (err) {
Expand All @@ -92,6 +144,7 @@ describe("Server common", function() {
client.write(helpers.encodeInt32(consts.PROTOCOL_VERSION));
client.write('xx');
});
})
});
})
});
});