Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #4017 from LiskHQ/3948-minimal_peers_list_data_on_…
Browse files Browse the repository at this point in the history
…discovery

Add get minimal peers list data for discovery process - Closes #3948
  • Loading branch information
jondubois committed Jul 31, 2019
2 parents 72b4d33 + 5e00024 commit 1e5467c
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 461 deletions.
33 changes: 13 additions & 20 deletions elements/lisk-p2p/src/p2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ interface SCServerUpdated extends SCServer {
import {
constructPeerId,
constructPeerIdFromPeerInfo,
REMOTE_RPC_GET_ALL_PEERS_LIST,
REMOTE_RPC_GET_PEERS_LIST,
} from './peer';

import {
Expand All @@ -46,6 +46,7 @@ import {
import { PeerInboundHandshakeError } from './errors';

import {
P2PBasicPeerInfoList,
P2PCheckPeerCompatibility,
P2PClosePacket,
P2PConfig,
Expand All @@ -58,8 +59,6 @@ import {
P2PRequestPacket,
P2PResponsePacket,
PeerLists,
ProtocolPeerInfo,
ProtocolPeerInfoList,
} from './p2p_types';

import { P2PRequest } from './p2p_request';
Expand Down Expand Up @@ -215,7 +214,7 @@ export class P2P extends EventEmitter {

// This needs to be an arrow function so that it can be used as a listener.
this._handlePeerPoolRPC = (request: P2PRequest) => {
if (request.procedure === REMOTE_RPC_GET_ALL_PEERS_LIST) {
if (request.procedure === REMOTE_RPC_GET_PEERS_LIST) {
this._handleGetPeersRequest(request);
}
// Re-emit the request for external use.
Expand Down Expand Up @@ -759,26 +758,20 @@ export class P2P extends EventEmitter {
random,
Math.min(minimumPeerDiscoveryThreshold, knownPeers.length),
);
/* tslint:enable no-magic-numbers*/

// TODO: Remove fields that are specific to the current Lisk protocol.
const peers = this._pickRandomPeers(randomPeerCount).map(
(peerInfo: P2PPeerInfo): ProtocolPeerInfo => {
const { ipAddress, ...peerInfoWithoutIp } = peerInfo;

// The options property is not read by the current legacy protocol but it should be added anyway for future compatibility.
return {
...peerInfoWithoutIp,
ip: ipAddress,
};
},
const basicPeers = this._pickRandomPeers(randomPeerCount).map(
(peerInfo: P2PPeerInfo): P2PPeerInfo =>
// Discovery process only require minmal peers data
({
ipAddress: peerInfo.ipAddress,
wsPort: peerInfo.wsPort,
}),
);
const protocolPeerInfoList: ProtocolPeerInfoList = {
const peerInfoList: P2PBasicPeerInfoList = {
success: true,
peers,
peers: basicPeers,
};

request.end(protocolPeerInfoList);
request.end(peerInfoList);
}

private _isTrustedPeer(peerId: string): boolean {
Expand Down
5 changes: 5 additions & 0 deletions elements/lisk-p2p/src/p2p_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ export interface ProtocolPeerInfoList {
readonly success: boolean;
}

export interface P2PBasicPeerInfoList {
readonly peers: ReadonlyArray<P2PPeerInfo>;
readonly success: boolean;
}

// TODO later: Switch to LIP protocol format.
export interface ProtocolRPCRequestPacket {
readonly data: unknown;
Expand Down
8 changes: 4 additions & 4 deletions elements/lisk-p2p/src/peer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import { P2PRequest } from '../p2p_request';
import * as socketClusterClient from 'socketcluster-client';
import { SCServerSocket } from 'socketcluster-server';
import {
validateBasicPeersInfoList,
validatePeerInfo,
validatePeerInfoList,
validateProtocolMessage,
validateRPCRequest,
} from '../validation';
Expand Down Expand Up @@ -91,7 +91,7 @@ export const REMOTE_EVENT_MESSAGE = 'remote-message';

export const REMOTE_RPC_UPDATE_PEER_INFO = 'updateMyself';
export const REMOTE_RPC_GET_NODE_INFO = 'status';
export const REMOTE_RPC_GET_ALL_PEERS_LIST = 'list';
export const REMOTE_RPC_GET_PEERS_LIST = 'getPeers';

export const DEFAULT_CONNECT_TIMEOUT = 2000;
export const DEFAULT_ACK_TIMEOUT = 2000;
Expand Down Expand Up @@ -451,10 +451,10 @@ export class Peer extends EventEmitter {
public async fetchPeers(): Promise<ReadonlyArray<P2PPeerInfo>> {
try {
const response: P2PResponsePacket = await this.request({
procedure: REMOTE_RPC_GET_ALL_PEERS_LIST,
procedure: REMOTE_RPC_GET_PEERS_LIST,
});

return validatePeerInfoList(response.data);
return validateBasicPeersInfoList(response.data);
} catch (error) {
this.emit(EVENT_FAILED_TO_FETCH_PEERS, error);

Expand Down
2 changes: 1 addition & 1 deletion elements/lisk-p2p/src/peer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export {
EVENT_UPDATED_PEER_INFO,
Peer,
PeerConfig,
REMOTE_RPC_GET_ALL_PEERS_LIST,
REMOTE_RPC_GET_PEERS_LIST,
} from './base';

export * from './inbound';
Expand Down
142 changes: 0 additions & 142 deletions elements/lisk-p2p/src/peer/outbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@
*
*/

import {
FetchPeerStatusError,
PeerOutboundConnectionError,
RPCResponseError,
} from '../errors';

import {
ClientOptionsUpdated,
convertNodeInfoToLegacyFormat,
Expand All @@ -29,23 +23,20 @@ import {
PeerConfig,
REMOTE_EVENT_MESSAGE,
REMOTE_EVENT_RPC_REQUEST,
REMOTE_RPC_GET_NODE_INFO,
} from './base';

import { EVENT_PING } from './inbound';

import {
P2PDiscoveredPeerInfo,
P2PMessagePacket,
P2PNodeInfo,
P2PPeerInfo,
P2PRequestPacket,
P2PResponsePacket,
} from '../p2p_types';

import * as querystring from 'querystring';
import * as socketClusterClient from 'socketcluster-client';
import { validatePeerInfo } from '../validation';

type SCClientSocket = socketClusterClient.SCClientSocket;

Expand Down Expand Up @@ -215,136 +206,3 @@ export class OutboundPeer extends Peer {
outboundSocket.off(EVENT_PING);
}
}

export const connectAndRequest = async (
basicPeerInfo: P2PPeerInfo,
procedure: string,
nodeInfo?: P2PNodeInfo,
peerConfig?: PeerConfig,
): Promise<P2PResponsePacket> =>
new Promise<P2PResponsePacket>(
(resolve, reject): void => {
const legacyNodeInfo = nodeInfo
? convertNodeInfoToLegacyFormat(nodeInfo)
: undefined;
// Add a new field discovery to tell the receiving side that the connection will be short lived
const requestPacket = {
procedure,
};
// Ideally, we should JSON-serialize the whole NodeInfo object but this cannot be done for compatibility reasons, so instead we put it inside an options property.
const clientOptions: ClientOptionsUpdated = {
hostname: basicPeerInfo.ipAddress,
port: basicPeerInfo.wsPort,
query: querystring.stringify({
...legacyNodeInfo,
options: JSON.stringify(legacyNodeInfo),
}),
connectTimeout: peerConfig
? peerConfig.connectTimeout
? peerConfig.connectTimeout
: DEFAULT_CONNECT_TIMEOUT
: DEFAULT_CONNECT_TIMEOUT,
ackTimeout: peerConfig
? peerConfig.connectTimeout
? peerConfig.connectTimeout
: DEFAULT_CONNECT_TIMEOUT
: DEFAULT_ACK_TIMEOUT,
multiplex: false,
autoConnect: false,
autoReconnect: false,
maxPayload: peerConfig ? peerConfig.wsMaxPayload : undefined,
};

const outboundSocket = socketClusterClient.create(clientOptions);
// Bind an error handler immediately after creating the socket; otherwise errors may crash the process
// tslint:disable-next-line no-empty
outboundSocket.on('error', () => {});
outboundSocket.on(
EVENT_PING,
(_: undefined, res: (_: undefined, data: string) => void) => {
res(undefined, RESPONSE_PONG);
},
);

// tslint:disable-next-line no-let
let disconnectStatusCode: number;
// tslint:disable-next-line no-let
let disconnectReason: string;
const closeHandler = (statusCode: number, reason: string) => {
disconnectStatusCode = statusCode;
disconnectReason = reason;
};
outboundSocket.once('close', closeHandler);

// Attaching handlers for various events that could be used future for logging or any other application
outboundSocket.emit(
REMOTE_EVENT_RPC_REQUEST,
{
type: '/RPCRequest',
procedure: requestPacket.procedure,
},
(err: Error | undefined, responseData: unknown) => {
outboundSocket.off('close', closeHandler);
outboundSocket.disconnect();
if (err) {
const isFailedConnection =
disconnectReason &&
(err.name === 'TimeoutError' ||
err.name === 'BadConnectionError');
const connectionError = new PeerOutboundConnectionError(
isFailedConnection ? disconnectReason : err.message,
disconnectStatusCode,
);
reject(connectionError);

return;
}
if (responseData) {
const responsePacket = responseData as P2PResponsePacket;
resolve(responsePacket);

return;
}

reject(
new RPCResponseError(
`Failed to handle response for procedure ${
requestPacket.procedure
}`,
`${basicPeerInfo.ipAddress}:${basicPeerInfo.wsPort}`,
),
);
},
);
},
);

export const connectAndFetchPeerInfo = async (
basicPeerInfo: P2PPeerInfo,
nodeInfo?: P2PNodeInfo,
peerConfig?: PeerConfig,
): Promise<P2PPeerInfo> => {
try {
const responsePacket = await connectAndRequest(
basicPeerInfo,
REMOTE_RPC_GET_NODE_INFO,
nodeInfo,
peerConfig,
);

const protocolPeerInfo = responsePacket.data;
const rawPeerInfo = {
...protocolPeerInfo,
ip: basicPeerInfo.ipAddress,
wsPort: basicPeerInfo.wsPort,
};

return validatePeerInfo(rawPeerInfo);
} catch (error) {
throw new FetchPeerStatusError(
`Error occurred while fetching information from ${
basicPeerInfo.ipAddress
}:${basicPeerInfo.wsPort}`,
);
}
};
72 changes: 0 additions & 72 deletions elements/lisk-p2p/src/peer_discovery.ts

This file was deleted.

Loading

0 comments on commit 1e5467c

Please sign in to comment.