Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BeaconChain this in processBlock fns #4377

Merged
merged 4 commits into from Aug 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
144 changes: 56 additions & 88 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Expand Up @@ -2,24 +2,12 @@ import {altair, ssz} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {CachedBeaconStateAltair, computeEpochAtSlot, RootCache} from "@lodestar/state-transition";
import {IForkChoice, ForkChoiceError, ForkChoiceErrorCode} from "@lodestar/fork-choice";
import {ILogger} from "@lodestar/utils";
import {IChainForkConfig} from "@lodestar/config";
import {IMetrics} from "../../metrics/index.js";
import {IExecutionEngine} from "../../execution/engine/interface.js";
import {IBeaconDb} from "../../db/index.js";
import {ForkChoiceError, ForkChoiceErrorCode} from "@lodestar/fork-choice";
import {ZERO_HASH_HEX} from "../../constants/index.js";
import {CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js";
import {toCheckpointHex} from "../stateCache/index.js";
import {ChainEvent} from "../emitter.js";
import {ChainEventEmitter} from "../emitter.js";
import {LightClientServer} from "../lightClient/index.js";
import {SeenAggregatedAttestations} from "../seenCache/seenAggregateAndProof.js";
import {SeenBlockAttesters} from "../seenCache/seenBlockAttesters.js";
import {IEth1ForBlockProduction} from "../../eth1/index.js";
import {BeaconProposerCache} from "../beaconProposerCache.js";
import {IBeaconClock} from "../clock/index.js";
import {ReprocessController, REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {CheckpointBalancesCache} from "../balancesCache.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import type {BeaconChain} from "../chain.js";
import {FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {PendingEvents} from "./utils/pendingEvents.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
Expand All @@ -29,26 +17,6 @@ import {getCheckpointFromState} from "./utils/checkpoint.js";
*/
const FORK_CHOICE_ATT_EPOCH_LIMIT = 1;

export type ImportBlockModules = {
db: IBeaconDb;
forkChoice: IForkChoice;
stateCache: StateContextCache;
checkpointStateCache: CheckpointStateCache;
seenAggregatedAttestations: SeenAggregatedAttestations;
seenBlockAttesters: SeenBlockAttesters;
beaconProposerCache: BeaconProposerCache;
checkpointBalancesCache: CheckpointBalancesCache;
reprocessController: ReprocessController;
lightClientServer: LightClientServer;
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
emitter: ChainEventEmitter;
config: IChainForkConfig;
clock: IBeaconClock;
logger: ILogger;
metrics: IMetrics | null;
};

/**
* Imports a fully verified block into the chain state. Produces multiple permanent side-effects.
*
Expand All @@ -69,12 +37,12 @@ export type ImportBlockModules = {
* - Send events after everything is done
*/
export async function importBlock(
chain: ImportBlockModules,
this: BeaconChain,
fullyVerifiedBlock: FullyVerifiedBlock,
opts: ImportBlockOpts
): Promise<void> {
const {block, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const pendingEvents = new PendingEvents(chain.emitter);
const pendingEvents = new PendingEvents(this.emitter);

// - Observe attestations
// TODO
Expand All @@ -87,17 +55,17 @@ export async function importBlock(

// - Register block with fork-hoice

const prevFinalizedEpoch = chain.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % chain.config.SECONDS_PER_SLOT;
const blockRoot = toHexString(chain.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
const blockRoot = toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
// Should compute checkpoint balances before forkchoice.onBlock
chain.checkpointBalancesCache.processState(blockRoot, postState);
chain.forkChoice.onBlock(block.message, postState, blockDelaySec, chain.clock.currentSlot, executionStatus);
this.checkpointBalancesCache.processState(blockRoot, postState);
this.forkChoice.onBlock(block.message, postState, blockDelaySec, this.clock.currentSlot, executionStatus);

// - Register state and block to the validator monitor
// TODO

const currentEpoch = computeEpochAtSlot(chain.forkChoice.getTime());
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);

// - For each attestation
Expand All @@ -118,7 +86,7 @@ export async function importBlock(
const {target, slot, beaconBlockRoot} = attestation.data;

const attDataRoot = toHexString(ssz.phase0.AttestationData.hashTreeRoot(indexedAttestation.data));
chain.seenAggregatedAttestations.add(
this.seenAggregatedAttestations.add(
target.epoch,
attDataRoot,
{aggregationBits: attestation.aggregationBits, trueBitCount: indexedAttestation.attestingIndices.length},
Expand All @@ -127,14 +95,14 @@ export async function importBlock(
// Duplicated logic from fork-choice onAttestation validation logic.
// Attestations outside of this range will be dropped as Errors, so no need to import
if (target.epoch <= currentEpoch && target.epoch >= currentEpoch - FORK_CHOICE_ATT_EPOCH_LIMIT) {
chain.forkChoice.onAttestation(indexedAttestation, attDataRoot);
this.forkChoice.onAttestation(indexedAttestation, attDataRoot);
}

// Note: To avoid slowing down sync, only register attestations within FORK_CHOICE_ATT_EPOCH_LIMIT
chain.seenBlockAttesters.addIndices(blockEpoch, indexedAttestation.attestingIndices);
this.seenBlockAttesters.addIndices(blockEpoch, indexedAttestation.attestingIndices);

const correctHead = ssz.Root.equals(rootCache.getBlockRootAtSlot(slot), beaconBlockRoot);
chain.metrics?.registerAttestationInBlock(indexedAttestation, parentBlockSlot, correctHead);
this.metrics?.registerAttestationInBlock(indexedAttestation, parentBlockSlot, correctHead);

// don't want to log the processed attestations here as there are so many attestations and it takes too much disc space,
// users may want to keep more log files instead of unnecessary processed attestations log
Expand All @@ -152,13 +120,13 @@ export async function importBlock(
}
} else {
// always log other errors
chain.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error);
}
}
}

for (const {error, count} of invalidAttestationErrorsByCode.values()) {
chain.logger.warn(
this.logger.warn(
"Error processing attestations from block",
{slot: block.message.slot, erroredAttestations: count},
error
Expand All @@ -172,54 +140,54 @@ export async function importBlock(
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
chain.checkpointStateCache.add(cp, checkpointState);
this.checkpointStateCache.add(cp, checkpointState);
pendingEvents.push(ChainEvent.checkpoint, cp, checkpointState);

// Note: in-lined code from previos handler of ChainEvent.checkpoint
chain.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));

chain.metrics?.currentValidators.set(
this.metrics?.currentValidators.set(
{status: "active"},
checkpointState.epochCtx.currentShuffling.activeIndices.length
);
const parentBlockSummary = chain.forkChoice.getBlock(checkpointState.latestBlockHeader.parentRoot);
const parentBlockSummary = this.forkChoice.getBlock(checkpointState.latestBlockHeader.parentRoot);

if (parentBlockSummary) {
const justifiedCheckpoint = checkpointState.currentJustifiedCheckpoint;
const justifiedEpoch = justifiedCheckpoint.epoch;
const preJustifiedEpoch = parentBlockSummary.justifiedEpoch;
if (justifiedEpoch > preJustifiedEpoch) {
chain.emitter.emit(ChainEvent.justified, justifiedCheckpoint, checkpointState);
chain.logger.verbose("Checkpoint justified", toCheckpointHex(cp));
chain.metrics?.previousJustifiedEpoch.set(checkpointState.previousJustifiedCheckpoint.epoch);
chain.metrics?.currentJustifiedEpoch.set(cp.epoch);
this.emitter.emit(ChainEvent.justified, justifiedCheckpoint, checkpointState);
this.logger.verbose("Checkpoint justified", toCheckpointHex(cp));
this.metrics?.previousJustifiedEpoch.set(checkpointState.previousJustifiedCheckpoint.epoch);
this.metrics?.currentJustifiedEpoch.set(cp.epoch);
}
const finalizedCheckpoint = checkpointState.finalizedCheckpoint;
const finalizedEpoch = finalizedCheckpoint.epoch;
const preFinalizedEpoch = parentBlockSummary.finalizedEpoch;
if (finalizedEpoch > preFinalizedEpoch) {
chain.emitter.emit(ChainEvent.finalized, finalizedCheckpoint, checkpointState);
chain.logger.verbose("Checkpoint finalized", toCheckpointHex(cp));
chain.metrics?.finalizedEpoch.set(cp.epoch);
this.emitter.emit(ChainEvent.finalized, finalizedCheckpoint, checkpointState);
this.logger.verbose("Checkpoint finalized", toCheckpointHex(cp));
this.metrics?.finalizedEpoch.set(cp.epoch);
}
}
}

// Emit ChainEvent.forkChoiceHead event
const oldHead = chain.forkChoice.getHead();
const newHead = chain.forkChoice.updateHead();
const currFinalizedEpoch = chain.forkChoice.getFinalizedCheckpoint().epoch;
const oldHead = this.forkChoice.getHead();
const newHead = this.forkChoice.updateHead();
const currFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;

if (newHead.blockRoot !== oldHead.blockRoot) {
// new head
pendingEvents.push(ChainEvent.forkChoiceHead, newHead);
chain.metrics?.forkChoiceChangedHead.inc();
this.metrics?.forkChoiceChangedHead.inc();

const distance = chain.forkChoice.getCommonAncestorDistance(oldHead, newHead);
const distance = this.forkChoice.getCommonAncestorDistance(oldHead, newHead);
if (distance !== null) {
// chain reorg
chain.metrics?.forkChoiceReorg.inc();
chain.logger.verbose("Chain reorg", {
this.metrics?.forkChoiceReorg.inc();
this.logger.verbose("Chain reorg", {
depth: distance,
previousHead: oldHead.blockRoot,
previousHeadParent: oldHead.parentRoot,
Expand All @@ -229,22 +197,22 @@ export async function importBlock(
newSlot: newHead.slot,
});
pendingEvents.push(ChainEvent.forkChoiceReorg, newHead, oldHead, distance);
chain.metrics?.forkChoiceReorg.inc();
chain.metrics?.forkChoiceReorgDistance.observe(distance);
this.metrics?.forkChoiceReorg.inc();
this.metrics?.forkChoiceReorgDistance.observe(distance);
}

// Lightclient server support (only after altair)
// - Persist state witness
// - Use block's syncAggregate
if (blockEpoch >= chain.config.ALTAIR_FORK_EPOCH) {
if (blockEpoch >= this.config.ALTAIR_FORK_EPOCH) {
try {
chain.lightClientServer.onImportBlockHead(
this.lightClientServer.onImportBlockHead(
block.message as altair.BeaconBlock,
postState as CachedBeaconStateAltair,
parentBlockSlot
);
} catch (e) {
chain.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
this.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
}
}
}
Expand All @@ -261,17 +229,17 @@ export async function importBlock(
* - `headBlockHash !== null` -> Pre BELLATRIX_EPOCH
* - `headBlockHash !== ZERO_HASH` -> Pre TTD
*/
const headBlockHash = chain.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const headBlockHash = this.forkChoice.getHead().executionPayloadBlockHash ?? ZERO_HASH_HEX;
/**
* After BELLATRIX_EPOCH and TTD it's okay to send a zero hash block hash for the finalized block. This will happen if
* the current finalized block does not contain any execution payload at all (pre MERGE_EPOCH) or if it contains a
* zero block hash (pre TTD)
*/
const safeBlockHash = chain.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash = chain.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const safeBlockHash = this.forkChoice.getJustifiedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
const finalizedBlockHash = this.forkChoice.getFinalizedBlock().executionPayloadBlockHash ?? ZERO_HASH_HEX;
if (headBlockHash !== ZERO_HASH_HEX) {
chain.executionEngine.notifyForkchoiceUpdate(headBlockHash, safeBlockHash, finalizedBlockHash).catch((e) => {
chain.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
this.executionEngine.notifyForkchoiceUpdate(headBlockHash, safeBlockHash, finalizedBlockHash).catch((e) => {
this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
});
}
}
Expand All @@ -281,29 +249,29 @@ export async function importBlock(
// TODO: Move internal emitter onBlock() code here
// MUST happen before any other block is processed
// This adds the state necessary to process the next block
chain.stateCache.add(postState);
await chain.db.block.add(block);
this.stateCache.add(postState);
await this.db.block.add(block);

// - head_tracker.register_block(block_root, parent_root, slot)

// - Send event after everything is done

// Emit all events at once after fully completing importBlock()
chain.emitter.emit(ChainEvent.block, block, postState);
this.emitter.emit(ChainEvent.block, block, postState);
pendingEvents.emit();

// Register stat metrics about the block after importing it
chain.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
chain.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta);
chain.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);
this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta);
this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);

const advancedSlot = chain.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);
const advancedSlot = this.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);

chain.reprocessController.onBlockImported({slot: block.message.slot, root: blockRoot}, advancedSlot);
this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRoot}, advancedSlot);

chain.logger.verbose("Block processed", {
this.logger.verbose("Block processed", {
slot: block.message.slot,
root: blockRoot,
delaySec: chain.clock.secFromSlot(block.message.slot),
delaySec: this.clock.secFromSlot(block.message.slot),
});
}