Skip to content

Commit

Permalink
Merge pull request #83 from ChainSafe/cayman/configuration
Browse files Browse the repository at this point in the history
Add IDiscv5Config configurability
  • Loading branch information
wemeetagain committed Aug 7, 2020
2 parents 7dac2f6 + c994a61 commit 64fe01f
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 58 deletions.
21 changes: 21 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { ISessionConfig } from "../session";
import { ILookupConfig } from "../kademlia";

export type IDiscv5Config = ISessionConfig &
ILookupConfig & {
/**
* The time between pings to ensure connectivity amongst connected nodes
* defined in milliseconds
*/
pingInterval: number;
};

export const defaultConfig: IDiscv5Config = {
requestTimeout: 1 * 1000,
requestRetries: 1,
sessionTimeout: 86400 * 1000, // 1 day
sessionEstablishTimeout: 15 * 1000,
lookupParallelism: 3,
lookupNumResults: 16,
pingInterval: 300 * 1000,
};
16 changes: 8 additions & 8 deletions src/kademlia/lookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class Lookup extends (EventEmitter as { new (): LookupEventEmitter }) {
this.numPeersWaiting = 0;
this.closestPeers = new Map(
closestPeers
.slice(0, this.config.numResults)
.slice(0, this.config.lookupNumResults)
.map((n) => [distance(nodeId, n), createLookupPeer(n, LookupPeerState.NotContacted)] as [bigint, ILookupPeer])
);
}
Expand Down Expand Up @@ -91,9 +91,9 @@ export class Lookup extends (EventEmitter as { new (): LookupEventEmitter }) {
atCapacity(): boolean {
switch (this.state) {
case LookupState.Stalled:
return this.numPeersWaiting >= this.config.numResults;
return this.numPeersWaiting >= this.config.lookupNumResults;
case LookupState.Iterating:
return this.numPeersWaiting >= this.config.parallelism;
return this.numPeersWaiting >= this.config.lookupParallelism;
case LookupState.Finished:
return true;
}
Expand All @@ -115,7 +115,7 @@ export class Lookup extends (EventEmitter as { new (): LookupEventEmitter }) {
return this.closestPeersByDistance()
.filter((peer) => peer.state === LookupPeerState.Succeeded)
.map((peer) => peer.nodeId)
.slice(0, this.config.numResults);
.slice(0, this.config.lookupNumResults);
}

/**
Expand All @@ -140,7 +140,7 @@ export class Lookup extends (EventEmitter as { new (): LookupEventEmitter }) {
if (peer.state === LookupPeerState.Waiting) {
this.numPeersWaiting -= 1;
peer.peersReturned += closerPeers.length;
if (peer.peersReturned >= this.config.numResults) {
if (peer.peersReturned >= this.config.lookupNumResults) {
peer.state = LookupPeerState.Succeeded;
} else if (this.maxIterationsPerPeer <= peer.iteration) {
if (peer.peersReturned > 0) {
Expand All @@ -166,14 +166,14 @@ export class Lookup extends (EventEmitter as { new (): LookupEventEmitter }) {
// The lookup makes progress if the new peer is either closer to the target than any peer seen so far
// or the lookup did not yet accumulate enough closest peers
const closest = Array.from(this.closestPeers.keys()).sort((a, b) => (b > a ? -1 : 1))[0];
progress = progress || closest === cDist || numClosest < this.config.numResults;
progress = progress || closest === cDist || numClosest < this.config.lookupNumResults;
});

// update the lookup state
if (this.state === LookupState.Iterating) {
// If there's progress, reset the noProgress counter
this.noProgress = progress ? 0 : this.noProgress + 1;
if (this.noProgress >= this.config.parallelism * this.maxIterationsPerPeer) {
if (this.noProgress >= this.config.lookupParallelism * this.maxIterationsPerPeer) {
this.state = LookupState.Stalled;
}
} else if (this.state === LookupState.Stalled) {
Expand Down Expand Up @@ -248,7 +248,7 @@ export class Lookup extends (EventEmitter as { new (): LookupEventEmitter }) {
resultCounter += 1;
// If `numResults` successful results have been delivered for the closest peers,
// the lookup is done
if (resultCounter >= this.config.numResults) {
if (resultCounter >= this.config.lookupNumResults) {
this.state = LookupState.Finished;
this.emit("finished", this.closestNodesByDistance());
return;
Expand Down
4 changes: 2 additions & 2 deletions src/kademlia/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ export interface ILookupConfig {
*
* Default is 3
*/
parallelism: number;
lookupParallelism: number;
/**
* Number of results to produce.
*
* The number of closest peers that a query must obtain successful results for before it terminates.
* Defaults to the maximum number of entries in a single kbucket.
*/
numResults: number;
lookupNumResults: number;
}

export interface ILookupEvents {
Expand Down
10 changes: 8 additions & 2 deletions src/libp2p/discv5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import { randomBytes } from "libp2p-crypto";

import { Discv5, ENRInput } from "../service";
import { createNodeId, ENR } from "../enr";
import { IDiscv5Config } from "../config";

export interface IDiscv5DiscoveryInputOptions {
export interface IDiscv5DiscoveryInputOptions extends Partial<IDiscv5Config> {
/**
* Local ENR associated with the local libp2p peer id
*/
Expand Down Expand Up @@ -38,7 +39,12 @@ export class Discv5Discovery extends EventEmitter {

constructor(options: IDiscv5DiscoveryOptions) {
super();
this.discv5 = Discv5.create(options.enr, options.peerId, Multiaddr(options.bindAddr));
this.discv5 = Discv5.create({
enr: options.enr,
peerId: options.peerId,
multiaddr: Multiaddr(options.bindAddr),
config: options,
});
this.started = false;
options.bootEnrs.forEach((bootEnr) => this.discv5.addEnr(bootEnr));
}
Expand Down
45 changes: 23 additions & 22 deletions src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import PeerId = require("peer-id");

import { UDPTransportService } from "../transport";
import { createMagic, AuthTag, MAX_PACKET_SIZE } from "../packet";
import { REQUEST_TIMEOUT, SessionService } from "../session";
import { SessionService } from "../session";
import { ENR, NodeId, MAX_RECORD_SIZE } from "../enr";
import { IKeypair, createKeypairFromPeerId, createPeerIdFromKeypair } from "../keypair";
import {
EntryStatus,
KademliaRoutingTable,
ILookupConfig,
log2Distance,
ILookupPeer,
findNodeLog2Distance,
Expand All @@ -36,6 +35,7 @@ import {
import { Discv5EventEmitter, ENRInput, IActiveRequest, INodesResponse } from "./types";
import { AddrVotes } from "./addrVotes";
import { TimeoutMap } from "../util";
import { IDiscv5Config, defaultConfig } from "../config";

const log = debug("discv5:service");

Expand All @@ -54,6 +54,13 @@ const log = debug("discv5:service");
* registration/advertisement and performs lookups
*/

export interface IDiscv5CreateOptions {
enr: ENRInput;
peerId: PeerId;
multiaddr: Multiaddr;
config?: Partial<IDiscv5Config>;
}

/**
* User-facing service one can use to set up, start and use Discv5.
*
Expand All @@ -65,6 +72,11 @@ const log = debug("discv5:service");
* Additionally, the service offers events when peers are added to the peer table or discovered via lookup.
*/
export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
/**
* Configuration
*/
private config: IDiscv5Config;

private started = false;
/**
* Session service that establishes sessions with peers
Expand Down Expand Up @@ -95,15 +107,6 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* the interval handler pings the associated node
*/
private connectedPeers: Map<NodeId, NodeJS.Timer>;
/**
* The time between pings to ensure connectivity amongst connected nodes
*/
private pingDelay = 300000; // 300 seconds

/**
* The configuration for iterative lookups
*/
private lookupConfig: ILookupConfig;

/**
* Id for the next lookup that we start
Expand All @@ -118,20 +121,17 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* Default constructor.
* @param sessionService the service managing sessions underneath.
*/
constructor(sessionService: SessionService) {
constructor(config: IDiscv5Config, sessionService: SessionService) {
super();
this.config = config;
this.sessionService = sessionService;
this.kbuckets = new KademliaRoutingTable(this.sessionService.enr.nodeId, 16);
this.activeLookups = new Map();
this.activeRequests = new TimeoutMap(REQUEST_TIMEOUT, (requestId, activeRequest) =>
this.activeRequests = new TimeoutMap(this.config.requestTimeout, (requestId, activeRequest) =>
this.onActiveRequestFailed(activeRequest)
);
this.activeNodesResponses = new Map();
this.connectedPeers = new Map();
this.lookupConfig = {
parallelism: 3,
numResults: 16,
};
this.nextLookupId = 1;
this.addrVotes = new AddrVotes();
}
Expand All @@ -143,12 +143,13 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* @param peerId the PeerId with the keypair that identifies the enr
* @param multiaddr The multiaddr which contains the the network interface and port to which the UDP server binds
*/
public static create(enr: ENRInput, peerId: PeerId, multiaddr: Multiaddr): Discv5 {
public static create({ enr, peerId, multiaddr, config = {} }: IDiscv5CreateOptions): Discv5 {
const fullConfig = { ...defaultConfig, ...config };
const decodedEnr = typeof enr === "string" ? ENR.decodeTxt(enr) : enr;
const magic = createMagic(decodedEnr.nodeId);
const udpTransport = new UDPTransportService(multiaddr, magic);
const sessionService = new SessionService(decodedEnr, createKeypairFromPeerId(peerId), udpTransport);
return new Discv5(sessionService);
const sessionService = new SessionService(fullConfig, decodedEnr, createKeypairFromPeerId(peerId), udpTransport);
return new Discv5(fullConfig, sessionService);
}

/**
Expand Down Expand Up @@ -268,7 +269,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}

const knownClosestPeers = this.kbuckets.nearest(target, 16).map((enr) => enr.nodeId);
const lookup = new Lookup(this.lookupConfig, target, 3, knownClosestPeers);
const lookup = new Lookup(this.config, target, 3, knownClosestPeers);
this.activeLookups.set(lookupId, lookup);
return await new Promise((resolve) => {
lookup.on("peer", (peer: ILookupPeer) => this.sendLookup(lookupId, target, peer));
Expand Down Expand Up @@ -468,7 +469,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
this.sendPing(nodeId);
this.connectedPeers.set(
nodeId,
setInterval(() => this.sendPing(nodeId), this.pingDelay)
setInterval(() => this.sendPing(nodeId), this.config.pingInterval)
);
};

Expand Down
10 changes: 0 additions & 10 deletions src/session/constants.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/session/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from "./types";
export * from "./crypto";
export * from "./session";
export * from "./constants";
export * from "./service";
23 changes: 14 additions & 9 deletions src/session/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import { Session } from "./session";
import { IKeypair } from "../keypair";
import { TimeoutMap } from "../util";
import { Message, RequestMessage, encode, decode, ResponseMessage, RequestId, MessageType } from "../message";
import { IPendingRequest, SessionState, ISessionEvents } from "./types";
import { SESSION_TIMEOUT, REQUEST_TIMEOUT, REQUEST_RETRIES } from "./constants";
import { IPendingRequest, SessionState, ISessionEvents, ISessionConfig } from "./types";

const log = debug("discv5:sessionService");

Expand Down Expand Up @@ -54,6 +53,11 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
* The underlying packet transport
*/
public transport: ITransportService;

/**
* Configuration
*/
private config: ISessionConfig;
/**
* Pending raw requests
* A collection of request objects we are awaiting a response from the remote.
Expand All @@ -72,18 +76,19 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
*/
private sessions: TimeoutMap<NodeId, Session>;

constructor(enr: ENR, keypair: IKeypair, transport: ITransportService) {
constructor(config: ISessionConfig, enr: ENR, keypair: IKeypair, transport: ITransportService) {
super();
// ensure the keypair matches the one that signed the ENR
if (!keypair.publicKey.equals(enr.publicKey)) {
throw new Error("Provided keypair does not match the provided ENR keypair");
}
this.config = config;
this.enr = enr;
this.keypair = keypair;
this.transport = transport;
this.pendingRequests = new Map();
this.pendingMessages = new Map();
this.sessions = new TimeoutMap(SESSION_TIMEOUT, this.onSessionTimeout);
this.sessions = new TimeoutMap(this.config.sessionTimeout, this.onSessionTimeout);
}

/**
Expand Down Expand Up @@ -372,7 +377,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
}

// session has been established, update the timeout
this.sessions.setTimeout(srcId, SESSION_TIMEOUT);
this.sessions.setTimeout(srcId, this.config.sessionTimeout);

// decrypt the message
this.onMessage(src, {
Expand Down Expand Up @@ -490,7 +495,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
this.transport.send(dst, packet);
let requests = this.pendingRequests.get(dstStr);
if (!requests) {
requests = new TimeoutMap(REQUEST_TIMEOUT, this.onPendingRequestTimeout);
requests = new TimeoutMap(this.config.requestTimeout, this.onPendingRequestTimeout);
this.pendingRequests.set(dstStr, requests);
}
requests.set(message ? message.id : 0n, request);
Expand Down Expand Up @@ -521,7 +526,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
*/
private onPendingRequestTimeout = (requestId: RequestId, request: IPendingRequest): void => {
const dstId = request.dstId;
if (request.retries >= REQUEST_RETRIES) {
if (request.retries >= this.config.requestRetries) {
if (request.packet.type === PacketType.Random || request.packet.type === PacketType.WhoAreYou) {
// no response from peer, flush all pending messages and drop session
log("Session couldn't be established with node: %s at %s", dstId, request.dst);
Expand All @@ -544,7 +549,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
const dstStr = request.dst.toString();
let requests = this.pendingRequests.get(dstStr);
if (!requests) {
requests = new TimeoutMap(REQUEST_TIMEOUT, this.onPendingRequestTimeout);
requests = new TimeoutMap(this.config.requestTimeout, this.onPendingRequestTimeout);
this.pendingRequests.set(dstStr, requests);
}
requests.set(requestId, request);
Expand All @@ -558,7 +563,7 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
private onSessionTimeout = (nodeId: NodeId, session: Session): void => {
for (const pendingRequests of this.pendingRequests.values()) {
if (Array.from(pendingRequests.values()).find((request) => request.dstId === nodeId)) {
this.sessions.setWithTimeout(nodeId, session, REQUEST_TIMEOUT);
this.sessions.setWithTimeout(nodeId, session, this.config.requestTimeout);
return;
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/session/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ import { NodeId, ENR } from "../enr";
import { Packet, AuthTag } from "../packet";
import { Message, RequestMessage } from "../message";

export interface ISessionConfig {
/**
* The timeout for each UDP request
* defined in milliseconds
*/
requestTimeout: number;
/**
* The number of retries for each UDP request
*/
requestRetries: number;
/**
* The session timeout for each node
* defined in milliseconds
*/
sessionTimeout: number;
/**
* The timeout for session establishment
* defined in milliseconds
*/
sessionEstablishTimeout: number;
}

export enum SessionState {
/**
* A WHOAREYOU packet has been sent, and the Session is awaiting an Authentication response.
Expand Down
Loading

0 comments on commit 64fe01f

Please sign in to comment.