Skip to content

Commit

Permalink
refactor!: move clock events to clock class (#5446)
Browse files Browse the repository at this point in the history
* Generalize clock for network thread

* Move clock events to clock

---------

Co-authored-by: Cayman <caymannava@gmail.com>
  • Loading branch information
dapplion and wemeetagain committed May 1, 2023
1 parent ed047c3 commit 1e4ca97
Show file tree
Hide file tree
Showing 36 changed files with 189 additions and 193 deletions.
Expand Up @@ -20,7 +20,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {ErrorAborted, Logger} from "@lodestar/utils";
import {IExecutionEngine} from "../../execution/engine/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BeaconClock} from "../clock/index.js";
import {IClock} from "../../util/clock.js";
import {BlockProcessOpts} from "../options.js";
import {ExecutePayloadStatus} from "../../execution/engine/interface.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";
Expand All @@ -30,7 +30,7 @@ import {ImportBlockOpts} from "./types.js";
export type VerifyBlockExecutionPayloadModules = {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
clock: BeaconClock;
clock: IClock;
logger: Logger;
metrics: Metrics | null;
forkChoice: IForkChoice;
Expand Down
Expand Up @@ -3,7 +3,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Slot} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {IClock} from "../../util/clock.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockInput, ImportBlockOpts} from "./types.js";

Expand All @@ -20,7 +20,7 @@ import {BlockInput, ImportBlockOpts} from "./types.js";
* - Not already known
*/
export function verifyBlocksSanityChecks(
chain: {forkChoice: IForkChoice; clock: BeaconClock; config: ChainForkConfig},
chain: {forkChoice: IForkChoice; clock: IClock; config: ChainForkConfig},
blocks: BlockInput[],
opts: ImportBlockOpts
): {relevantBlocks: BlockInput[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
Expand Down
12 changes: 6 additions & 6 deletions packages/beacon-node/src/chain/chain.ts
Expand Up @@ -27,10 +27,10 @@ import {wrapError} from "../util/wrapError.js";
import {ckzg} from "../util/kzg.js";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder, TransitionConfigurationV1} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {CheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {BeaconClock, LocalClock} from "./clock/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData} from "./interface.js";
import {IChainOptions} from "./options.js";
Expand Down Expand Up @@ -88,7 +88,7 @@ export class BeaconChain implements IBeaconChain {

readonly bls: IBlsVerifier;
readonly forkChoice: IForkChoice;
readonly clock: BeaconClock;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
Expand Down Expand Up @@ -153,7 +153,7 @@ export class BeaconChain implements IBeaconChain {
logger: Logger;
processShutdownCallback: ProcessShutdownCallback;
/** Used for testing to supply fake clock */
clock?: BeaconClock;
clock?: IClock;
metrics: Metrics | null;
anchorState: BeaconStateAllForks;
eth1: IEth1ForBlockProduction;
Expand Down Expand Up @@ -185,7 +185,7 @@ export class BeaconChain implements IBeaconChain {
? new BlsSingleThreadVerifier({metrics})
: new BlsMultiThreadWorkerPool(opts, {logger, metrics});

if (!clock) clock = new LocalClock({config, emitter, genesisTime: this.genesisTime, signal});
if (!clock) clock = new Clock({config, genesisTime: this.genesisTime, signal});

const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
Expand Down Expand Up @@ -271,8 +271,8 @@ export class BeaconChain implements IBeaconChain {
metrics?.opPool.aggregatedAttestationPoolSize.addCollect(() => this.onScrapeMetrics());

// Event handlers. emitter is created internally and dropped on close(). Not need to .removeListener()
emitter.addListener(ChainEvent.clockSlot, this.onClockSlot.bind(this));
emitter.addListener(ChainEvent.clockEpoch, this.onClockEpoch.bind(this));
clock.addListener(ClockEvent.slot, this.onClockSlot.bind(this));
clock.addListener(ClockEvent.epoch, this.onClockEpoch.bind(this));
emitter.addListener(ChainEvent.forkChoiceFinalized, this.onForkChoiceFinalized.bind(this));
emitter.addListener(ChainEvent.forkChoiceJustified, this.onForkChoiceJustified.bind(this));
}
Expand Down
2 changes: 0 additions & 2 deletions packages/beacon-node/src/chain/clock/index.ts

This file was deleted.

36 changes: 0 additions & 36 deletions packages/beacon-node/src/chain/clock/interface.ts

This file was deleted.

17 changes: 1 addition & 16 deletions packages/beacon-node/src/chain/emitter.ts
Expand Up @@ -2,7 +2,7 @@ import {EventEmitter} from "events";
import StrictEventEmitter from "strict-event-emitter-types";

import {routes} from "@lodestar/api";
import {phase0, Epoch, Slot} from "@lodestar/types";
import {phase0} from "@lodestar/types";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";

Expand All @@ -22,18 +22,6 @@ export const enum ChainEvent {
* This event is guaranteed to be called after _any_ checkpoint is processed, including skip-slot checkpoints, checkpoints that are formed as a result of processing blocks, etc.
*/
checkpoint = "checkpoint",
/**
* This event signals the start of a new slot, and that subsequent calls to `clock.currentSlot` will equal `slot`.
*
* This event is guaranteed to be emitted every `SECONDS_PER_SLOT` seconds.
*/
clockSlot = "clock:slot",
/**
* This event signals the start of a new epoch, and that subsequent calls to `clock.currentEpoch` will return `epoch`.
*
* This event is guaranteed to be emitted every `SECONDS_PER_SLOT * SLOTS_PER_EPOCH` seconds.
*/
clockEpoch = "clock:epoch",
/**
* This event signals that the fork choice store has been updated.
*
Expand All @@ -57,9 +45,6 @@ type ApiEvents = {[K in routes.events.EventType]: (data: routes.events.EventData
export type IChainEvents = ApiEvents & {
[ChainEvent.checkpoint]: (checkpoint: phase0.Checkpoint, state: CachedBeaconStateAllForks) => void;

[ChainEvent.clockSlot]: (slot: Slot) => void;
[ChainEvent.clockEpoch]: (epoch: Epoch) => void;

[ChainEvent.forkChoiceJustified]: (checkpoint: CheckpointWithHex) => void;
[ChainEvent.forkChoiceFinalized]: (checkpoint: CheckpointWithHex) => void;
};
Expand Down
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/index.ts
@@ -1,7 +1,6 @@
export * from "./interface.js";
export * from "./emitter.js";
export * from "./chain.js";
export {BeaconClock} from "./clock/index.js";
export * from "./forkChoice/index.js";
export * from "./initState.js";
export * from "./stateCache/index.js";
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/interface.ts
Expand Up @@ -8,7 +8,7 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Metrics} from "../metrics/metrics.js";
import {BeaconClock} from "./clock/interface.js";
import {IClock} from "../util/clock.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {StateContextCache, CheckpointStateCache} from "./stateCache/index.js";
Expand Down Expand Up @@ -62,7 +62,7 @@ export interface IBeaconChain {

readonly bls: IBlsVerifier;
readonly forkChoice: IForkChoice;
readonly clock: BeaconClock;
readonly clock: IClock;
readonly emitter: ChainEventEmitter;
readonly stateCache: StateContextCache;
readonly checkpointStateCache: CheckpointStateCache;
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Expand Up @@ -3,7 +3,7 @@ import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {MapDef} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -62,7 +62,7 @@ export class AttestationPool {
private lowestPermissibleSlot = 0;

constructor(
private readonly clock: BeaconClock,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
) {}
Expand Down
Expand Up @@ -4,7 +4,7 @@ import {SYNC_COMMITTEE_SIZE, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params
import {altair, Root, Slot, SubcommitteeIndex} from "@lodestar/types";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {MapDef} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -46,7 +46,7 @@ export class SyncCommitteeMessagePool {
private lowestPermissibleSlot = 0;

constructor(
private readonly clock: BeaconClock,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
) {}
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/prepareNextSlot.ts
Expand Up @@ -7,7 +7,7 @@ import {routes} from "@lodestar/api";
import {GENESIS_SLOT, ZERO_HASH_HEX} from "../constants/constants.js";
import {Metrics} from "../metrics/index.js";
import {TransitionConfigurationV1} from "../execution/engine/interface.js";
import {ChainEvent} from "./emitter.js";
import {ClockEvent} from "../util/clock.js";
import {prepareExecutionPayload, getPayloadAttributesForSSE} from "./produceBlock/produceBlockBody.js";
import {IBeaconChain} from "./interface.js";
import {RegenCaller} from "./regen/index.js";
Expand Down Expand Up @@ -38,11 +38,11 @@ export class PrepareNextSlotScheduler {
private readonly logger: Logger,
private readonly signal: AbortSignal
) {
this.chain.emitter.on(ChainEvent.clockSlot, this.prepareForNextSlot);
this.chain.clock.on(ClockEvent.slot, this.prepareForNextSlot);
this.signal.addEventListener(
"abort",
() => {
this.chain.emitter.off(ChainEvent.clockSlot, this.prepareForNextSlot);
this.chain.clock.off(ClockEvent.slot, this.prepareForNextSlot);
},
{once: true}
);
Expand Down
13 changes: 7 additions & 6 deletions packages/beacon-node/src/network/network.ts
Expand Up @@ -10,7 +10,8 @@ import {deneb, Epoch, phase0, allForks, altair} from "@lodestar/types";
import {routes} from "@lodestar/api";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score";
import {Metrics} from "../metrics/index.js";
import {ChainEvent, IBeaconChain, BeaconClock} from "../chain/index.js";
import {ClockEvent, IClock} from "../util/clock.js";
import {IBeaconChain} from "../chain/index.js";
import {BlockInput, BlockInputType} from "../chain/blocks/types.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "../chain/opPools/utils.js";
import {formatNodePeer} from "../api/impl/node/utils.js";
Expand Down Expand Up @@ -92,7 +93,7 @@ export class Network implements INetwork {
private readonly libp2p: Libp2p;
private readonly logger: Logger;
private readonly config: BeaconConfig;
private readonly clock: BeaconClock;
private readonly clock: IClock;
private readonly chain: IBeaconChain;
private readonly signal: AbortSignal;

Expand Down Expand Up @@ -136,7 +137,7 @@ export class Network implements INetwork {
this.syncnetsService = syncnetsService;
this.peerManager = peerManager;

this.chain.emitter.on(ChainEvent.clockEpoch, this.onEpoch);
this.chain.clock.on(ClockEvent.epoch, this.onEpoch);
this.chain.emitter.on(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.on(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
modules.signal.addEventListener("abort", this.close.bind(this), {once: true});
Expand Down Expand Up @@ -195,7 +196,7 @@ export class Network implements INetwork {
},
};

const attnetsService = new AttnetsService(config, chain, _gossip, metadata, logger, metricsCore, opts);
const attnetsService = new AttnetsService(config, chain.clock, _gossip, metadata, logger, metricsCore, opts);

gossip = new Eth2Gossipsub(opts, {
config,
Expand All @@ -211,7 +212,7 @@ export class Network implements INetwork {
events: networkEventBus,
});

const syncnetsService = new SyncnetsService(config, chain, gossip, metadata, logger, metricsCore, opts);
const syncnetsService = new SyncnetsService(config, chain.clock, gossip, metadata, logger, metricsCore, opts);

const peerManager = new PeerManager(
{
Expand Down Expand Up @@ -283,7 +284,7 @@ export class Network implements INetwork {
async close(): Promise<void> {
if (this.closed) return;

this.chain.emitter.off(ChainEvent.clockEpoch, this.onEpoch);
this.chain.emitter.off(ClockEvent.epoch, this.onEpoch);
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);

Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/network/processor/index.ts
Expand Up @@ -5,7 +5,7 @@ import {IBeaconChain} from "../../chain/interface.js";
import {Metrics} from "../../metrics/metrics.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {GossipType} from "../gossip/interface.js";
import {ChainEvent} from "../../chain/emitter.js";
import {ClockEvent} from "../../util/clock.js";
import {GossipErrorCode} from "../../chain/errors/gossipValidation.js";
import {createGossipQueues} from "./gossipQueues.js";
import {NetworkWorker, NetworkWorkerModules} from "./worker.js";
Expand Down Expand Up @@ -129,7 +129,7 @@ export class NetworkProcessor {

events.on(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage.bind(this));
this.chain.emitter.on(routes.events.EventType.block, this.onBlockProcessed.bind(this));
this.chain.emitter.on(ChainEvent.clockSlot, this.onClockSlot.bind(this));
this.chain.clock.on(ClockEvent.slot, this.onClockSlot.bind(this));

this.awaitingGossipsubMessagesByRootBySlot = new MapDef(
() => new MapDef<RootHex, Set<PendingGossipsubMessage>>(() => new Set())
Expand All @@ -154,7 +154,7 @@ export class NetworkProcessor {
async stop(): Promise<void> {
this.events.off(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage);
this.chain.emitter.off(routes.events.EventType.block, this.onBlockProcessed);
this.chain.emitter.off(ChainEvent.clockSlot, this.onClockSlot);
this.chain.emitter.off(ClockEvent.slot, this.onClockSlot);
}

dropAllJobs(): void {
Expand Down

0 comments on commit 1e4ca97

Please sign in to comment.