diff --git a/config/default.yml b/config/default.yml index d8d88ba..a8ede15 100644 --- a/config/default.yml +++ b/config/default.yml @@ -1,6 +1,8 @@ Cache: defaultModule: "cache_fs" options: + processor: + putWhitelist: [] cache_ram: cachePath: ".cache_ram" pageSize: 100000000 @@ -34,4 +36,7 @@ Cache: Mirror: options: queueProcessDelay: 2000 - connectionIdleTimeout: 10000 \ No newline at end of file + connectionIdleTimeout: 10000 +Server: + options: + allowIpv6: false \ No newline at end of file diff --git a/lib/server/command_processor.js b/lib/server/command_processor.js index 0aab0d0..c2b4cfe 100644 --- a/lib/server/command_processor.js +++ b/lib/server/command_processor.js @@ -1,4 +1,5 @@ const helpers = require('./../helpers'); +const config = require('config'); const filesize = require('filesize'); const consts = require('./../constants'); const Duplex = require('stream').Duplex; @@ -35,6 +36,10 @@ class CommandProcessor extends Duplex { */ this._trx = null; + this._options = config.get("Cache.options.processor"); + this._putWhitelist = this._options.putWhitelist; + this._whitelistEmpty = (!Array.isArray(this._putWhitelist) || !this._putWhitelist.length); + this._putStream = null; this._putSize = 0; this._putSent = 0; @@ -77,6 +82,13 @@ class CommandProcessor extends Duplex { Promise.resolve().then(() => this._read_internal()); } + /** + * @private + */ + _isWhitelisted(ip) { + return this._whitelistEmpty || this._putWhitelist.includes(ip); + } + /** * @private */ @@ -330,10 +342,16 @@ class CommandProcessor extends Duplex { throw new Error("Not in a transaction"); } - this._putStream = await this._trx.getWriteStream(type, size); - this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream); - this._putSize = size; - this._writeHandler = this._writeHandlers.putStream; + if (this._isWhitelisted(this._trx.clientAddress)) { + this._putStream = await this._trx.getWriteStream(type, size); + this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream); + this._putSize = size; + this._writeHandler = this._writeHandlers.putStream; + } + else { + this._writeHandler = this._writeHandlers.none; + helpers.log(consts.LOG_DBG, `PUT rejected from non-whitelisted IP: ${this._trx.clientAddress}`); + } } } diff --git a/lib/server/server.js b/lib/server/server.js index cfb0999..4d2628f 100644 --- a/lib/server/server.js +++ b/lib/server/server.js @@ -26,6 +26,8 @@ class CacheServer { options.mirror = [].concat(options.mirror); this._mirrors = options.mirror.map(m => new TransactionMirror(m, cache)); } + + this.allowIpv6 = options.allowIpv6; } /** @@ -87,7 +89,13 @@ class CacheServer { }); return new Promise(resolve => { - this._server.listen(this.port, () => resolve()); + if(this.allowIpv6) { + this._server.listen(this.port, () => resolve()); + } + else { + this._server.listen(this.port, "0.0.0.0", () => resolve()); + } + }); }; diff --git a/main.js b/main.js index fee5f26..e760e1a 100755 --- a/main.js +++ b/main.js @@ -28,6 +28,11 @@ function collect(val, memo) { const defaultCacheModule = config.get("Cache.defaultModule"); +const processorOptions = config.get("Cache.options.processor"); +if(Array.isArray(processorOptions.putWhitelist) && processorOptions.putWhitelist.length){ + helpers.log(consts.LOG_INFO, `PUT whitelist: ${processorOptions.putWhitelist}`); +}; + program.description("Unity Cache Server") .version(VERSION) .allowUnknownOption(true) @@ -114,7 +119,8 @@ Cache.init(cacheOpts) .then(mirrors => { const opts = { port: program.port, - mirror: mirrors + mirror: mirrors, + allowIpv6: config.has("Server.options.allowIpv6") ? config.get("Server.options.allowIpv6") : false }; server = new Server(Cache, opts); diff --git a/test/command_processor.js b/test/command_processor.js new file mode 100644 index 0000000..6912634 --- /dev/null +++ b/test/command_processor.js @@ -0,0 +1,75 @@ +const assert = require('assert'); +const sinon = require('sinon'); +const { CommandProcessor, CacheBase , PutTransaction } = require('../lib'); + +describe("CommandProcessor", () => { + describe("PUT Whitelist", () => { + beforeEach(() => { + cmdProc = new CommandProcessor(new CacheBase()); + }); + + it("should implement PUT when whitelisted", async () => { + cmdProc._whitelistEmpty = false; + cmdProc._putWhitelist = ["127.0.0.1"]; + + cmdProc._trx = new PutTransaction(); + cmdProc._trx.clientAddress = "127.0.0.1"; + spy = sinon.spy(cmdProc._trx, "getWriteStream"); + + p = cmdProc._onPut("a", 999) + p.catch(function () {}); + + assert(spy.called) + }); + + it("should implement PUT when whitelisted (multiple)", async () => { + cmdProc._whitelistEmpty = false; + cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"]; + + cmdProc._trx = new PutTransaction(); + cmdProc._trx.clientAddress = "127.0.0.1"; + spy = sinon.spy(cmdProc._trx, "getWriteStream"); + + p = cmdProc._onPut("a", 999) + p.catch(function () {}); + + assert(spy.called) + }); + + it("should implement PUT when whitelist empty", async () => { + cmdProc._whitelistEmpty = true; + cmdProc._putWhitelist = []; + + cmdProc._trx = new PutTransaction(); + cmdProc._trx.clientAddress = "127.0.0.1"; + spy = sinon.spy(cmdProc._trx, "getWriteStream"); + + p = cmdProc._onPut("a", 999) + p.catch(function () {}); + + assert(spy.called) + }); + + it("should not implement PUT when not whitelisted", async () => { + cmdProc._whitelistEmpty = false + cmdProc._putWhitelist = ["127.0.0.1"] + + cmdProc._trx = new PutTransaction(); + cmdProc._trx.clientAddress = "127.0.0.2"; + + await cmdProc._onPut("a", 999) + assert.strictEqual(cmdProc._writeHandler, cmdProc._writeHandlers.none); + }); + + it("should not implement PUT when not whitelisted (multiple)", async () => { + cmdProc._whitelistEmpty = false + cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"] + + cmdProc._trx = new PutTransaction(); + cmdProc._trx.clientAddress = "127.0.0.2"; + + await cmdProc._onPut("a", 999) + assert.strictEqual(cmdProc._writeHandler, cmdProc._writeHandlers.none); + }); + }); +}); diff --git a/test/server.js b/test/server.js index c768e54..f2e7672 100644 --- a/test/server.js +++ b/test/server.js @@ -1,5 +1,6 @@ const assert = require('assert'); const net = require('net'); +const os = require('os'); const helpers = require('../lib/helpers'); const consts = require('../lib/constants'); const CacheServer = require('../lib/server/server'); @@ -67,6 +68,57 @@ describe("Server common", function() { }); }); + describe("Ipv6", function() { + const ipv6Server = new CacheServer(cache, {port: 0, allowIpv6: true}); + + before(function () { + var interfaces = os.networkInterfaces(); + var ipv6Available = false; + Object.keys(interfaces).forEach(function (interfaceName){ + interfaces[interfaceName].forEach(function (address){ + if(address.family === "IPv6"){ + ipv6Available = true; + } + }); + }); + + if(!ipv6Available){ + console.log("Skipping IPv6 tests because IPv6 is not available on this machine"); + this.skip(); + } + + return ipv6Server.start(err => assert(!err, `Cache Server reported error! ${err}`)); + }); + + after(function() { + ipv6Server.stop(); + }); + + it("should bind to ipv6 when allowed", function(done) { + var serverAddress = ipv6Server._server.address(); + assert.strictEqual(serverAddress.family, "IPv6"); + done(); + }); + + }); + describe("Ipv4", function() { + const ipv4Server = new CacheServer(cache, {port: 0, allowIpv6: false}); + + before(function () { + return ipv4Server.start(err => assert(!err, `Cache Server reported error! ${err}`)); + }); + + after(function() { + ipv4Server.stop(); + }); + + it("should bind to ipv4 when ipv6 not allowed", function(done) { + var serverAddress = ipv4Server._server.address(); + assert.strictEqual(serverAddress.family, "IPv4"); + done(); + }); + }); + describe("Other", function() { it("should force close the socket when a quit (q) command is received", function(done) { client = net.connect({port: server.port}, function (err) { @@ -92,6 +144,7 @@ describe("Server common", function() { client.write(helpers.encodeInt32(consts.PROTOCOL_VERSION)); client.write('xx'); }); - }) + }); }) -}); \ No newline at end of file +}); +