Skip to content

Commit

Permalink
Merge pull request #4296 from LiskHQ/4224-add_unit_tests_p2p_inbound_…
Browse files Browse the repository at this point in the history
…peer

Add unit tests for P2P Inbound peer class - Closes #4224
  • Loading branch information
shuse2 committed Sep 27, 2019
2 parents fa47f90 + cdeea47 commit e60bcc5
Show file tree
Hide file tree
Showing 10 changed files with 687 additions and 320 deletions.
81 changes: 47 additions & 34 deletions elements/lisk-p2p/src/peer/base.ts
Expand Up @@ -60,6 +60,11 @@ import {
validateRPCRequest,
} from '../utils';

export const socketErrorStatusCodes = {
...(socketClusterClient.SCClientSocket as any).errorStatuses,
1000: 'Intentionally disconnected',
};

// Can be used to convert a rate which is based on the rateCalculationInterval into a per-second rate.
const RATE_NORMALIZATION_FACTOR = 1000;

Expand Down Expand Up @@ -163,42 +168,10 @@ export class Peer extends EventEmitter {
this._wsMessageRate = 0;
this._rateInterval = this._peerConfig.rateCalculationInterval;
this._counterResetInterval = setInterval(() => {
this._wsMessageRate =
(this._wsMessageCount * RATE_NORMALIZATION_FACTOR) / this._rateInterval;
this._wsMessageCount = 0;

if (this._wsMessageRate > this._peerConfig.wsMaxMessageRate) {
this.applyPenalty(this._peerConfig.wsMaxMessageRatePenalty);

return;
}

this._rpcRates = new Map(
[...this._rpcCounter.entries()].map(([key, value]) => {
const rate = value / this._rateInterval;

return [key, rate] as any;
}),
);
this._rpcCounter = new Map();

this._messageRates = new Map(
[...this._messageCounter.entries()].map(([key, value]) => {
const rate = value / this._rateInterval;

return [key, rate] as any;
}),
);
this._messageCounter = new Map();
this._resetCounters();
}, this._rateInterval);
this._productivityResetInterval = setInterval(() => {
// If peer has not recently responded, reset productivity to 0
if (
this._productivity.lastResponded <
Date.now() - DEFAULT_PRODUCTIVITY_RESET_INTERVAL
) {
this._productivity = { ...DEFAULT_PRODUCTIVITY };
}
this._resetProductivity();
}, DEFAULT_PRODUCTIVITY_RESET_INTERVAL);
this._productivity = { ...DEFAULT_PRODUCTIVITY };

Expand Down Expand Up @@ -530,6 +503,46 @@ export class Peer extends EventEmitter {
}
}

private _resetCounters(): void {
this._wsMessageRate =
(this._wsMessageCount * RATE_NORMALIZATION_FACTOR) / this._rateInterval;
this._wsMessageCount = 0;

if (this._wsMessageRate > this._peerConfig.wsMaxMessageRate) {
this.applyPenalty(this._peerConfig.wsMaxMessageRatePenalty);

return;
}

this._rpcRates = new Map(
[...this._rpcCounter.entries()].map(([key, value]) => {
const rate = value / this._rateInterval;

return [key, rate] as any;
}),
);
this._rpcCounter = new Map();

this._messageRates = new Map(
[...this._messageCounter.entries()].map(([key, value]) => {
const rate = value / this._rateInterval;

return [key, rate] as any;
}),
);
this._messageCounter = new Map();
}

private _resetProductivity(): void {
// If peer has not recently responded, reset productivity to 0
if (
this._productivity.lastResponded <
Date.now() - DEFAULT_PRODUCTIVITY_RESET_INTERVAL
) {
this._productivity = { ...DEFAULT_PRODUCTIVITY };
}
}

private _updateFromProtocolPeerInfo(rawPeerInfo: unknown): void {
const protocolPeerInfo = { ...rawPeerInfo, ip: this._ipAddress };
const newPeerInfo = validatePeerInfo(
Expand Down
42 changes: 20 additions & 22 deletions elements/lisk-p2p/src/peer/inbound.ts
Expand Up @@ -25,12 +25,12 @@ import {
REMOTE_SC_EVENT_RPC_REQUEST,
} from '../events';
import { P2PDiscoveredPeerInfo } from '../p2p_types';
import { Peer, PeerConfig, SCServerSocketUpdated } from './base';

const socketErrorStatusCodes = {
...(SCServerSocket as any).errorStatuses,
1000: 'Intentionally disconnected',
};
import {
Peer,
PeerConfig,
SCServerSocketUpdated,
socketErrorStatusCodes,
} from './base';

const getRandomPingDelay = () =>
Math.random() * (DEFAULT_PING_INTERVAL_MAX - DEFAULT_PING_INTERVAL_MIN) +
Expand All @@ -43,7 +43,6 @@ export class InboundPeer extends Peer {
code: number,
reason: string | undefined,
) => void;
private readonly _sendPing: () => void;
private _pingTimeoutId: NodeJS.Timer;

public constructor(
Expand All @@ -68,21 +67,9 @@ export class InboundPeer extends Peer {
reason,
});
};
this._sendPing = () => {
const pingStart = Date.now();
this._socket.emit(
REMOTE_EVENT_PING,
undefined,
(_: Error, __: unknown) => {
this._latency = Date.now() - pingStart;
this._pingTimeoutId = setTimeout(
this._sendPing,
getRandomPingDelay(),
);
},
);
};
this._pingTimeoutId = setTimeout(this._sendPing, getRandomPingDelay());
this._pingTimeoutId = setTimeout(() => {
this._sendPing();
}, getRandomPingDelay());
this._socket = peerSocket;
this._bindHandlersToInboundSocket(this._socket);
}
Expand All @@ -95,9 +82,20 @@ export class InboundPeer extends Peer {

public disconnect(code: number = 1000, reason?: string): void {
super.disconnect(code, reason);
clearTimeout(this._pingTimeoutId);
this._unbindHandlersFromInboundSocket(this._socket);
}

private _sendPing(): void {
const pingStart = Date.now();
this._socket.emit(REMOTE_EVENT_PING, undefined, () => {
this._latency = Date.now() - pingStart;
this._pingTimeoutId = setTimeout(() => {
this._sendPing();
}, getRandomPingDelay());
});
}

// All event handlers for the inbound socket should be bound in this method.
private _bindHandlersToInboundSocket(
inboundSocket: SCServerSocketUpdated,
Expand Down
12 changes: 6 additions & 6 deletions elements/lisk-p2p/src/peer/outbound.ts
Expand Up @@ -33,12 +33,12 @@ import {
P2PResponsePacket,
} from '../p2p_types';
import { sanitizeNodeInfoToLegacyFormat } from '../utils';
import { Peer, PeerConfig, SCClientSocket } from './base';

const socketErrorStatusCodes = {
...(socketClusterClient.SCClientSocket as any).errorStatuses,
1000: 'Intentionally disconnected',
};
import {
Peer,
PeerConfig,
SCClientSocket,
socketErrorStatusCodes,
} from './base';

interface ClientOptionsUpdated {
readonly hostname: string;
Expand Down
3 changes: 2 additions & 1 deletion elements/lisk-p2p/test/integration/_global_hooks.ts
Expand Up @@ -12,6 +12,7 @@
* Removal or modification of this copyright notice is prohibited.
*
*/

beforeEach(() => {
return sandbox.restore();
sandbox.restore();
});
120 changes: 61 additions & 59 deletions elements/lisk-p2p/test/integration/peer_banning.ts
Expand Up @@ -13,7 +13,7 @@
*
*/
import { expect } from 'chai';
import { P2P } from '../../src/index';
import { P2P, P2PDiscoveredPeerInfo } from '../../src/index';
import { wait } from '../utils/helpers';
import { createNetwork, destroyNetwork } from 'utils/network_setup';
import {
Expand Down Expand Up @@ -51,76 +51,78 @@ describe('Peer banning mechanism', () => {
});

p2pNodeList = await createNetwork({ customConfig });

const firstNode = p2pNodeList[0];

firstNode.on(EVENT_BAN_PEER, peerId => {
collectedEvents.set('EVENT_BAN_PEER', peerId);
});
firstNode.on(EVENT_UNBAN_PEER, peerId => {
collectedEvents.set('EVENT_UNBAN_PEER', peerId);
});
firstNode.on(EVENT_CLOSE_INBOUND, packet => {
collectedEvents.set('EVENT_CLOSE_INBOUND', packet);
});
});

afterEach(async () => {
await destroyNetwork(p2pNodeList);
});

it('should not ban a bad peer for a 10 point penalty', async () => {
const firstP2PNode = p2pNodeList[0];
const badPeer = firstP2PNode.getConnectedPeers()[1];
const peerPenalty = {
peerId: `${badPeer.ipAddress}:${badPeer.wsPort}`,
penalty: 10,
};
firstP2PNode.applyPenalty(peerPenalty);
const updatedConnectedPeers = firstP2PNode.getConnectedPeers();
expect(updatedConnectedPeers.map(peer => peer.wsPort)).to.include(
badPeer.wsPort,
);
describe('when penalty is under 100', () => {
it('should not ban any peer', async () => {
const firstP2PNode = p2pNodeList[0];
const badPeer = firstP2PNode.getConnectedPeers()[1];
const peerPenalty = {
peerId: `${badPeer.ipAddress}:${badPeer.wsPort}`,
penalty: 10,
};
firstP2PNode.applyPenalty(peerPenalty);
const updatedConnectedPeers = firstP2PNode.getConnectedPeers();
expect(updatedConnectedPeers.map(peer => peer.wsPort)).to.include(
badPeer.wsPort,
);
});
});

it('should ban a bad peer for a 100 point penalty', async () => {
const firstP2PNode = p2pNodeList[0];
const badPeer = firstP2PNode.getConnectedPeers()[2];
const peerPenalty = {
peerId: `${badPeer.ipAddress}:${badPeer.wsPort}`,
penalty: 100,
};
firstP2PNode.applyPenalty(peerPenalty);
const updatedConnectedPeers = firstP2PNode.getConnectedPeers();
describe('when penalty is 100 or more', () => {
let badPeer: P2PDiscoveredPeerInfo;

expect(updatedConnectedPeers.map(peer => peer.wsPort)).to.not.include(
badPeer.wsPort,
);
});
beforeEach(async () => {
const firstNode = p2pNodeList[0];
firstNode.on(EVENT_BAN_PEER, peerId => {
collectedEvents.set('EVENT_BAN_PEER', peerId);
});
firstNode.on(EVENT_UNBAN_PEER, peerId => {
collectedEvents.set('EVENT_UNBAN_PEER', peerId);
});
firstNode.on(EVENT_CLOSE_INBOUND, packet => {
collectedEvents.set('EVENT_CLOSE_INBOUND', packet);
});
badPeer = firstNode.getConnectedPeers()[2];
const peerPenalty = {
peerId: `${badPeer.ipAddress}:${badPeer.wsPort}`,
penalty: 100,
};
firstNode.applyPenalty(peerPenalty);
});

it(`should fire ${EVENT_BAN_PEER} event`, async () => {
expect(collectedEvents.get('EVENT_BAN_PEER')).to.exist;
});
it('should ban the peer', async () => {
const updatedConnectedPeers = p2pNodeList[0].getConnectedPeers();
expect(updatedConnectedPeers.map(peer => peer.wsPort)).to.not.include(
badPeer.wsPort,
);
});

it('should emit peerId of banned peer', async () => {
expect(collectedEvents.get('EVENT_BAN_PEER')).to.eql('127.0.0.1:5002');
});
it(`should fire ${EVENT_BAN_PEER} event`, async () => {
expect(collectedEvents.get('EVENT_BAN_PEER')).to.exist;
});

it(`should fire ${EVENT_BAN_PEER} event with peerId`, async () => {
expect(collectedEvents.get('EVENT_BAN_PEER')).to.eql('127.0.0.1:5002');
});

it(`should fire ${EVENT_CLOSE_INBOUND} event`, async () => {
expect(collectedEvents.get('EVENT_CLOSE_INBOUND')).to.exist;
});

it('should unban a peer after the ban period', async () => {
const firstP2PNode = p2pNodeList[0];
const badPeer = firstP2PNode.getConnectedPeers()[2];
const peerPenalty = {
peerId: `${badPeer.ipAddress}:${badPeer.wsPort}`,
penalty: 100,
};
firstP2PNode.applyPenalty(peerPenalty);
// Wait for ban time to expire and peer to be re-discovered
await wait(1000);
const updatedConnectedPeers = firstP2PNode.getConnectedPeers();
it('should unban a peer after the ban period', async () => {
// Wait for ban time to expire and peer to be re-discovered
await wait(200);
const updatedConnectedPeers = p2pNodeList[0].getConnectedPeers();

expect(updatedConnectedPeers.map(peer => peer.wsPort)).to.include(
badPeer.wsPort,
);
expect(collectedEvents.get('EVENT_UNBAN_PEER')).to.exist;
expect(updatedConnectedPeers.map(peer => peer.wsPort)).to.include(
badPeer.wsPort,
);
expect(collectedEvents.get('EVENT_UNBAN_PEER')).to.exist;
});
});
});
2 changes: 1 addition & 1 deletion elements/lisk-p2p/test/unit/_global_hooks.ts
Expand Up @@ -14,5 +14,5 @@
*/

afterEach(() => {
return sandbox.restore();
sandbox.restore();
});
14 changes: 14 additions & 0 deletions elements/lisk-p2p/test/unit/p2p_request.ts
Expand Up @@ -14,6 +14,7 @@
*/
import { expect } from 'chai';
import { P2PRequest, RequestOptions } from '../../src/p2p_request';
import { RPCResponseAlreadySentError } from '../../src/errors';

describe('p2p_request', () => {
let requestOptions: RequestOptions;
Expand Down Expand Up @@ -111,6 +112,19 @@ describe('p2p_request', () => {
expect(request)
.to.have.property('wasResponseSent')
.which.equals(true));

it('should throw error when sending another request', () => {
try {
request.end('hello');
} catch (e) {
expect(e).to.be.an.instanceOf(RPCResponseAlreadySentError);
expect(e.message).to.be.eql(
`A response has already been sent for the request procedure <<${
requestOptions.procedure
}>>`,
);
}
});
});

describe('#error', () => {
Expand Down

0 comments on commit e60bcc5

Please sign in to comment.