Skip to content

Commit

Permalink
EIP-4844: Dynamic topic lists for ReqResp and Gossip (#4848)
Browse files Browse the repository at this point in the history
* Dynamic topic lists for ReqResp and Gossip

* Schedule ALTAIR_FORK_EPOCH for ReqResp e2e tests

* Fix e2e test fork aware

* Fix unit tests

Co-authored-by: Tuyen Nguyen <vutuyen2636@gmail.com>
  • Loading branch information
dapplion and twoeths committed Dec 7, 2022
1 parent b7c7910 commit eb61572
Show file tree
Hide file tree
Showing 21 changed files with 341 additions and 498 deletions.
96 changes: 56 additions & 40 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import {ChainEvent, IBeaconChain, IBeaconClock} from "../chain/index.js";
import {BlockInput, BlockInputType, getBlockInput} from "../chain/blocks/types.js";
import {INetworkOptions} from "./options.js";
import {INetwork} from "./interface.js";
import {IReqRespBeaconNode, ReqRespBeaconNode, ReqRespHandlers} from "./reqresp/ReqRespBeaconNode.js";
import {Eth2Gossipsub, getGossipHandlers, GossipHandlers, GossipType} from "./gossip/index.js";
import {ReqRespBeaconNode, ReqRespHandlers} from "./reqresp/ReqRespBeaconNode.js";
import {Eth2Gossipsub, getGossipHandlers, GossipHandlers, GossipTopicTypeMap, GossipType} from "./gossip/index.js";
import {MetadataController} from "./metadata.js";
import {FORK_EPOCH_LOOKAHEAD, getActiveForks} from "./forks.js";
import {PeerManager} from "./peers/peerManager.js";
Expand All @@ -39,7 +39,7 @@ interface INetworkModules {

export class Network implements INetwork {
events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
reqResp: ReqRespBeaconNode;
attnetsService: AttnetsService;
syncnetsService: SyncnetsService;
gossip: Eth2Gossipsub;
Expand Down Expand Up @@ -144,8 +144,16 @@ export class Network implements INetwork {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
(this.libp2p.connectionManager as DefaultConnectionManager)["latencyMonitor"].stop();

this.reqResp.start();
this.metadata.start(this.getEnr(), this.config.getForkName(this.clock.currentSlot));
// Network spec decides version changes based on clock fork, not head fork
const forkCurrentSlot = this.config.getForkName(this.clock.currentSlot);

// Register only ReqResp protocols relevant to clock's fork
await this.reqResp.start();
this.reqResp.registerProtocolsAtFork(forkCurrentSlot);

// Initialize ENR with clock's fork
this.metadata.start(this.getEnr(), forkCurrentSlot);

await this.peerManager.start();
await this.gossip.start();
this.attnetsService.start();
Expand All @@ -162,7 +170,10 @@ export class Network implements INetwork {
await this.peerManager.goodbyeAndDisconnectAllPeers();
await this.peerManager.stop();
await this.gossip.stop();
this.reqResp.stop();

await this.reqResp.stop();
await this.reqResp.unregisterAllProtocols();

this.attnetsService.stop();
this.syncnetsService.stop();
await this.libp2p.stop();
Expand Down Expand Up @@ -349,9 +360,13 @@ export class Network implements INetwork {

// Before fork transition
if (epoch === forkEpoch - FORK_EPOCH_LOOKAHEAD) {
this.logger.info("Subscribing gossip topics to next fork", {nextFork});
// Don't subscribe to new fork if the node is not subscribed to any topic
if (this.isSubscribedToGossipCoreTopics()) this.subscribeCoreTopicsAtFork(nextFork);
if (this.isSubscribedToGossipCoreTopics()) {
this.subscribeCoreTopicsAtFork(nextFork);
this.logger.info("Subscribing gossip topics before fork", {nextFork});
} else {
this.logger.info("Skipping subscribing gossip topics before fork", {nextFork});
}
this.attnetsService.subscribeSubnetsToNextFork(nextFork);
this.syncnetsService.subscribeSubnetsToNextFork(nextFork);
}
Expand All @@ -360,6 +375,7 @@ export class Network implements INetwork {
if (epoch === forkEpoch) {
// updateEth2Field() MUST be called with clock epoch, onEpoch event is emitted in response to clock events
this.metadata.updateEth2Field(epoch);
this.reqResp.registerProtocolsAtFork(nextFork);
}

// After fork transition
Expand All @@ -380,53 +396,53 @@ export class Network implements INetwork {
if (this.subscribedForks.has(fork)) return;
this.subscribedForks.add(fork);

this.gossip.subscribeTopic({type: GossipType.beacon_block, fork});
this.gossip.subscribeTopic({type: GossipType.beacon_aggregate_and_proof, fork});
this.gossip.subscribeTopic({type: GossipType.voluntary_exit, fork});
this.gossip.subscribeTopic({type: GossipType.proposer_slashing, fork});
this.gossip.subscribeTopic({type: GossipType.attester_slashing, fork});
// Any fork after altair included
if (fork !== ForkName.phase0) {
this.gossip.subscribeTopic({type: GossipType.sync_committee_contribution_and_proof, fork});
this.gossip.subscribeTopic({type: GossipType.light_client_optimistic_update, fork});
this.gossip.subscribeTopic({type: GossipType.light_client_finality_update, fork});
}

if (this.opts.subscribeAllSubnets) {
for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) {
this.gossip.subscribeTopic({type: GossipType.beacon_attestation, fork, subnet});
}
for (let subnet = 0; subnet < SYNC_COMMITTEE_SUBNET_COUNT; subnet++) {
this.gossip.subscribeTopic({type: GossipType.sync_committee, fork, subnet});
}
for (const topic of this.coreTopicsAtFork(fork)) {
this.gossip.subscribeTopic({...topic, fork});
}
};

private unsubscribeCoreTopicsAtFork = (fork: ForkName): void => {
if (!this.subscribedForks.has(fork)) return;
this.subscribedForks.delete(fork);

this.gossip.unsubscribeTopic({type: GossipType.beacon_block, fork});
this.gossip.unsubscribeTopic({type: GossipType.beacon_aggregate_and_proof, fork});
this.gossip.unsubscribeTopic({type: GossipType.voluntary_exit, fork});
this.gossip.unsubscribeTopic({type: GossipType.proposer_slashing, fork});
this.gossip.unsubscribeTopic({type: GossipType.attester_slashing, fork});
for (const topic of this.coreTopicsAtFork(fork)) {
this.gossip.unsubscribeTopic({...topic, fork});
}
};

/**
* De-duplicate logic to pick fork topics between subscribeCoreTopicsAtFork and unsubscribeCoreTopicsAtFork
*/
private coreTopicsAtFork(fork: ForkName): GossipTopicTypeMap[keyof GossipTopicTypeMap][] {
// Common topics for all forks
const topics: GossipTopicTypeMap[keyof GossipTopicTypeMap][] = [
{type: GossipType.beacon_block},
{type: GossipType.beacon_aggregate_and_proof},
{type: GossipType.voluntary_exit},
{type: GossipType.proposer_slashing},
{type: GossipType.attester_slashing},
];

// Any fork after altair included
if (fork !== ForkName.phase0) {
this.gossip.unsubscribeTopic({type: GossipType.sync_committee_contribution_and_proof, fork});
this.gossip.unsubscribeTopic({type: GossipType.light_client_optimistic_update, fork});
this.gossip.unsubscribeTopic({type: GossipType.light_client_finality_update, fork});
if (ForkSeq[fork] >= ForkSeq.altair) {
topics.push({type: GossipType.sync_committee_contribution_and_proof});
topics.push({type: GossipType.light_client_optimistic_update});
topics.push({type: GossipType.light_client_finality_update});
}

if (this.opts.subscribeAllSubnets) {
for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) {
this.gossip.unsubscribeTopic({type: GossipType.beacon_attestation, fork, subnet});
topics.push({type: GossipType.beacon_attestation, subnet});
}
for (let subnet = 0; subnet < SYNC_COMMITTEE_SUBNET_COUNT; subnet++) {
this.gossip.unsubscribeTopic({type: GossipType.sync_committee, fork, subnet});
if (ForkSeq[fork] >= ForkSeq.altair) {
for (let subnet = 0; subnet < SYNC_COMMITTEE_SUBNET_COUNT; subnet++) {
topics.push({type: GossipType.sync_committee, subnet});
}
}
}
};

return topics;
}

private onLightClientFinalityUpdate = async (finalityUpdate: altair.LightClientFinalityUpdate): Promise<void> => {
if (this.hasAttachedSyncCommitteeMember()) {
Expand Down
112 changes: 90 additions & 22 deletions packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {PeerId} from "@libp2p/interface-peer-id";
import {Libp2p} from "libp2p";
import {IBeaconConfig} from "@lodestar/config";
import {ForkName} from "@lodestar/params";
import {ForkName, ForkSeq} from "@lodestar/params";
import {
collectExactOne,
collectMaxResponse,
Expand Down Expand Up @@ -36,6 +36,9 @@ export {IReqRespBeaconNode};
/** This type helps response to beacon_block_by_range and beacon_block_by_root more efficiently */
export type ReqRespBlockResponse = EncodedPayload<allForks.SignedBeaconBlock>;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ProtocolDefinitionAny = ProtocolDefinition<any, any>;

export interface ReqRespBeaconNodeModules {
libp2p: Libp2p;
peersData: PeersData;
Expand Down Expand Up @@ -73,6 +76,12 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
private readonly networkEventBus: INetworkEventBus;
private readonly peersData: PeersData;

/** Track registered fork to only send to known protocols */
private currentRegisteredFork: ForkSeq = ForkSeq.phase0;

private readonly config: IBeaconConfig;
protected readonly logger: ILogger;

constructor(modules: ReqRespBeaconNodeModules, options: ReqRespBeaconNodeOpts = {}) {
const {reqRespHandlers, networkEventBus, peersData, peerRpcScores, metadata, logger, metrics} = modules;

Expand All @@ -81,28 +90,15 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
this.reqRespHandlers = reqRespHandlers;
this.peerRpcScores = peerRpcScores;
this.peersData = peersData;
this.config = modules.config;
this.logger = modules.logger;
this.metadataController = metadata;
this.networkEventBus = networkEventBus;
this.inboundRateLimiter = new InboundRateLimiter(options, {
logger,
reportPeer: (peerId) => peerRpcScores.applyAction(peerId, PeerAction.Fatal, "rate_limit_rpc"),
metrics,
});

// TODO: Do not register everything! Some protocols are fork dependant
this.registerProtocol(messages.Ping(this.onPing.bind(this)));
this.registerProtocol(messages.Status(modules, this.onStatus.bind(this)));
this.registerProtocol(messages.Metadata(modules, this.onMetadata.bind(this)));
this.registerProtocol(messages.MetadataV2(modules, this.onMetadata.bind(this)));
this.registerProtocol(messages.Goodbye(modules, this.onGoodbye.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRange(modules, this.onBeaconBlocksByRange.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRangeV2(modules, this.onBeaconBlocksByRange.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRoot(modules, this.onBeaconBlocksByRoot.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRootV2(modules, this.onBeaconBlocksByRoot.bind(this)));
this.registerProtocol(messages.LightClientBootstrap(modules, reqRespHandlers.onLightClientBootstrap));
this.registerProtocol(messages.LightClientFinalityUpdate(modules, reqRespHandlers.onLightClientFinalityUpdate));
this.registerProtocol(messages.LightClientOptimisticUpdate(modules, reqRespHandlers.onLightClientOptimisticUpdate));
this.registerProtocol(messages.LightClientUpdatesByRange(modules, reqRespHandlers.onLightClientUpdatesByRange));
}

async start(): Promise<void> {
Expand All @@ -119,6 +115,30 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
this.inboundRateLimiter.prune(peerId);
}

registerProtocolsAtFork(fork: ForkName): void {
this.currentRegisteredFork = ForkSeq[fork];

const mustSubscribeProtocols = this.getProtocolsAtFork(fork);
const mustSubscribeProtocolIDs = new Set(mustSubscribeProtocols.map((protocol) => this.formatProtocolID(protocol)));

// Un-subscribe not required protocols
for (const protocolID of this.getRegisteredProtocols()) {
if (!mustSubscribeProtocolIDs.has(protocolID)) {
// Async because of writing to peerstore -_- should never throw
this.unregisterProtocol(protocolID).catch((e) => {
this.logger.error("Error on ReqResp.unregisterProtocol", {protocolID}, e);
});
}
}

// Subscribe required protocols, prevent libp2p for throwing if already registered
for (const protocol of mustSubscribeProtocols) {
this.registerProtocol(protocol, {ignoreIfDuplicate: true}).catch((e) => {
this.logger.error("Error on ReqResp.registerProtocol", {protocolID: this.formatProtocolID(protocol)}, e);
});
}
}

async status(peerId: PeerId, request: phase0.Status): Promise<phase0.Status> {
return collectExactOne(
this.sendRequest<phase0.Status, phase0.Status>(peerId, ReqRespMethod.Status, [Version.V1], request)
Expand All @@ -143,10 +163,16 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
);
}

async metadata(peerId: PeerId, fork?: ForkName): Promise<allForks.Metadata> {
// Only request V1 if forcing phase0 fork. It's safe to not specify `fork` and let stream negotiation pick the version
const versions = fork === ForkName.phase0 ? [Version.V1] : [Version.V2, Version.V1];
return collectExactOne(this.sendRequest<null, allForks.Metadata>(peerId, ReqRespMethod.Metadata, versions, null));
async metadata(peerId: PeerId): Promise<allForks.Metadata> {
return collectExactOne(
this.sendRequest<null, allForks.Metadata>(
peerId,
ReqRespMethod.Metadata,
// Before altair, prioritize V2. After altair only request V2
this.currentRegisteredFork >= ForkSeq.altair ? [Version.V2] : [(Version.V2, Version.V1)],
null
)
);
}

async beaconBlocksByRange(
Expand All @@ -157,7 +183,8 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
this.sendRequest<phase0.BeaconBlocksByRangeRequest, allForks.SignedBeaconBlock>(
peerId,
ReqRespMethod.BeaconBlocksByRange,
[Version.V2, Version.V1], // Prioritize V2
// Before altair, prioritize V2. After altair only request V2
this.currentRegisteredFork >= ForkSeq.altair ? [Version.V2] : [(Version.V2, Version.V1)],
request
),
request
Expand All @@ -172,7 +199,8 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
this.sendRequest<phase0.BeaconBlocksByRootRequest, allForks.SignedBeaconBlock>(
peerId,
ReqRespMethod.BeaconBlocksByRoot,
[Version.V2, Version.V1], // Prioritize V2
// Before altair, prioritize V2. After altair only request V2
this.currentRegisteredFork >= ForkSeq.altair ? [Version.V2] : [(Version.V2, Version.V1)],
request
),
request.length
Expand Down Expand Up @@ -242,6 +270,46 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
);
}

/**
* Returns the list of protocols that must be subscribed during a specific fork.
* Any protocol not in this list must be un-subscribed.
*/
private getProtocolsAtFork(fork: ForkName): ProtocolDefinitionAny[] {
const modules = {config: this.config};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const protocols: ProtocolDefinition<any, any>[] = [
messages.Ping(this.onPing.bind(this)),
messages.Status(modules, this.onStatus.bind(this)),
messages.Goodbye(modules, this.onGoodbye.bind(this)),
// Support V2 methods as soon as implemented (for altair)
// Ref https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2
messages.MetadataV2(modules, this.onMetadata.bind(this)),
messages.BeaconBlocksByRangeV2(modules, this.onBeaconBlocksByRange.bind(this)),
messages.BeaconBlocksByRootV2(modules, this.onBeaconBlocksByRoot.bind(this)),
];

if (ForkSeq[fork] < ForkSeq.altair) {
// Unregister V1 topics at the fork boundary, so only declare for pre-altair
protocols.push(
messages.Metadata(modules, this.onMetadata.bind(this)),
messages.BeaconBlocksByRange(modules, this.onBeaconBlocksByRange.bind(this)),
messages.BeaconBlocksByRoot(modules, this.onBeaconBlocksByRoot.bind(this))
);
}

if (ForkSeq[fork] >= ForkSeq.altair) {
// Should be okay to enable before altair, but for consistency only enable afterwards
protocols.push(
messages.LightClientBootstrap(modules, this.reqRespHandlers.onLightClientBootstrap),
messages.LightClientFinalityUpdate(modules, this.reqRespHandlers.onLightClientFinalityUpdate),
messages.LightClientOptimisticUpdate(modules, this.reqRespHandlers.onLightClientOptimisticUpdate),
messages.LightClientUpdatesByRange(modules, this.reqRespHandlers.onLightClientUpdatesByRange)
);
}

return protocols;
}

protected sendRequest<Req, Resp>(peerId: PeerId, method: string, versions: number[], body: Req): AsyncIterable<Resp> {
// Remember prefered encoding
const encoding = this.peersData.getEncodingPreference(peerId.toString()) ?? Encoding.SSZ_SNAPPY;
Expand Down
3 changes: 1 addition & 2 deletions packages/beacon-node/src/network/reqresp/interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import {PeerId} from "@libp2p/interface-peer-id";
import {ForkName} from "@lodestar/params";
import {allForks, altair, eip4844, phase0} from "@lodestar/types";

export interface IReqRespBeaconNode {
Expand All @@ -8,7 +7,7 @@ export interface IReqRespBeaconNode {
status(peerId: PeerId, request: phase0.Status): Promise<phase0.Status>;
goodbye(peerId: PeerId, request: phase0.Goodbye): Promise<void>;
ping(peerId: PeerId): Promise<phase0.Ping>;
metadata(peerId: PeerId, fork?: ForkName): Promise<allForks.Metadata>;
metadata(peerId: PeerId): Promise<allForks.Metadata>;
beaconBlocksByRange(
peerId: PeerId,
request: phase0.BeaconBlocksByRangeRequest
Expand Down
Loading

0 comments on commit eb61572

Please sign in to comment.