Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(network)!: use StatusCache in PeerManager #5451

Merged
merged 1 commit into from May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion packages/beacon-node/src/network/network.ts
Expand Up @@ -40,6 +40,7 @@ import {createNodeJsLibp2p} from "./nodejs/util.js";
import {NetworkProcessor} from "./processor/index.js";
import {PendingGossipsubMessage} from "./processor/types.js";
import {createNetworkCoreMetrics} from "./core/metrics.js";
import {LocalStatusCache} from "./statusCache.js";

// How many changes to batch cleanup
const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10;
Expand All @@ -61,6 +62,7 @@ type NetworkModules = {
attnetsService: AttnetsService;
syncnetsService: SyncnetsService;
peerManager: PeerManager;
statusCache: LocalStatusCache;
};

export type NetworkInitModules = {
Expand Down Expand Up @@ -90,6 +92,7 @@ export class Network implements INetwork {

private readonly networkProcessor: NetworkProcessor;
private readonly peerManager: PeerManager;
private readonly statusCache: LocalStatusCache;
private readonly libp2p: Libp2p;
private readonly logger: Logger;
private readonly config: BeaconConfig;
Expand Down Expand Up @@ -119,6 +122,7 @@ export class Network implements INetwork {
attnetsService,
syncnetsService,
peerManager,
statusCache,
} = modules;
this.opts = opts;
this.config = config;
Expand All @@ -136,8 +140,10 @@ export class Network implements INetwork {
this.attnetsService = attnetsService;
this.syncnetsService = syncnetsService;
this.peerManager = peerManager;
this.statusCache = statusCache;

this.chain.clock.on(ClockEvent.epoch, this.onEpoch);
this.chain.emitter.on(routes.events.EventType.head, this.onHead);
this.chain.emitter.on(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.on(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
modules.signal.addEventListener("abort", this.close.bind(this), {once: true});
Expand Down Expand Up @@ -214,6 +220,7 @@ export class Network implements INetwork {

const syncnetsService = new SyncnetsService(config, chain.clock, gossip, metadata, logger, metricsCore, opts);

const statusCache = new LocalStatusCache(chain.getStatus());
const peerManager = new PeerManager(
{
libp2p,
Expand All @@ -223,7 +230,8 @@ export class Network implements INetwork {
syncnetsService,
logger,
metrics: metricsCore,
chain,
clock,
statusCache,
config,
peerRpcScores,
networkEventBus,
Expand Down Expand Up @@ -277,6 +285,7 @@ export class Network implements INetwork {
attnetsService,
syncnetsService,
peerManager,
statusCache,
});
}

Expand All @@ -285,6 +294,7 @@ export class Network implements INetwork {
if (this.closed) return;

this.chain.emitter.off(ClockEvent.epoch, this.onEpoch);
this.chain.emitter.off(routes.events.EventType.head, this.onHead);
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);

Expand Down Expand Up @@ -532,6 +542,10 @@ export class Network implements INetwork {
}
};

private onHead = (): void => {
this.statusCache.update(this.chain.getStatus());
};

private subscribeCoreTopicsAtFork = (fork: ForkName): void => {
if (this.subscribedForks.has(fork)) return;
this.subscribedForks.add(fork);
Expand Down
20 changes: 12 additions & 8 deletions packages/beacon-node/src/network/peers/peerManager.ts
Expand Up @@ -6,7 +6,6 @@ import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {BeaconConfig} from "@lodestar/config";
import {allForks, altair, phase0} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {IBeaconChain} from "../../chain/index.js";
import {GoodByeReasonCode, GOODBYE_KNOWN_CODES, Libp2pEvent} from "../../constants/index.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {NetworkEvent, INetworkEventBus} from "../events.js";
Expand All @@ -16,6 +15,8 @@ import {getConnection, getConnectionsMap, prettyPrintPeerId} from "../util.js";
import {SubnetsService} from "../subnets/index.js";
import {SubnetType} from "../metadata.js";
import {Eth2Gossipsub} from "../gossip/gossipsub.js";
import {StatusCache} from "../statusCache.js";
import {IClock} from "../../util/clock.js";
import {PeersData, PeerData} from "./peersData.js";
import {PeerDiscovery, SubnetDiscvQueryMs} from "./discover.js";
import {IPeerRpcScoreStore, ScoreState, updateGossipsubScores} from "./score.js";
Expand Down Expand Up @@ -88,7 +89,8 @@ export type PeerManagerModules = {
gossip: Eth2Gossipsub;
attnetsService: SubnetsService;
syncnetsService: SubnetsService;
chain: IBeaconChain;
statusCache: StatusCache;
clock: IClock;
config: BeaconConfig;
peerRpcScores: IPeerRpcScoreStore;
networkEventBus: INetworkEventBus;
Expand Down Expand Up @@ -119,7 +121,8 @@ export class PeerManager {
private gossipsub: Eth2Gossipsub;
private attnetsService: SubnetsService;
private syncnetsService: SubnetsService;
private chain: IBeaconChain;
private statusCache: StatusCache;
private clock: IClock;
private config: BeaconConfig;
private peerRpcScores: IPeerRpcScoreStore;
/** If null, discovery is disabled */
Expand All @@ -140,7 +143,8 @@ export class PeerManager {
this.gossipsub = modules.gossip;
this.attnetsService = modules.attnetsService;
this.syncnetsService = modules.syncnetsService;
this.chain = modules.chain;
this.statusCache = modules.statusCache;
this.clock = modules.clock;
this.config = modules.config;
this.peerRpcScores = modules.peerRpcScores;
this.networkEventBus = modules.networkEventBus;
Expand Down Expand Up @@ -313,7 +317,7 @@ export class PeerManager {

let isIrrelevant: boolean;
try {
const irrelevantReasonType = assertPeerRelevance(status, this.chain);
const irrelevantReasonType = assertPeerRelevance(status, this.statusCache.get(), this.clock.currentSlot);
if (irrelevantReasonType === null) {
isIrrelevant = false;
} else {
Expand Down Expand Up @@ -379,7 +383,7 @@ export class PeerManager {

private async requestStatusMany(peers: PeerId[]): Promise<void> {
try {
const localStatus = this.chain.getStatus();
const localStatus = this.statusCache.get();
await Promise.all(peers.map(async (peer) => this.requestStatus(peer, localStatus)));
} catch (e) {
this.logger.verbose("Error requesting new status to peers", {}, e as Error);
Expand Down Expand Up @@ -445,7 +449,7 @@ export class PeerManager {
subnet: query.subnet,
type,
maxPeersToDiscover: query.maxPeersToDiscover,
toUnixMs: 1000 * (this.chain.genesisTime + query.toSlot * this.config.SECONDS_PER_SLOT),
toUnixMs: 1000 * (this.clock.genesisTime + query.toSlot * this.config.SECONDS_PER_SLOT),
});
}

Expand Down Expand Up @@ -567,7 +571,7 @@ export class PeerManager {
if (direction === "outbound") {
//this.pingAndStatusTimeouts();
void this.requestPing(peer);
void this.requestStatus(peer, this.chain.getStatus());
void this.requestStatus(peer, this.statusCache.get());
}

// AgentVersion was set in libp2p IdentifyService, 'peer:connect' event handler
Expand Down
@@ -1,8 +1,5 @@
import {computeStartSlotAtEpoch, getBlockRootAtSlot} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {Epoch, ForkDigest, Root, phase0, ssz} from "@lodestar/types";
import {ForkDigest, Root, Slot, phase0, ssz} from "@lodestar/types";
import {toHexString} from "@chainsafe/ssz";
import {IBeaconChain} from "../../../chain/index.js";

// TODO: Why this value? (From Lighthouse)
const FUTURE_SLOT_TOLERANCE = 1;
Expand All @@ -22,9 +19,11 @@ type IrrelevantPeerType =
* Process a `Status` message to determine if a peer is relevant to us. If the peer is
* irrelevant the reason is returned.
*/
export function assertPeerRelevance(remote: phase0.Status, chain: IBeaconChain): IrrelevantPeerType | null {
const local = chain.getStatus();

export function assertPeerRelevance(
remote: phase0.Status,
local: phase0.Status,
currentSlot: Slot
): IrrelevantPeerType | null {
// The node is on a different network/fork
if (!ssz.ForkDigest.equals(local.forkDigest, remote.forkDigest)) {
return {
Expand All @@ -37,7 +36,7 @@ export function assertPeerRelevance(remote: phase0.Status, chain: IBeaconChain):
// The remote's head is on a slot that is significantly ahead of what we consider the
// current slot. This could be because they are using a different genesis time, or that
// their or our system's clock is incorrect.
const slotDiff = remote.headSlot - Math.max(chain.clock.currentSlot, 0);
const slotDiff = remote.headSlot - Math.max(currentSlot, 0);
if (slotDiff > FUTURE_SLOT_TOLERANCE) {
return {code: IrrelevantPeerCode.DIFFERENT_CLOCKS, slotDiff};
}
Expand All @@ -52,11 +51,7 @@ export function assertPeerRelevance(remote: phase0.Status, chain: IBeaconChain):
!isZeroRoot(local.finalizedRoot)
) {
const remoteRoot = remote.finalizedRoot;
const expectedRoot =
remote.finalizedEpoch === local.finalizedEpoch
? local.finalizedRoot
: // This will get the latest known block at the start of the epoch.
getRootAtHistoricalEpoch(chain, remote.finalizedEpoch);
const expectedRoot = remote.finalizedEpoch === local.finalizedEpoch ? local.finalizedRoot : null;

if (expectedRoot !== null && !ssz.Root.equals(remoteRoot, expectedRoot)) {
return {
Expand All @@ -76,32 +71,6 @@ export function isZeroRoot(root: Root): boolean {
return ssz.Root.equals(root, ZERO_ROOT);
}

function getRootAtHistoricalEpoch(chain: IBeaconChain, epoch: Epoch): Root | null {
const headState = chain.getHeadState();
const slot = computeStartSlotAtEpoch(epoch);

if (slot < headState.slot - SLOTS_PER_HISTORICAL_ROOT) {
// TODO: If the slot is very old, go to the historical blocks DB and fetch the block with less or equal `slot`.
// Note that our db schema will have to be updated to persist the block root to prevent re-hashing.
// For now peers will be accepted, since it's better than throwing an error on `getBlockRootAtSlot()`
return null;
}

// This will get the latest known block at the start of the epoch.
// NOTE: Throws if the epoch if from a long-ago epoch
return getBlockRootAtSlot(headState, slot);

// NOTE: Previous code tolerated long-ago epochs
// ^^^^
// finalized checkpoint of status is from an old long-ago epoch.
// We need to ask the chain for most recent canonical block at the finalized checkpoint start slot.
// The problem is that the slot may be a skip slot.
// And the block root may be from multiple epochs back even.
// The epoch in the checkpoint is there to checkpoint the tail end of skip slots, even if there is no block.
// TODO: accepted for now. Need to maintain either a list of finalized block roots,
// or inefficiently loop from finalized slot backwards, until we find the block we need to check against.
}

export function renderIrrelevantPeerType(type: IrrelevantPeerType): string {
switch (type.code) {
case IrrelevantPeerCode.INCOMPATIBLE_FORKS:
Expand Down
17 changes: 17 additions & 0 deletions packages/beacon-node/src/network/statusCache.ts
@@ -0,0 +1,17 @@
import {phase0} from "@lodestar/types";

export interface StatusCache {
get(): phase0.Status;
}

export class LocalStatusCache implements StatusCache {
constructor(private status: phase0.Status) {}

get(): phase0.Status {
return this.status;
}

update(localStatus: phase0.Status): void {
this.status = localStatus;
}
}
29 changes: 14 additions & 15 deletions packages/beacon-node/test/e2e/network/peers/peerManager.test.ts
Expand Up @@ -13,12 +13,13 @@ import {PeerRpcScoreStore, PeerManager} from "../../../../src/network/peers/inde
import {Eth2Gossipsub, getConnectionsMap, NetworkEvent, NetworkEventBus} from "../../../../src/network/index.js";
import {PeersData} from "../../../../src/network/peers/peersData.js";
import {createNode, getAttnets, getSyncnets} from "../../../utils/network.js";
import {MockBeaconChain} from "../../../utils/mocks/chain/chain.js";
import {generateState} from "../../../utils/state.js";
import {waitForEvent} from "../../../utils/events/resolver.js";
import {testLogger} from "../../../utils/logger.js";
import {getValidPeerId} from "../../../utils/peer.js";
import {IAttnetsService} from "../../../../src/network/subnets/index.js";
import {Clock} from "../../../../src/util/clock.js";
import {LocalStatusCache} from "../../../../src/network/statusCache.js";

const logger = testLogger();

Expand All @@ -44,17 +45,14 @@ describe("network / peers / PeerManager", function () {
},
});
const beaconConfig = createBeaconConfig(config, state.genesisValidatorsRoot);
const chain = new MockBeaconChain({
genesisTime: 0,
chainId: 0,
networkId: BigInt(0),
state,
config: beaconConfig,
});
const controller = new AbortController();
const clock = new Clock({config: beaconConfig, genesisTime: 0, signal: controller.signal});
const status = ssz.phase0.Status.defaultValue();
const statusCache = new LocalStatusCache(status);
const libp2p = await createNode("/ip4/127.0.0.1/tcp/0");

afterEachCallbacks.push(async () => {
await chain.close();
controller.abort();
await libp2p.stop();
});

Expand All @@ -78,7 +76,8 @@ describe("network / peers / PeerManager", function () {
reqResp,
logger,
metrics: null,
chain,
clock,
statusCache,
config: beaconConfig,
peerRpcScores,
networkEventBus,
Expand All @@ -96,7 +95,7 @@ describe("network / peers / PeerManager", function () {
);
await peerManager.start();

return {chain, libp2p, reqResp, peerManager, networkEventBus};
return {statusCache, clock, libp2p, reqResp, peerManager, networkEventBus};
}

// Create a real event emitter with stubbed methods
Expand Down Expand Up @@ -159,7 +158,7 @@ describe("network / peers / PeerManager", function () {
} as Connection;

it("Should emit peer connected event on relevant peer status", async function () {
const {chain, libp2p, networkEventBus} = await mockModules();
const {statusCache, libp2p, networkEventBus} = await mockModules();

// Simualate a peer connection, get() should return truthy
getConnectionsMap(libp2p.connectionManager).set(peerId1.toString(), [libp2pConnectionOutboud]);
Expand All @@ -168,14 +167,14 @@ describe("network / peers / PeerManager", function () {
const peerConnectedPromise = waitForEvent(networkEventBus, NetworkEvent.peerConnected, this.timeout() / 2);

// Send the local status and remote status, which always passes the assertPeerRelevance function
const remoteStatus = chain.getStatus();
const remoteStatus = statusCache.get();
networkEventBus.emit(NetworkEvent.reqRespRequest, {method: ReqRespMethod.Status, body: remoteStatus}, peerId1);

await peerConnectedPromise;
});

it("On peerConnect handshake flow", async function () {
const {chain, libp2p, reqResp, peerManager, networkEventBus} = await mockModules();
const {statusCache, libp2p, reqResp, peerManager, networkEventBus} = await mockModules();

// Simualate a peer connection, get() should return truthy
getConnectionsMap(libp2p.connectionManager).set(peerId1.toString(), [libp2pConnectionOutboud]);
Expand All @@ -184,7 +183,7 @@ describe("network / peers / PeerManager", function () {
const peerConnectedPromise = waitForEvent(networkEventBus, NetworkEvent.peerConnected, this.timeout() / 2);

// Simulate peer1 returning a PING and STATUS message
const remoteStatus = chain.getStatus();
const remoteStatus = statusCache.get();
const remoteMetadata: altair.Metadata = {seqNumber: BigInt(1), attnets: getAttnets(), syncnets: getSyncnets()};
reqResp.ping.resolves(remoteMetadata.seqNumber);
reqResp.status.resolves(remoteStatus);
Expand Down
@@ -1,8 +1,6 @@
import {expect} from "chai";
import {phase0} from "@lodestar/types";
import {MockBeaconChain} from "../../../../utils/mocks/chain/chain.js";
import {assertPeerRelevance, IrrelevantPeerCode} from "../../../../../src/network/peers/utils/assertPeerRelevance.js";
import {IClock} from "../../../../../src/util/clock.js";

describe("network / peers / utils / assertPeerRelevance", () => {
const correctForkDigest = Buffer.alloc(4, 0);
Expand Down Expand Up @@ -81,21 +79,15 @@ describe("network / peers / utils / assertPeerRelevance", () => {

for (const {id, remote, currentSlot, irrelevantType} of testCases) {
it(id, async () => {
// Partial instance with only the methods needed for the test
const chain = {
getStatus: () => ({
forkDigest: correctForkDigest,
finalizedRoot: ZERO_HASH,
finalizedEpoch: 0,
headRoot: ZERO_HASH,
headSlot: 0,
}),
clock: {
currentSlot: currentSlot ?? 0,
} as Partial<IClock>,
} as Partial<MockBeaconChain> as MockBeaconChain;
const local = {
forkDigest: correctForkDigest,
finalizedRoot: ZERO_HASH,
finalizedEpoch: 0,
headRoot: ZERO_HASH,
headSlot: 0,
};

expect(assertPeerRelevance(remote, chain)).to.deep.equal(irrelevantType);
expect(assertPeerRelevance(remote, local, currentSlot ?? 0)).to.deep.equal(irrelevantType);
});
}
});