Skip to content

Commit

Permalink
Refactor discovery consumer (#3403)
Browse files Browse the repository at this point in the history
* Integrate discv5 into discovery consumer

* Start discovery

* Update test types

* Add metrics for find node queries

* Add cachedENRsSize metric

* Add dashboard

* Track dropped ENRs

* Track peersToConnect metric

* Improve metrics

* Set exemplar to false

* More charts

* Fix e2e tests

* Tune charts

* WIP test

* Uncomment retry

* Track count of sync peers

* Review libp2p options

* Disable libp2p latency monitor

* Improve PeerManager peer data

* Overshoot when connecting to peers

* Skip discv5 e2e test
  • Loading branch information
dapplion committed Nov 4, 2021
1 parent 70a534d commit 08dbb21
Show file tree
Hide file tree
Showing 30 changed files with 6,030 additions and 4,215 deletions.
9,056 changes: 5,282 additions & 3,774 deletions docker/grafana/provisioning/dashboards/lodestar.json

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions packages/cli/src/cmds/beacon/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ErrorAborted} from "@chainsafe/lodestar-utils";
import {LevelDbController} from "@chainsafe/lodestar-db";
import {BeaconNode, BeaconDb, createNodeJsLibp2p} from "@chainsafe/lodestar";
// eslint-disable-next-line no-restricted-imports
import {createDbMetrics, createDiscv5Metrics} from "@chainsafe/lodestar/lib/metrics";
import {createDbMetrics} from "@chainsafe/lodestar/lib/metrics";
import {createIBeaconConfig} from "@chainsafe/lodestar-config";
import {ACTIVE_PRESET, PresetName} from "@chainsafe/lodestar-params";
import {IGlobalArgs} from "../../options";
Expand Down Expand Up @@ -56,15 +56,11 @@ export async function beaconHandler(args: IBeaconArgs & IGlobalArgs): Promise<vo
if (ACTIVE_PRESET === PresetName.minimal) logger.info("ACTIVE_PRESET == minimal preset");

let dbMetrics: null | ReturnType<typeof createDbMetrics> = null;
let discv5Metrics: null | ReturnType<typeof createDiscv5Metrics> = null;
// additional metrics registries
const metricsRegistries = [];
if (options.metrics.enabled) {
dbMetrics = createDbMetrics();
discv5Metrics = createDiscv5Metrics();
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
options.network.discv5!.metrics = discv5Metrics.metrics;
metricsRegistries.push(dbMetrics.registry, discv5Metrics.registry);
metricsRegistries.push(dbMetrics.registry);
}
const db = new BeaconDb({
config,
Expand Down
9 changes: 9 additions & 0 deletions packages/config/src/genesisConfig/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ export function createICachedGenesis(chainForkConfig: IChainForkConfig, genesisV
return forkName;
},

/**
 * Resolves a fork digest (raw bytes or hex string) to its fork name.
 * Returns null when the digest has no entry in the lookup map.
 */
forkDigest2ForkNameOption(forkDigest: ForkDigest | ForkDigestHex): ForkName | null {
  // Normalize to the un-prefixed hex form used as the map key
  const digestKey = toHexStringNoPrefix(forkDigest);
  const matchedFork = forkNameByForkDigest.get(digestKey);
  return matchedFork ? matchedFork : null;
},

forkName2ForkDigest(forkName: ForkName): ForkDigest {
const forkDigest = forkDigestByForkName.get(forkName);
if (!forkDigest) {
Expand Down
1 change: 1 addition & 0 deletions packages/config/src/genesisConfig/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type ForkDigestHex = string;

/**
 * Conversions between fork names and fork digests.
 * Digests are accepted either as `ForkDigest` values or as `ForkDigestHex` strings.
 */
export interface IForkDigestContext {
/**
 * Returns the fork name for a fork digest.
 * NOTE(review): presumably throws when the digest is unknown — confirm against the implementation.
 */
forkDigest2ForkName(forkDigest: ForkDigest | ForkDigestHex): ForkName;
/** Returns the fork name for a fork digest, or null when the digest is not known. */
forkDigest2ForkNameOption(forkDigest: ForkDigest | ForkDigestHex): ForkName | null;
/** Returns the fork digest for a fork name. */
forkName2ForkDigest(forkName: ForkName): ForkDigest;
/** Returns the fork digest for a fork name as a `ForkDigestHex` string. */
forkName2ForkDigestHex(forkName: ForkName): ForkDigestHex;
}
Expand Down
33 changes: 1 addition & 32 deletions packages/lodestar/src/metrics/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
import {getCurrentSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {allForks} from "@chainsafe/lodestar-types";
import {collectDefaultMetrics, Counter, Gauge, Registry} from "prom-client";
import {collectDefaultMetrics, Counter, Registry} from "prom-client";
import gcStats from "prometheus-gc-stats";
import {DbMetricLabels, IDbMetrics} from "@chainsafe/lodestar-db";
import {createBeaconMetrics, IBeaconMetrics} from "./metrics/beacon";
import {createLodestarMetrics, ILodestarMetrics} from "./metrics/lodestar";
import {IMetricsOptions} from "./options";
import {RegistryMetricCreator} from "./utils/registryMetricCreator";
import {createValidatorMonitor, IValidatorMonitor} from "./validatorMonitor";
import {IDiscv5Metrics} from "@chainsafe/discv5";

export type IMetrics = IBeaconMetrics & ILodestarMetrics & IValidatorMonitor & {register: Registry};

Expand Down Expand Up @@ -70,33 +69,3 @@ export function createDbMetrics(): {metrics: IDbMetrics; registry: Registry} {
registry.registerMetric(metrics.dbWrites);
return {metrics, registry};
}

/**
 * Creates the discv5 gauges and a fresh `Registry` with each of them registered.
 *
 * @returns the gauges keyed by metric field name, plus the registry holding them
 */
export function createDiscv5Metrics(): {metrics: IDiscv5Metrics; registry: Registry} {
  // Shape each gauge is cast to: a Gauge that also exposes a collect() hook
  type CollectableGauge<T extends string> = Gauge<T> & {collect(): void};

  const kadTableSize = new Gauge({
    name: "lodestar_discv5_kad_table_size",
    help: "Total size of the discv5 kad table",
  }) as CollectableGauge<string>;

  const activeSessionCount = new Gauge({
    name: "lodestar_discv5_active_session_count",
    help: "Count of the discv5 active sessions",
  }) as CollectableGauge<string>;

  const connectedPeerCount = new Gauge({
    name: "lodestar_discv5_connected_peer_count",
    help: "Count of the discv5 connected peers",
  }) as CollectableGauge<string>;

  // Message counters are labeled by message type
  const sentMessageCount = new Gauge<"type">({
    name: "lodestar_discv5_sent_message_count",
    help: "Count of the discv5 messages sent by message type",
    labelNames: ["type"],
  }) as CollectableGauge<"type">;

  const rcvdMessageCount = new Gauge<"type">({
    name: "lodestar_discv5_rcvd_message_count",
    help: "Count of the discv5 messages received by message type",
    labelNames: ["type"],
  }) as CollectableGauge<"type">;

  const metrics = {kadTableSize, activeSessionCount, connectedPeerCount, sentMessageCount, rcvdMessageCount};

  // Register every gauge, in declaration order, on a dedicated registry
  const registry = new Registry();
  for (const gauge of Object.values(metrics)) {
    registry.registerMetric(gauge);
  }

  return {metrics, registry};
}
87 changes: 87 additions & 0 deletions packages/lodestar/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ export function createLodestarMetrics(
help: "number of peers, labeled by direction",
labelNames: ["direction"],
}),
peersSync: register.gauge({
name: "lodestar_peers_sync_count",
help: "Current count of peers useful for sync",
}),
peerConnectedEvent: register.gauge<"direction">({
name: "lodestar_peer_connected_total",
help: "Total number of peer:connected event, labeled by direction",
Expand All @@ -68,6 +72,89 @@ export function createLodestarMetrics(
name: "lodestar_peers_total_unique_connected",
help: "Total number of unique peers that have had a connection with",
}),
peersRequestedToConnect: register.gauge({
name: "lodestar_peers_requested_total_to_connect",
help: "Priorization results total peers count requested to connect",
}),
peersRequestedToDisconnect: register.gauge({
name: "lodestar_peers_requested_total_to_disconnect",
help: "Priorization results total peers count requested to disconnect",
}),
peersRequestedSubnetsToQuery: register.gauge<"type">({
name: "lodestar_peers_requested_total_subnets_to_query",
help: "Priorization results total subnets to query and discover peers in",
labelNames: ["type"],
}),
peersRequestedSubnetsPeerCount: register.gauge<"type">({
name: "lodestar_peers_requested_total_subnets_peers_count",
help: "Priorization results total peers in subnets to query and discover peers in",
labelNames: ["type"],
}),

discovery: {
peersToConnect: register.gauge({
name: "lodestar_discovery_peers_to_connect",
help: "Current peers to connect count from discoverPeers requests",
}),
cachedENRsSize: register.gauge({
name: "lodestar_discovery_cached_enrs_size",
help: "Current size of the cachedENRs Set",
}),
findNodeQueryRequests: register.gauge<"action">({
name: "lodestar_discovery_find_node_query_requests_total",
help: "Total count of find node queries started",
labelNames: ["action"],
}),
findNodeQueryTime: register.histogram({
name: "lodestar_discovery_find_node_query_time_seconds",
help: "Time to complete a find node query in seconds in seconds",
buckets: [5, 60],
}),
findNodeQueryEnrCount: register.gauge({
name: "lodestar_discovery_find_node_query_enrs_total",
help: "Total count of found ENRs in queries",
}),
discoveredStatus: register.gauge<"status">({
name: "lodestar_discovery_discovered_status_total_count",
help: "Total count of status results of PeerDiscovery.onDiscovered() function",
labelNames: ["status"],
}),
dialAttempts: register.gauge({
name: "lodestar_discovery_total_dial_attempts",
help: "Total dial attempts by peer discovery",
}),
dialTime: register.histogram<"status">({
name: "lodestar_discovery_dial_time_seconds",
help: "Time to dial peers in seconds",
labelNames: ["status"],
buckets: [0.1, 5, 60],
}),
},

discv5: {
kadTableSize: register.gauge({
name: "lodestar_discv5_kad_table_size",
help: "Total size of the discv5 kad table",
}),
activeSessionCount: register.gauge({
name: "lodestar_discv5_active_session_count",
help: "Count of the discv5 active sessions",
}),
connectedPeerCount: register.gauge({
name: "lodestar_discv5_connected_peer_count",
help: "Count of the discv5 connected peers",
}),
sentMessageCount: register.gauge<"type">({
name: "lodestar_discv5_sent_message_count",
help: "Count of the discv5 messages sent by message type",
labelNames: ["type"],
}),
rcvdMessageCount: register.gauge<"type">({
name: "lodestar_discv5_rcvd_message_count",
help: "Count of the discv5 messages received by message type",
labelNames: ["type"],
}),
},

gossipMesh: {
peersByType: register.gauge<"type" | "fork">({
Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/src/metrics/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

export interface IMetricsOptions {
enabled: boolean;
timeout: number;
timeout?: number;
serverPort?: number;
gatewayUrl?: string;
listenAddr?: string;
Expand Down
21 changes: 16 additions & 5 deletions packages/lodestar/src/network/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ import {IBeaconChain} from "../chain";
import {FAR_FUTURE_EPOCH} from "../constants";
import {getCurrentAndNextFork} from "./forks";

/**
 * String keys for ENR fields.
 * The eth2, attnets and syncnets members are passed as the key to `enr.set(...)` in MetadataController.
 */
export enum ENRKey {
// NOTE(review): tcp is not used in the visible code — presumably read from discovered ENRs; confirm
tcp = "tcp",
eth2 = "eth2",
attnets = "attnets",
syncnets = "syncnets",
}
/**
 * Subnet kinds. The string values are identical to the ENRKey entries of the same name.
 * NOTE(review): presumably used to label subnet queries and metrics — confirm against callers.
 */
export enum SubnetType {
attnets = "attnets",
syncnets = "syncnets",
}

export interface IMetadataOpts {
metadata?: altair.Metadata;
}
Expand Down Expand Up @@ -43,13 +54,13 @@ export class MetadataController {
// updateEth2Field() MUST be called with clock epoch
this.updateEth2Field(this.chain.clock.currentEpoch);

this.enr.set("attnets", ssz.phase0.AttestationSubnets.serialize(this._metadata.attnets));
this.enr.set(ENRKey.attnets, ssz.phase0.AttestationSubnets.serialize(this._metadata.attnets));
// Any fork after altair included

if (currentFork !== ForkName.phase0) {
// Only persist syncnets if altair fork is already activated. If currentFork is altair but head is phase0
// adding syncnets to the ENR is not a problem, we will just have a useless field for a few hours.
this.enr.set("syncnets", ssz.phase0.AttestationSubnets.serialize(this._metadata.syncnets));
this.enr.set(ENRKey.syncnets, ssz.phase0.AttestationSubnets.serialize(this._metadata.syncnets));
}
}
}
Expand All @@ -64,7 +75,7 @@ export class MetadataController {

set syncnets(syncnets: BitVector) {
if (this.enr) {
this.enr.set("syncnets", ssz.altair.SyncSubnets.serialize(syncnets));
this.enr.set(ENRKey.syncnets, ssz.altair.SyncSubnets.serialize(syncnets));
}
this._metadata.syncnets = syncnets;
}
Expand All @@ -75,7 +86,7 @@ export class MetadataController {

set attnets(attnets: BitVector) {
if (this.enr) {
this.enr.set("attnets", ssz.phase0.AttestationSubnets.serialize(attnets));
this.enr.set(ENRKey.attnets, ssz.phase0.AttestationSubnets.serialize(attnets));
}
this._metadata.seqNumber++;
this._metadata.attnets = attnets;
Expand All @@ -101,7 +112,7 @@ export class MetadataController {
if (this.enr) {
const enrForkId = ssz.phase0.ENRForkID.serialize(getENRForkID(this.config, epoch));
this.logger.verbose(`Updated ENR.eth2: ${toHexString(enrForkId)}`);
this.enr.set("eth2", enrForkId);
this.enr.set(ENRKey.eth2, enrForkId);
}
}
}
Expand Down
16 changes: 9 additions & 7 deletions packages/lodestar/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {AbortSignal} from "@chainsafe/abort-controller";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {ILogger} from "@chainsafe/lodestar-utils";
import {ATTESTATION_SUBNET_COUNT, ForkName, SYNC_COMMITTEE_SUBNET_COUNT} from "@chainsafe/lodestar-params";
import {Discv5, Discv5Discovery, ENR} from "@chainsafe/discv5";
import {Discv5, ENR} from "@chainsafe/discv5";
import {computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {Epoch} from "@chainsafe/lodestar-types";
import {IMetrics} from "../metrics";
Expand Down Expand Up @@ -132,9 +132,12 @@ export class Network implements INetwork {

async start(): Promise<void> {
await this.libp2p.start();
// Stop latency monitor since we handle disconnects here and don't want additional load on the event loop
this.libp2p.connectionManager._latencyMonitor.stop();

this.reqResp.start();
this.metadata.start(this.getEnr(), this.config.getForkName(this.clock.currentSlot));
this.peerManager.start();
await this.peerManager.start();
this.gossip.start();
this.attnetsService.start();
this.syncnetsService.start();
Expand All @@ -145,7 +148,7 @@ export class Network implements INetwork {
async stop(): Promise<void> {
// Must goodbye and disconnect before stopping libp2p
await this.peerManager.goodbyeAndDisconnectAllPeers();
this.peerManager.stop();
await this.peerManager.stop();
this.gossip.stop();
this.reqResp.stop();
this.attnetsService.stop();
Expand All @@ -154,8 +157,8 @@ export class Network implements INetwork {
await this.libp2p.stop();
}

get discv5(): Discv5 {
return (this.libp2p._discovery.get("discv5") as Discv5Discovery)?.discv5;
get discv5(): Discv5 | undefined {
return this.peerManager["discovery"]?.discv5;
}

get localMultiaddrs(): Multiaddr[] {
Expand All @@ -167,8 +170,7 @@ export class Network implements INetwork {
}

getEnr(): ENR | undefined {
const discv5Discovery = this.libp2p._discovery.get("discv5") as Discv5Discovery;
return discv5Discovery?.discv5?.enr ?? undefined;
return this.peerManager["discovery"]?.discv5.enr;
}

getConnectionsByPeer(): Map<string, Connection[]> {
Expand Down
Loading

0 comments on commit 08dbb21

Please sign in to comment.