Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Add network ready event - Closes #4090 #4100

Merged
merged 10 commits into from
Aug 17, 2019
17 changes: 17 additions & 0 deletions elements/lisk-p2p/src/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export {
export const EVENT_NEW_INBOUND_PEER = 'newInboundPeer';
export const EVENT_FAILED_TO_ADD_INBOUND_PEER = 'failedToAddInboundPeer';
export const EVENT_NEW_PEER = 'newPeer';
export const EVENT_NETWORK_READY = 'networkReady';

export const DEFAULT_NODE_HOST_IP = '0.0.0.0';
export const DEFAULT_DISCOVERY_INTERVAL = 30000;
Expand Down Expand Up @@ -156,6 +157,7 @@ export class P2P extends EventEmitter {
private readonly _sanitizedPeerLists: PeerLists;
private readonly _httpServer: http.Server;
private _isActive: boolean;
private _hasConnected: boolean;
private readonly _peerBook: PeerBook;
private readonly _bannedPeers: Set<string>;
private readonly _populatorInterval: number;
Expand Down Expand Up @@ -214,6 +216,7 @@ export class P2P extends EventEmitter {
);
this._config = config;
this._isActive = false;
this._hasConnected = false;
this._peerBook = new PeerBook({
secret: config.secret ? config.secret : DEFAULT_RANDOM_SECRET,
});
Expand Down Expand Up @@ -260,6 +263,9 @@ export class P2P extends EventEmitter {

// Re-emit the message to allow it to bubble up the class hierarchy.
this.emit(EVENT_CONNECT_OUTBOUND, peerInfo);
if (this._isNetworkReady()) {
this.emit(EVENT_NETWORK_READY);
}
};

this._handleOutboundPeerConnectAbort = (peerInfo: P2PPeerInfo) => {
Expand Down Expand Up @@ -777,6 +783,16 @@ export class P2P extends EventEmitter {
}
}

private _isNetworkReady(): boolean {
if (!this._hasConnected && this._peerPool.getConnectedPeers().length > 0) {
this._hasConnected = true;

return true;
}

return false;
}

private _pickRandomPeers(count: number): ReadonlyArray<P2PPeerInfo> {
const peerList: ReadonlyArray<P2PPeerInfo> = this._peerBook.getAllPeers(); // Peers whose values has been updated at least once.

Expand Down Expand Up @@ -865,6 +881,7 @@ export class P2P extends EventEmitter {
throw new Error('Cannot stop the node because it is not active');
}
this._isActive = false;
this._hasConnected = false;
this._stopPopulator();
this._peerPool.removeAllPeers();
await this._stopPeerServer();
Expand Down
4 changes: 1 addition & 3 deletions elements/lisk-p2p/src/peer_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,14 @@ export class PeerPool extends EventEmitter {
);
const selectedPeers = this._peerSelectForRequest({
peers: getUniquePeersbyIp(listOfPeerInfo),
nodeInfo: this._nodeInfo,
peerLimit: 1,
requestPacket: packet,
});

if (selectedPeers.length <= 0) {
throw new RequestFailError(
'Request failed due to no peers found in peer selection',
);
}

const selectedPeerId = constructPeerIdFromPeerInfo(selectedPeers[0]);

return this.requestFromPeer(packet, selectedPeerId);
Expand Down
58 changes: 4 additions & 54 deletions elements/lisk-p2p/src/peer_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ import {
} from './p2p_types';

/* tslint:disable: readonly-keyword*/
interface Histogram {
[key: number]: number;
}
interface HistogramValues {
height: number;
histogram: Histogram;
max: number;
}

export const getUniquePeersbyIp = (
peerList: ReadonlyArray<P2PDiscoveredPeerInfo>,
Expand All @@ -55,60 +47,18 @@ export const getUniquePeersbyIp = (
export const selectPeersForRequest = (
input: P2PPeerSelectionForRequestInput,
): ReadonlyArray<P2PDiscoveredPeerInfo> => {
const { peers, nodeInfo } = input;
const { peers } = input;
const peerLimit = input.peerLimit;
const nodeHeight = nodeInfo ? nodeInfo.height : 0;
const filteredPeers = peers.filter(
// Remove unreachable peers or heights below last block height
(peer: P2PDiscoveredPeerInfo) => peer.height >= nodeHeight,
);

if (filteredPeers.length === 0) {
if (peers.length === 0) {
return [];
}

// Order peers by descending height
const sortedPeers = filteredPeers.sort((a, b) => b.height - a.height);

const aggregation = 2;

const calculatedHistogramValues = sortedPeers.reduce(
(histogramValues: HistogramValues, peer: P2PDiscoveredPeerInfo) => {
const val = Math.floor(peer.height / aggregation) * aggregation;
histogramValues.histogram[val] =
(histogramValues.histogram[val] ? histogramValues.histogram[val] : 0) +
1;
if (histogramValues.histogram[val] > histogramValues.max) {
histogramValues.max = histogramValues.histogram[val];
histogramValues.height = val;
}

return histogramValues;
},
{ height: 0, histogram: {}, max: -1 },
);

// Perform histogram cut of peers too far from histogram maximum
const processedPeers = sortedPeers.filter(
peer =>
peer &&
Math.abs(calculatedHistogramValues.height - peer.height) <
aggregation + 1,
);

if (peerLimit === undefined) {
return processedPeers;
}

if (peerLimit === 1) {
const goodPeer: ReadonlyArray<P2PDiscoveredPeerInfo> = [
processedPeers[Math.floor(Math.random() * processedPeers.length)],
];

return goodPeer;
return shuffle(peers);
}

return shuffle(processedPeers).slice(0, peerLimit);
return shuffle(peers).slice(0, peerLimit);
};

export const selectPeersForSend = (
Expand Down
8 changes: 4 additions & 4 deletions elements/lisk-p2p/test/unit/peer_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ describe('peer selector', () => {
it('returned array should contain good peers according to algorithm', () => {
return expect(selectPeersForRequest({ peers: peerList, nodeInfo }))
.and.be.an('array')
.and.of.length(3);
.and.of.length(5);
});

it('return empty peer list for no peers as an argument', () => {
Expand Down Expand Up @@ -82,7 +82,7 @@ describe('peer selector', () => {
it('should return an array having all good peers', () => {
return expect(selectPeersForRequest({ peers: peerList, nodeInfo }))
.and.be.an('array')
.and.of.length(3);
.and.of.length(5);
});

it('should return an array of equal length equal to requested number of peers', () => {
Expand All @@ -102,7 +102,7 @@ describe('peer selector', () => {
peer => peer.height < nodeInfo.height,
);

it('should return an array with 0 good peers', () => {
it('should return an array with 1 good peer', () => {
return expect(
selectPeersForRequest({
peers: lowHeightPeers,
Expand All @@ -111,7 +111,7 @@ describe('peer selector', () => {
}),
)
.and.be.an('array')
.and.of.length(0);
.and.of.length(1);
});
});
});
Expand Down
5 changes: 4 additions & 1 deletion framework/src/modules/chain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,14 @@ module.exports = class Chain {
this._subscribeToEvents();

this.channel.subscribe('network:bootstrap', async () => {
this._startLoader();
this._calculateConsensus();
await this._startForging();
});

this.channel.subscribe('network:ready', async () => {
this._startLoader();
});

// Avoid receiving blocks/transactions from the network during snapshotting process
if (!this.options.loading.rebuildUpToRound) {
this.channel.subscribe('network:event', ({ data: { event, data } }) => {
Expand Down
2 changes: 1 addition & 1 deletion framework/src/modules/network/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ module.exports = class NetworkModule extends BaseModule {
}

get events() {
return ['bootstrap', 'event'];
return ['bootstrap', 'event', 'ready'];
}

get actions() {
Expand Down
5 changes: 5 additions & 0 deletions framework/src/modules/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
const { getRandomBytes } = require('@liskhq/lisk-cryptography');
const {
P2P,
EVENT_NETWORK_READY,
EVENT_NEW_INBOUND_PEER,
EVENT_CLOSE_INBOUND,
EVENT_CLOSE_OUTBOUND,
Expand Down Expand Up @@ -171,6 +172,10 @@ module.exports = class Network {
});

// ---- START: Bind event handlers ----
this.p2p.on(EVENT_NETWORK_READY, () => {
this.logger.debug('Node connected to the network');
this.channel.publish('network:ready');
});

this.p2p.on(EVENT_CLOSE_OUTBOUND, closePacket => {
this.logger.debug(
Expand Down