Skip to content

Commit

Permalink
feat: subscribe to 2 long lived subnets per node (#5704)
Browse files Browse the repository at this point in the history
* feat: implement computeSubscribedSubnet()

* feat: implement DLLAttnetsService

* fix: deterministicLongLivedAttnets feature flag

* fix: optional nodeId

* chore: add subnet to word list
  • Loading branch information
twoeths committed Jul 11, 2023
1 parent 91e2e65 commit 0640e06
Show file tree
Hide file tree
Showing 15 changed files with 682 additions and 22 deletions.
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ runtime
sharding
ssz
stakers
subnet
subnets
tcp
testnet
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/core/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ export function createNetworkCoreMetrics(register: RegistryMetricCreator) {
name: "lodestar_attnets_service_random_subscriptions_total",
help: "Count of random subscriptions",
}),
longLivedSubscriptions: register.gauge({
name: "lodestar_attnets_service_long_lived_subscriptions_total",
help: "Count of long lived subscriptions",
}),
subscribeSubnets: register.gauge<"subnet" | "src">({
name: "lodestar_attnets_service_subscribe_subnets_total",
help: "Count of subscribe_subnets calls",
Expand Down
29 changes: 18 additions & 11 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {LoggerNode} from "@lodestar/logger/node";
import {Epoch, phase0} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {ResponseIncoming} from "@lodestar/reqresp";
import {fromHexString} from "@chainsafe/ssz";
import {ENR} from "@chainsafe/discv5";
import {Libp2p} from "../interface.js";
import {PeerManager} from "../peers/peerManager.js";
import {ReqRespBeaconNode} from "../reqresp/ReqRespBeaconNode.js";
Expand All @@ -18,7 +20,7 @@ import {AttnetsService} from "../subnets/attnetsService.js";
import {SyncnetsService} from "../subnets/syncnetsService.js";
import {FORK_EPOCH_LOOKAHEAD, getActiveForks} from "../forks.js";
import {NetworkOptions} from "../options.js";
import {CommitteeSubscription} from "../subnets/interface.js";
import {CommitteeSubscription, IAttnetsService} from "../subnets/interface.js";
import {MetadataController} from "../metadata.js";
import {createNodeJsLibp2p} from "../nodejs/util.js";
import {PeersData} from "../peers/peersData.js";
Expand All @@ -31,14 +33,15 @@ import {Discv5Worker} from "../discv5/index.js";
import {LocalStatusCache} from "../statusCache.js";
import {RegistryMetricCreator} from "../../metrics/index.js";
import {peerIdFromString, peerIdToString} from "../../util/peerId.js";
import {DLLAttnetsService} from "../subnets/dllAttnetsService.js";
import {NetworkCoreMetrics, createNetworkCoreMetrics} from "./metrics.js";
import {INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js";

type Mods = {
libp2p: Libp2p;
gossip: Eth2Gossipsub;
reqResp: ReqRespBeaconNode;
attnetsService: AttnetsService;
attnetsService: IAttnetsService;
syncnetsService: SyncnetsService;
peerManager: PeerManager;
peersData: PeersData;
Expand Down Expand Up @@ -84,7 +87,7 @@ export type BaseNetworkInit = {
export class NetworkCore implements INetworkCore {
// Internal modules
private readonly libp2p: Libp2p;
private readonly attnetsService: AttnetsService;
private readonly attnetsService: IAttnetsService;
private readonly syncnetsService: SyncnetsService;
private readonly peerManager: PeerManager;
private readonly peersData: PeersData;
Expand Down Expand Up @@ -185,7 +188,18 @@ export class NetworkCore implements INetworkCore {
events,
});

const attnetsService = new AttnetsService(config, clock, gossip, metadata, logger, metrics, opts);
// Note: should not be necessary, already called in createNodeJsLibp2p()
await libp2p.start();

await reqResp.start();
// should be called before DLLAttnetsService constructor so that node subscribe to deterministic attnet topics
await gossip.start();

const enr = opts.discv5?.enr;
const nodeId = enr ? fromHexString(ENR.decodeTxt(enr).nodeId) : null;
const attnetsService = opts.deterministicLongLivedAttnets
? new DLLAttnetsService(config, clock, gossip, metadata, logger, metrics, nodeId, opts)
: new AttnetsService(config, clock, gossip, metadata, logger, metrics, opts);
const syncnetsService = new SyncnetsService(config, clock, gossip, metadata, logger, metrics, opts);

const peerManager = await PeerManager.init(
Expand All @@ -207,13 +221,6 @@ export class NetworkCore implements INetworkCore {
opts
);

// Note: should not be necessary, already called in createNodeJsLibp2p()
await libp2p.start();

await reqResp.start();

await gossip.start();

// Network spec decides version changes based on clock fork, not head fork
const forkCurrentSlot = config.getForkName(clock.currentSlot);
// Register only ReqResp protocols relevant to clock's fork
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Eth2GossipsubOpts} from "./gossip/gossipsub.js";
import {PeerManagerOpts, PeerRpcScoreOpts} from "./peers/index.js";
import {ReqRespBeaconNodeOpts} from "./reqresp/ReqRespBeaconNode.js";
import {NetworkProcessorOpts} from "./processor/index.js";
import {SubnetsServiceOpts} from "./subnets/interface.js";

// Since Network is eventually intended to be run in a separate thread, ensure that all options are cloneable using structuredClone
export interface NetworkOptions
Expand All @@ -10,6 +11,7 @@ export interface NetworkOptions
Omit<ReqRespBeaconNodeOpts, "getPeerLogMetadata" | "onRateLimit">,
NetworkProcessorOpts,
PeerRpcScoreOpts,
SubnetsServiceOpts,
Eth2GossipsubOpts {
localMultiaddrs: string[];
bootMultiaddrs?: string[];
Expand Down
4 changes: 0 additions & 4 deletions packages/beacon-node/src/network/peers/utils/subnetMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ export class SubnetMap {
return toSlot !== undefined && toSlot >= slot; // ACTIVE: >=
}

activeUpToSlot(subnet: number): Slot | null {
return this.subnets.get(subnet) ?? null;
}

/** Return subnetIds with a `toSlot` equal greater than `currentSlot` */
getActive(currentSlot: Slot): number[] {
const subnetIds: number[] = [];
Expand Down
8 changes: 2 additions & 6 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
RandBetweenFn,
ShuffleFn,
GossipSubscriber,
SubnetsServiceTestOpts,
} from "./interface.js";

/**
Expand Down Expand Up @@ -78,7 +79,7 @@ export class AttnetsService implements IAttnetsService {
private readonly metadata: MetadataController,
private readonly logger: Logger,
private readonly metrics: NetworkCoreMetrics | null,
private readonly opts?: SubnetsServiceOpts
private readonly opts?: SubnetsServiceOpts & SubnetsServiceTestOpts
) {
// if subscribeAllSubnets, we act like we have >= ATTESTATION_SUBNET_COUNT validators connecting to this node
// so that we have enough subnet topic peers, see https://github.com/ChainSafe/lodestar/issues/4921
Expand Down Expand Up @@ -158,11 +159,6 @@ export class AttnetsService implements IAttnetsService {
return this.aggregatorSlotSubnet.getOrDefault(slot).has(subnet);
}

/** Returns the latest Slot subscription is active, null if no subscription */
activeUpToSlot(subnet: number): Slot | null {
return this.subscriptionsCommittee.activeUpToSlot(subnet);
}

/** Call ONLY ONCE: Two epoch before the fork, re-subscribe all existing random subscriptions to the new fork */
subscribeSubnetsToNextFork(nextFork: ForkName): void {
this.logger.info("Suscribing to random attnets to next fork", {nextFork});
Expand Down

0 comments on commit 0640e06

Please sign in to comment.