Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
Merge 7e26b58 into 545e286
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-palmer committed Feb 5, 2019
2 parents 545e286 + 7e26b58 commit 271fe46
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
4 changes: 3 additions & 1 deletion lib/server/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,9 @@ class CommandProcessor extends Duplex {
throw new Error("Not in a transaction");
}

if (this._isWhitelisted(this._trx.clientAddress)) {
const [host] = this._trx.clientAddress.split(':');

if (this._isWhitelisted(host)) {
this._putStream = await this._trx.getWriteStream(type, size);
} else {
this._putStream = new Writable({
Expand Down
2 changes: 1 addition & 1 deletion lib/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CacheServer {
helpers.log(consts.LOG_TEST, `${socket.remoteAddress}:${socket.remotePort} connected.`);

const cmdProc = new CommandProcessor(this.cache);
const streamProc = new ClientStreamProcessor({clientAddress: socket.remoteAddress});
const streamProc = new ClientStreamProcessor({clientAddress: `${socket.remoteAddress}:${socket.remotePort}`});

const mirrors = this._mirrors;
if(mirrors.length > 0) {
Expand Down
8 changes: 4 additions & 4 deletions test/command_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe("CommandProcessor", () => {
this.cmdProc._putWhitelist = ["127.0.0.1"];

this.cmdProc._trx = new PutTransaction();
this.cmdProc._trx.clientAddress = "127.0.0.1";
this.cmdProc._trx.clientAddress = "127.0.0.1:1234";
const spy = sinon.spy(this.cmdProc._trx, "getWriteStream");

const p = this.cmdProc._onPut("a", 999);
Expand All @@ -29,7 +29,7 @@ describe("CommandProcessor", () => {
this.cmdProc._putWhitelist = ["127.0.0.6", "127.0.0.3", "127.0.0.1"];

this.cmdProc._trx = new PutTransaction();
this.cmdProc._trx.clientAddress = "127.0.0.1";
this.cmdProc._trx.clientAddress = "127.0.0.1:1234";
const spy = sinon.spy(this.cmdProc._trx, "getWriteStream");

const p = this.cmdProc._onPut("a", 999);
Expand All @@ -43,7 +43,7 @@ describe("CommandProcessor", () => {
this.cmdProc._putWhitelist = [];

this.cmdProc._trx = new PutTransaction();
this.cmdProc._trx.clientAddress = "127.0.0.1";
this.cmdProc._trx.clientAddress = "127.0.0.1:1234";
const spy = sinon.spy(this.cmdProc._trx, "getWriteStream");

const p = this.cmdProc._onPut("a", 999);
Expand All @@ -57,7 +57,7 @@ describe("CommandProcessor", () => {
this.cmdProc._putWhitelist = ["127.0.0.1"];

this.cmdProc._trx = new PutTransaction();
this.cmdProc._trx.clientAddress = "127.0.0.2";
this.cmdProc._trx.clientAddress = "127.0.0.2:1234";

await this.cmdProc._onPut("a", 6);
assert.strictEqual(this.cmdProc._writeHandler, this.cmdProc._writeHandlers.putStream);
Expand Down
23 changes: 20 additions & 3 deletions test/reliability_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,34 @@ describe("ReliabilityManager", () => {
const hash = randomBuffer(consts.HASH_SIZE);

const trx = new StablePutTransaction(guid, hash);
trx.clientAddress = "A";
trx.clientAddress = "A:1234";
await myRm.processTransaction(trx);

const entry = rm.getEntry(helpers.GUIDBufferToString(trx.guid), trx.hash.toString('hex'));
assert.equal(entry.state, ReliabilityManager.reliabilityStates.Pending);

trx.clientAddress = "A";
trx.clientAddress = "A:1234";
await myRm.processTransaction(trx);
assert.equal(entry.state, ReliabilityManager.reliabilityStates.Pending);

trx.clientAddress = "B";
trx.clientAddress = "B:1234";
await myRm.processTransaction(trx);
assert.equal(entry.state, ReliabilityManager.reliabilityStates.ReliableNew);
});

it("should increment the reliability factor for the same IP on different ports", async () => {
const myRm = new ReliabilityManager(db, tmp.tmpNameSync(), { reliabilityThreshold: 2, multiClient: true });
const guid = randomBuffer(consts.GUID_SIZE);
const hash = randomBuffer(consts.HASH_SIZE);

const trx = new StablePutTransaction(guid, hash);
trx.clientAddress = "A:1234";
await myRm.processTransaction(trx);

const entry = rm.getEntry(helpers.GUIDBufferToString(trx.guid), trx.hash.toString('hex'));
assert.equal(entry.state, ReliabilityManager.reliabilityStates.Pending);

trx.clientAddress = "A:4321";
await myRm.processTransaction(trx);
assert.equal(entry.state, ReliabilityManager.reliabilityStates.ReliableNew);
});
Expand Down

0 comments on commit 271fe46

Please sign in to comment.