Skip to content

Commit

Permalink
Subscribe to reqresp methods on forks
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Nov 26, 2022
1 parent 99cf400 commit 6796207
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 40 deletions.
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {INetworkEventBus} from "./events.js";
import {Eth2Gossipsub} from "./gossip/index.js";
import {MetadataController} from "./metadata.js";
import {PeerAction} from "./peers/index.js";
import {IReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js";
import {ReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js";
import {IAttnetsService, ISubnetsService, CommitteeSubscription} from "./subnets/index.js";

export type PeerSearchOptions = {
Expand All @@ -18,7 +18,7 @@ export type PeerSearchOptions = {

export interface INetwork {
events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
reqResp: ReqRespBeaconNode;
attnetsService: IAttnetsService;
syncnetsService: ISubnetsService;
gossip: Eth2Gossipsub;
Expand Down
15 changes: 11 additions & 4 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ interface INetworkModules {

export class Network implements INetwork {
events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
reqResp: ReqRespBeaconNode;
attnetsService: AttnetsService;
syncnetsService: SyncnetsService;
gossip: Eth2Gossipsub;
Expand Down Expand Up @@ -145,8 +145,13 @@ export class Network implements INetwork {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
(this.libp2p.connectionManager as DefaultConnectionManager)["latencyMonitor"].stop();

this.reqResp.start();
this.metadata.start(this.getEnr(), this.config.getForkName(this.clock.currentSlot));
// Network spec decides version changes based on clock fork, not head fork
const forkCurrentSlot = this.config.getForkName(this.clock.currentSlot);

await this.reqResp.start();
this.reqResp.registerProtocolsAtFork(forkCurrentSlot);

this.metadata.start(this.getEnr(), forkCurrentSlot);
await this.peerManager.start();
await this.gossip.start();
this.attnetsService.start();
Expand All @@ -163,7 +168,8 @@ export class Network implements INetwork {
await this.peerManager.goodbyeAndDisconnectAllPeers();
await this.peerManager.stop();
await this.gossip.stop();
this.reqResp.stop();
await this.reqResp.stop();
await this.reqResp.unregisterAllProtocols();
this.attnetsService.stop();
this.syncnetsService.stop();
await this.libp2p.stop();
Expand Down Expand Up @@ -417,6 +423,7 @@ export class Network implements INetwork {
if (epoch === forkEpoch) {
// updateEth2Field() MUST be called with clock epoch, onEpoch event is emitted in response to clock events
this.metadata.updateEth2Field(epoch);
this.reqResp.registerProtocolsAtFork(nextFork);
}

// After fork transition
Expand Down
126 changes: 109 additions & 17 deletions packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {PeerId} from "@libp2p/interface-peer-id";
import {Libp2p} from "libp2p";
import {IBeaconConfig} from "@lodestar/config";
import {ForkName} from "@lodestar/params";
import {ForkName, ForkSeq} from "@lodestar/params";
import {
collectExactOne,
collectMaxResponse,
Expand All @@ -15,7 +15,7 @@ import {
} from "@lodestar/reqresp";
import {ReqRespOpts} from "@lodestar/reqresp/lib/ReqResp.js";
import * as messages from "@lodestar/reqresp/messages";
import {allForks, altair, phase0, Root} from "@lodestar/types";
import {allForks, altair, eip4844, phase0, Root} from "@lodestar/types";
import {ILogger} from "@lodestar/utils";
import {IMetrics} from "../../metrics/metrics.js";
import {INetworkEventBus, NetworkEvent} from "../events.js";
Expand All @@ -36,6 +36,9 @@ export {IReqRespBeaconNode};
/** This type helps response to beacon_block_by_range and beacon_block_by_root more efficiently */
export type ReqRespBlockResponse = EncodedPayload<allForks.SignedBeaconBlock>;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ProtocolDefinitionAny = ProtocolDefinition<any, any>;

export interface ReqRespBeaconNodeModules {
libp2p: Libp2p;
peersData: PeersData;
Expand Down Expand Up @@ -73,6 +76,9 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
private readonly networkEventBus: INetworkEventBus;
private readonly peersData: PeersData;

private readonly config: IBeaconConfig;
protected readonly logger: ILogger;

constructor(modules: ReqRespBeaconNodeModules, options: ReqRespBeaconNodeOpts = {}) {
const {reqRespHandlers, networkEventBus, peersData, peerRpcScores, metadata, logger, metrics} = modules;

Expand All @@ -81,28 +87,15 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
this.reqRespHandlers = reqRespHandlers;
this.peerRpcScores = peerRpcScores;
this.peersData = peersData;
this.config = modules.config;
this.logger = modules.logger;
this.metadataController = metadata;
this.networkEventBus = networkEventBus;
this.inboundRateLimiter = new InboundRateLimiter(options, {
logger,
reportPeer: (peerId) => peerRpcScores.applyAction(peerId, PeerAction.Fatal, "rate_limit_rpc"),
metrics,
});

// TODO: Do not register everything! Some protocols are fork dependant
this.registerProtocol(messages.Ping(this.onPing.bind(this)));
this.registerProtocol(messages.Status(modules, this.onStatus.bind(this)));
this.registerProtocol(messages.Metadata(modules, this.onMetadata.bind(this)));
this.registerProtocol(messages.MetadataV2(modules, this.onMetadata.bind(this)));
this.registerProtocol(messages.Goodbye(modules, this.onGoodbye.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRange(modules, this.onBeaconBlocksByRange.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRangeV2(modules, this.onBeaconBlocksByRange.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRoot(modules, this.onBeaconBlocksByRoot.bind(this)));
this.registerProtocol(messages.BeaconBlocksByRootV2(modules, this.onBeaconBlocksByRoot.bind(this)));
this.registerProtocol(messages.LightClientBootstrap(modules, reqRespHandlers.onLightClientBootstrap));
this.registerProtocol(messages.LightClientFinalityUpdate(modules, reqRespHandlers.onLightClientFinalityUpdate));
this.registerProtocol(messages.LightClientOptimisticUpdate(modules, reqRespHandlers.onLightClientOptimisticUpdate));
this.registerProtocol(messages.LightClientUpdatesByRange(modules, reqRespHandlers.onLightClientUpdatesByRange));
}

async start(): Promise<void> {
Expand All @@ -119,6 +112,28 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
this.inboundRateLimiter.prune(peerId);
}

registerProtocolsAtFork(fork: ForkName): void {
const mustSubscribeProtocols = this.getProtocolsAtFork(fork);
const mustSubscribeProtocolIDs = new Set(mustSubscribeProtocols.map((protocol) => this.formatProtocolID(protocol)));

// Un-subscribe not required protocols
for (const protocolID of this.getRegisteredProtocols()) {
if (!mustSubscribeProtocolIDs.has(protocolID)) {
// Async because of writing to peerstore -_- should never throw
this.unregisterProtocol(protocolID).catch((e) => {
this.logger.error("Error on ReqResp.unregisterProtocol", {protocolID}, e);
});
}
}

// Subscribe required protocols, prevent libp2p for throwing if already registered
for (const protocol of mustSubscribeProtocols) {
this.registerProtocol(protocol, {ignoreIfDuplicate: true}).catch((e) => {
this.logger.error("Error on ReqResp.registerProtocol", {protocolID: this.formatProtocolID(protocol)}, e);
});
}
}

async status(peerId: PeerId, request: phase0.Status): Promise<phase0.Status> {
return collectExactOne(
this.sendRequest<phase0.Status, phase0.Status>(peerId, ReqRespMethod.Status, [Version.V1], request)
Expand Down Expand Up @@ -227,6 +242,83 @@ export class ReqRespBeaconNode extends ReqResp implements IReqRespBeaconNode {
);
}

async beaconBlockAndBlobsSidecarByRoot(
peerId: PeerId,
request: eip4844.BeaconBlockAndBlobsSidecarByRootRequest
): Promise<eip4844.SignedBeaconBlockAndBlobsSidecar[]> {
return collectMaxResponse(
this.sendRequest<eip4844.BeaconBlockAndBlobsSidecarByRootRequest, eip4844.SignedBeaconBlockAndBlobsSidecar>(
peerId,
ReqRespMethod.BeaconBlockAndBlobsSidecarByRoot,
[Version.V1],
request
),
request.length
);
}

async blobsSidecarsByRange(
peerId: PeerId,
request: eip4844.BlobsSidecarsByRangeRequest
): Promise<eip4844.BlobsSidecar[]> {
return collectMaxResponse(
this.sendRequest<eip4844.BlobsSidecarsByRangeRequest, eip4844.BlobsSidecar>(
peerId,
ReqRespMethod.BlobsSidecarsByRange,
[Version.V1],
request
),
request.count
);
}

/**
* Returns the list of protocols that must be subscribed during a specific fork.
* Any protocol not in this list must be un-subscribed.
*/
private getProtocolsAtFork(fork: ForkName): ProtocolDefinitionAny[] {
const modules = {config: this.config};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const protocols: ProtocolDefinition<any, any>[] = [
messages.Ping(this.onPing.bind(this)),
messages.Status(modules, this.onStatus.bind(this)),
messages.Goodbye(modules, this.onGoodbye.bind(this)),
// Support V2 methods as soon as implemented (for altair)
// Ref https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2
messages.MetadataV2(modules, this.onMetadata.bind(this)),
messages.BeaconBlocksByRangeV2(modules, this.onBeaconBlocksByRange.bind(this)),
messages.BeaconBlocksByRootV2(modules, this.onBeaconBlocksByRoot.bind(this)),
];

if (ForkSeq[fork] < ForkSeq.altair) {
// Unregister V1 topics at the fork boundary, so only declare for pre-altair
protocols.push(
messages.Metadata(modules, this.onMetadata.bind(this)),
messages.BeaconBlocksByRange(modules, this.onBeaconBlocksByRange.bind(this)),
messages.BeaconBlocksByRoot(modules, this.onBeaconBlocksByRoot.bind(this))
);
}

if (ForkSeq[fork] >= ForkSeq.altair) {
// Should be okay to enable before altair, but for consistency only enable afterwards
protocols.push(
messages.LightClientBootstrap(modules, this.reqRespHandlers.onLightClientBootstrap),
messages.LightClientFinalityUpdate(modules, this.reqRespHandlers.onLightClientFinalityUpdate),
messages.LightClientOptimisticUpdate(modules, this.reqRespHandlers.onLightClientOptimisticUpdate),
messages.LightClientUpdatesByRange(modules, this.reqRespHandlers.onLightClientUpdatesByRange)
);
}

if (ForkSeq[fork] >= ForkSeq.eip4844) {
protocols.push(
messages.BeaconBlockAndBlobsSidecarByRoot(modules, this.reqRespHandlers.onBeaconBlockAndBlobsSidecarByRoot),
messages.BlobsSidecarsByRange(modules, this.reqRespHandlers.onBlobsSidecarsByRange)
);
}

return protocols;
}

protected sendRequest<Req, Resp>(peerId: PeerId, method: string, versions: number[], body: Req): AsyncIterable<Resp> {
// Remember prefered encoding
const encoding = this.peersData.getEncodingPreference(peerId.toString()) ?? Encoding.SSZ_SNAPPY;
Expand Down
72 changes: 55 additions & 17 deletions packages/reqresp/src/ReqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export interface ReqRespOpts extends SendRequestOpts {
getPeerLogMetadata?: (peerId: string) => string;
}

export interface ReqRespRegisterOpts {
ignoreIfDuplicate?: boolean;
}

/**
* Implementation of Ethereum Consensus p2p Req/Resp domain.
* For the spec that this code is based on, see:
Expand All @@ -33,15 +37,15 @@ export interface ReqRespOpts extends SendRequestOpts {
*/
export class ReqResp {
private readonly libp2p: Libp2p;
private readonly logger: ILogger;
protected readonly logger: ILogger;
private readonly metrics: Metrics | null;
private controller = new AbortController();
/** Tracks request and responses in a sequential counter */
private reqCount = 0;
private readonly protocolPrefix: string;

/** `${protocolPrefix}/${method}/${version}/${encoding}` */
private readonly supportedProtocols = new Map<ProtocolID, ProtocolDefinition>();
private readonly registeredProtocols = new Map<ProtocolID, ProtocolDefinition>();

constructor(modules: ReqRespProtocolModules, private readonly opts: ReqRespOpts = {}) {
this.libp2p = modules.libp2p;
Expand All @@ -50,27 +54,61 @@ export class ReqResp {
this.protocolPrefix = opts.protocolPrefix ?? DEFAULT_PROTOCOL_PREFIX;
}

registerProtocol<Req, Resp>(protocol: ProtocolDefinition<Req, Resp>): void {
const {method, version, encoding} = protocol;
const protocolID = this.formatProtocolID(method, version, encoding);
this.supportedProtocols.set(protocolID, protocol as ProtocolDefinition);
/**
* Register protocol as supported and to libp2p.
* async because libp2p registar persists the new protocol list in the peer-store.
* Throws if the same protocol is registered twice.
* Can be called at any time, no concept of started / stopped
*/
async registerProtocol<Req, Resp>(
protocol: ProtocolDefinition<Req, Resp>,
opts?: ReqRespRegisterOpts
): Promise<void> {
const protocolID = this.formatProtocolID(protocol);

// libp2p will throw on error on duplicates, allow to overwrite behaviour
if (opts?.ignoreIfDuplicate && this.registeredProtocols.has(protocolID)) {
return;
}

this.registeredProtocols.set(protocolID, protocol as ProtocolDefinition);

return this.libp2p.handle(protocolID, this.getRequestHandler(protocol));
}

/**
* Remove protocol as supported and from libp2p.
* async because libp2p registar persists the new protocol list in the peer-store.
* Does NOT throw if the protocolID is unknown.
* Can be called at any time, no concept of started / stopped
*/
async unregisterProtocol(protocolID: ProtocolID): Promise<void> {
this.registeredProtocols.delete(protocolID);

return this.libp2p.unhandle(protocolID);
}

/**
* Remove all registered protocols from libp2p
*/
async unregisterAllProtocols(): Promise<void> {
for (const protocolID of this.registeredProtocols.keys()) {
await this.unregisterProtocol(protocolID);
}
}

getRegisteredProtocols(): ProtocolID[] {
return Array.from(this.registeredProtocols.values()).map((protocol) => this.formatProtocolID(protocol));
}

async start(): Promise<void> {
this.controller = new AbortController();
// We set infinity to prevent MaxListenersExceededWarning which get logged when listeners > 10
// Since it is perfectly fine to have listeners > 10
setMaxListeners(Infinity, this.controller.signal);

for (const [protocolID, protocol] of this.supportedProtocols) {
await this.libp2p.handle(protocolID, this.getRequestHandler(protocol));
}
}

async stop(): Promise<void> {
for (const protocolID of this.supportedProtocols.keys()) {
await this.libp2p.unhandle(protocolID);
}
this.controller.abort();
}

Expand All @@ -90,8 +128,8 @@ export class ReqResp {
const protocolIDs: string[] = [];

for (const version of versions) {
const protocolID = this.formatProtocolID(method, version, encoding);
const protocol = this.supportedProtocols.get(protocolID);
const protocolID = this.formatProtocolID({method, version, encoding});
const protocol = this.registeredProtocols.get(protocolID);
if (!protocol) {
throw Error(`Request to send to protocol ${protocolID} but it has not been declared`);
}
Expand Down Expand Up @@ -175,7 +213,7 @@ export class ReqResp {
* ```
* https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/phase0/p2p-interface.md#protocol-identification
*/
protected formatProtocolID(method: string, version: number, encoding: Encoding): string {
return formatProtocolID(this.protocolPrefix, method, version, encoding);
protected formatProtocolID(protocol: Pick<ProtocolDefinition, "method" | "version" | "encoding">): string {
return formatProtocolID(this.protocolPrefix, protocol.method, protocol.version, protocol.encoding);
}
}
25 changes: 25 additions & 0 deletions packages/reqresp/src/messages/BeaconBlockAndBlobsSidecarByRoot.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import {eip4844, ssz} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {ContextBytesType, Encoding, ProtocolDefinitionGenerator} from "../types.js";

// eslint-disable-next-line @typescript-eslint/naming-convention
export const BeaconBlockAndBlobsSidecarByRoot: ProtocolDefinitionGenerator<
eip4844.BeaconBlockAndBlobsSidecarByRootRequest,
eip4844.SignedBeaconBlockAndBlobsSidecar
> = (modules, handler) => {
return {
method: "beacon_block_and_blobs_sidecar_by_root",
version: 1,
encoding: Encoding.SSZ_SNAPPY,
handler,
requestType: () => ssz.eip4844.BeaconBlockAndBlobsSidecarByRootRequest,
// TODO: Make it fork compliant
responseType: () => ssz.eip4844.SignedBeaconBlockAndBlobsSidecar,
renderRequestBody: (req) => req.map((root) => toHex(root)).join(","),
contextBytes: {
type: ContextBytesType.ForkDigest,
forkDigestContext: modules.config,
forkFromResponse: ({beaconBlock}) => modules.config.getForkName(beaconBlock.message.slot),
},
};
};
Loading

0 comments on commit 6796207

Please sign in to comment.