Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.

Commit 98cad20

Browse files
- Add test coverage for transaction mirroring. In the process, I simplified the implementation to remove a check that was intended to prevent looping transactions but in practice would only be effective in limited cases, and would actually prevent mirrors running on different ports behind the same NAT address from replicating.
- Add another protocol test case to catch all of the branches in client_stream_processor
1 parent 1f038aa commit 98cad20

File tree

7 files changed

+97
-35
lines changed

7 files changed

+97
-35
lines changed

lib/cache/cache_base.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const { promisify } = require('util');
1010
const loki = require('lokijs');
1111
const ReliabilityManager = require('./reliability_manager');
1212
const _ = require('lodash');
13+
const { Writable } = require('stream');
1314

1415
const kDbName = 'cache.db';
1516

@@ -176,7 +177,7 @@ class CacheBase extends EventEmitter {
176177
* @returns {Promise<any>}
177178
*/
178179
createPutTransaction(guid, hash) {
179-
return Promise.reject(new Error("Not implemented"));
180+
return Promise.resolve(new PutTransaction(guid, hash));
180181
}
181182

182183
/**
@@ -300,7 +301,9 @@ class PutTransaction extends EventEmitter {
300301
* @returns {Promise<any>}
301302
*/
302303
getWriteStream(type, size) {
303-
return Promise.reject(new Error("Not implemented"));
304+
return Promise.resolve(new Writable({
305+
write(chunk, encoding, cb){ setImmediate(cb); }
306+
}));
304307
}
305308

306309
async invalidate() {
@@ -313,7 +316,7 @@ class PutTransaction extends EventEmitter {
313316
* @returns {Promise<any>}
314317
*/
315318
async writeFilesToPath(targetPath) {
316-
return Promise.reject(new Error("Not implemented"));
319+
return Promise.resolve();
317320
}
318321
}
319322

lib/server/server.js

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,9 @@ class CacheServer {
2020
this._port = consts.DEFAULT_PORT;
2121

2222
this._server = null;
23+
this._mirrorConfig = options.mirror ? [].concat(options.mirror) : [];
2324
this._mirrors = [];
2425

25-
if(options.mirror) {
26-
options.mirror = [].concat(options.mirror);
27-
this._mirrors = options.mirror.map(m => new TransactionMirror(m, cache));
28-
}
29-
3026
this.allowIpv6 = options.allowIpv6;
3127
this._errCallback = null;
3228
}
@@ -57,6 +53,14 @@ class CacheServer {
5753
return this._server;
5854
}
5955

56+
/**
57+
*
58+
* @returns {Array|*}
59+
*/
60+
get mirrors() {
61+
return this._mirrors;
62+
}
63+
6064
/**
6165
*
6266
* @param {Function} cb
@@ -79,12 +83,10 @@ class CacheServer {
7983
const cmdProc = new CommandProcessor(this.cache);
8084
const streamProc = new ClientStreamProcessor({clientAddress: `${socket.remoteAddress}:${socket.remotePort}`});
8185

82-
const mirrors = this._mirrors;
83-
if(mirrors.length > 0) {
86+
if(this._mirrors.length > 0) {
8487
cmdProc.on('onTransactionEnd', (trx) => {
85-
mirrors.forEach(m => {
86-
if(m.address !== socket.remoteAddress)
87-
m.queueTransaction(trx);
88+
this._mirrors.forEach(m => {
89+
m.queueTransaction(trx);
8890
});
8991
});
9092
}
@@ -103,14 +105,16 @@ class CacheServer {
103105
.pipe(socket); // Connect back to socket to send files
104106

105107
socket['commandProcessor'] = cmdProc;
106-
});
107-
108-
this._server.on('error', err => {
108+
}).on('error', err => {
109109
if (err.code === 'EADDRINUSE') {
110110
helpers.log(consts.LOG_ERR, `Port ${this.port} is already in use...`);
111111
}
112112

113113
if (this._errCallback && typeof(this._errCallback === 'function')) { this._errCallback(err); }
114+
}).on('listening', () => {
115+
this._mirrors = this._mirrorConfig
116+
.filter(m => m && m.port && m.host)
117+
.map(m => new TransactionMirror(m, this.cache));
114118
});
115119

116120
return new Promise(resolve => {

lib/server/transaction_mirror.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ class TransactionMirror {
3737
return this._connectOptions.host;
3838
}
3939

40+
get port() {
41+
return this._connectOptions.port;
42+
}
43+
4044
_connect() {
4145
helpers.log(consts.LOG_INFO, `[TransactionMirror] Connecting to ${this._connectOptions.host}:${this._connectOptions.port}`);
4246
return this._client.connect();

test/cache_base.js

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const path = require('path');
88
const randomBuffer = require('./test_utils').randomBuffer;
99
const consts = require('../lib/constants');
1010
const sinon = require('sinon');
11+
const { Writable } = require('stream');
1112

1213
describe("Cache: Base Class", () => {
1314
let cache;
@@ -124,9 +125,9 @@ describe("Cache: Base Class", () => {
124125
});
125126

126127
describe("createPutTransaction", () => {
127-
it("should require override implementation in subclasses by returning an error", () => {
128-
return cache.createPutTransaction()
129-
.then(() => { throw new Error("Expected error!"); }, () => {});
128+
it("should return an instance of a PutTransaction", async () => {
129+
const t = await cache.createPutTransaction();
130+
assert.ok(t instanceof PutTransaction);
130131
});
131132
});
132133

@@ -264,16 +265,16 @@ describe("PutTransaction: Base Class", () => {
264265
});
265266

266267
describe("getWriteStream", () => {
267-
it("should require override implementation in subclasses by returning an error", () => {
268-
return trx.getWriteStream(consts.FILE_TYPE.INFO, 0)
269-
.then(() => { throw new Error("Expected error!"); }, () => {});
268+
it("should return a Writable stream", async() => {
269+
const s = await trx.getWriteStream(consts.FILE_TYPE.INFO, 0);
270+
assert.ok(s instanceof Writable);
270271
});
271272
});
272273

273274
describe("writeFilesToPath", () => {
274-
it("should require override implementation in subclasses by returning an error", () => {
275-
return trx.writeFilesToPath()
276-
.then(() => { throw new Error("Expected error!"); }, () => {});
275+
it("should return a promise", () => {
276+
const p = trx.writeFilesToPath();
277+
assert.ok(p instanceof Promise);
277278
});
278279
});
279280
});

test/protocol.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ describe("Protocol", () => {
323323
});
324324

325325
const tests = [
326+
{cmd: cmd.getAsset, blob: self.data.bin, type: 'bin', packetSize: 1},
326327
{cmd: cmd.getAsset, blob: self.data.bin, type: 'bin', packetSize: SMALL_PACKET_SIZE},
327328
{cmd: cmd.getInfo, blob: self.data.info, type: 'info', packetSize: MED_PACKET_SIZE},
328329
{cmd: cmd.getResource, blob: self.data.resource, type: 'resource', packetSize: LARGE_PACKET_SIZE}

test/server.js

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,61 @@ const os = require('os');
66
const helpers = require('../lib/helpers');
77
const consts = require('../lib/constants');
88
const CacheServer = require('../lib/server/server');
9-
const Cache = require('../lib/cache/cache_base').CacheBase;
10-
const sleep = require('./test_utils').sleep;
11-
const cmd = require('./test_utils').cmd;
9+
const CacheBase = require('../lib/cache/cache_base').CacheBase;
10+
const TransactionMirror = require('../lib/server/transaction_mirror');
11+
const { generateCommandData, encodeCommand, clientWrite, sleep, cmd } = require('./test_utils');
12+
const sinon = require('sinon');
1213

13-
const cache = new Cache();
14-
const server = new CacheServer(cache, {port: 0});
14+
const cache = new CacheBase();
1515
let client;
1616

17+
describe("Server constructor", function() {
18+
it("should use the default port if no port is specified in options", () => {
19+
const s = new CacheServer(cache, { mirror:[] });
20+
assert.strictEqual(s.port, consts.DEFAULT_PORT);
21+
});
22+
});
23+
24+
describe("Server mirroring", function() {
25+
const server = new CacheServer(cache, {
26+
port: 0,
27+
mirror: [{host: "127.0.0.1", port: 8126}, {host: "1.2.3.4", port: 8126}, {host: "4.3.2.1", port: 8126}]
28+
});
29+
30+
before(function () {
31+
return server.start(err => { return Promise.reject(err); });
32+
});
33+
34+
after(function() {
35+
server.stop();
36+
});
37+
38+
beforeEach(function (done) {
39+
client = net.connect({port: server.port}, done);
40+
});
41+
42+
afterEach(() => client.end());
43+
44+
it("should mirror transactions to the configured list of mirrors", async () => {
45+
const spies = server.mirrors.map(m => {
46+
return sinon.spy(m, "queueTransaction");
47+
});
48+
49+
const testData = generateCommandData();
50+
51+
const buf = Buffer.from(helpers.encodeInt32(consts.PROTOCOL_VERSION) +
52+
encodeCommand(cmd.transactionStart, testData.guid, testData.hash) +
53+
encodeCommand(cmd.putAsset, null, null, testData.bin) +
54+
encodeCommand(cmd.transactionEnd), 'ascii');
55+
56+
await clientWrite(client, buf);
57+
58+
spies.forEach(s => assert(s.calledOnce));
59+
});
60+
});
61+
1762
describe("Server common", function() {
63+
const server = new CacheServer(cache, {port: 0});
1864

1965
before(function () {
2066
this._defaultErrCallback = err => assert(!err, `Cache Server reported error! ${err}`);
@@ -71,7 +117,7 @@ describe("Server common", function() {
71117
describe("Ipv6", function() {
72118
const ipv6Server = new CacheServer(cache, {port: 0, allowIpv6: true});
73119

74-
before(function () {
120+
before(function () {
75121
const interfaces = os.networkInterfaces();
76122
let ipv6Available = false;
77123
Object.keys(interfaces).forEach(function (interfaceName){
@@ -85,15 +131,15 @@ describe("Server common", function() {
85131
if(!ipv6Available){
86132
console.log("Skipping IPv6 tests because IPv6 is not available on this machine");
87133
this.skip();
88-
}
134+
}
89135

90136
return ipv6Server.start(err => assert(!err, `Cache Server reported error! ${err}`));
91137
});
92-
138+
93139
after(function() {
94140
ipv6Server.stop();
95141
});
96-
142+
97143
it("should bind to ipv6 when allowed", function(done) {
98144
const serverAddress = ipv6Server._server.address();
99145
assert.strictEqual(serverAddress.family, "IPv6");
@@ -108,7 +154,7 @@ describe("Server common", function() {
108154
before(function () {
109155
return ipv4Server.start(err => assert(!err, `Cache Server reported error! ${err}`));
110156
});
111-
157+
112158
after(function() {
113159
ipv4Server.stop();
114160
});

test/unity_cache_server.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ describe("Unity Cache Server bootstrap", () => {
208208
cachePath: tmpPath
209209
}
210210
}
211+
},
212+
Server: {
213+
port: 0
211214
}
212215
});
213216

0 commit comments

Comments
 (0)