Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ const helpers = require('./../helpers');
const config = require('config');
const filesize = require('filesize');
const consts = require('./../constants');
const Duplex = require('stream').Duplex;
const { Duplex, Writable } = require('stream');
const { promisify } = require('util');

const kSource = Symbol("source");
const kCache = Symbol("cache");
const kSendFileQueue = Symbol("sendFileQueue");

class NullStream extends Writable {
_write(chunk, encoding, cb) {
setImmediate(cb);
}
}

class CommandProcessor extends Duplex {

/**
Expand All @@ -24,7 +30,6 @@ class CommandProcessor extends Duplex {
putStream: this._handleWrite.bind(this),
command: this._handleCommand.bind(this),
version: this._handleVersion.bind(this),
none: () => Promise.resolve()
};

this._writeHandler = this._writeHandlers.version;
Expand All @@ -40,6 +45,7 @@ class CommandProcessor extends Duplex {
this._putWhitelist = this._options.putWhitelist;
this._whitelistEmpty = (!Array.isArray(this._putWhitelist) || !this._putWhitelist.length);

this._nullStream = new NullStream();
this._putStream = null;
this._putSize = 0;
this._putSent = 0;
Expand Down Expand Up @@ -344,14 +350,14 @@ class CommandProcessor extends Duplex {

if (this._isWhitelisted(this._trx.clientAddress)) {
this._putStream = await this._trx.getWriteStream(type, size);
this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream);
this._putSize = size;
this._writeHandler = this._writeHandlers.putStream;
}
else {
this._writeHandler = this._writeHandlers.none;
} else {
this._putStream = this._nullStream;
helpers.log(consts.LOG_DBG, `PUT rejected from non-whitelisted IP: ${this._trx.clientAddress}`);
}

this._putStream.promiseWrite = promisify(this._putStream.write).bind(this._putStream);
this._putSize = size;
this._writeHandler = this._writeHandlers.putStream;
}
}

Expand Down
25 changes: 8 additions & 17 deletions test/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe("CommandProcessor", () => {
const p = this.cmdProc._onPut("a", 999);
p.catch(function () {});

assert(spy.called)
assert(spy.called);
});

it("should implement PUT when whitelisted (multiple)", async () => {
Expand All @@ -33,7 +33,7 @@ describe("CommandProcessor", () => {
const p = this.cmdProc._onPut("a", 999);
p.catch(function () {});

assert(spy.called)
assert(spy.called);
});

it("should implement PUT when whitelist empty", async () => {
Expand All @@ -47,29 +47,20 @@ describe("CommandProcessor", () => {
const p = this.cmdProc._onPut("a", 999);
p.catch(function () {});

assert(spy.called)
assert(spy.called);
});

it("should not implement PUT when not whitelisted", async () => {
it("should allow commands after writing when being whitelisted", async () => {
this.cmdProc._whitelistEmpty = false;
this.cmdProc._putWhitelist = ["127.0.0.1"];

this.cmdProc._trx = new PutTransaction();
this.cmdProc._trx.clientAddress = "127.0.0.2";

await this.cmdProc._onPut("a", 999);
assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.none);
});

it("should not implement PUT when not whitelisted (multiple)", async () => {
this.cmdProc._whitelistEmpty = false;
this.cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"];

this.cmdProc._trx = new PutTransaction();
this.cmdProc._trx.clientAddress = "127.0.0.2";

await this.cmdProc._onPut("a", 999);
assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.none);
await this.cmdProc._onPut("a", 6);
assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.putStream);
this.cmdProc._writeHandler('abcdef');
assert(this.cmdProc._writeHandlers.command);
});
});
});