Skip to content

Commit

Permalink
Merge fc3b470 into dc8bdfc
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Jul 13, 2022
2 parents dc8bdfc + fc3b470 commit 3ca87fd
Show file tree
Hide file tree
Showing 16 changed files with 488 additions and 332 deletions.
52 changes: 52 additions & 0 deletions packages/beacon-node/src/chain/balancesCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import {
CachedBeaconStateAllForks,
computeStartSlotAtEpoch,
EffectiveBalanceIncrements,
getBlockRootAtSlot,
getEffectiveBalanceIncrementsZeroInactive,
} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {Epoch, RootHex} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";

/** The number of validator balance sets that are cached within `CheckpointBalancesCache`. */
const MAX_BALANCE_CACHE_SIZE = 4;

type BalancesCacheItem = {
rootHex: RootHex;
epoch: Epoch;
balances: EffectiveBalanceIncrements;
};

/**
* Cache EffectiveBalanceIncrements of checkpoint blocks
*/
export class CheckpointBalancesCache {
private readonly items: BalancesCacheItem[] = [];

/**
* Inspect the given `state` and determine the root of the block at the first slot of
* `state.current_epoch`. If there is not already some entry for the given block root, then
* add the effective balances from the `state` to the cache.
*/
processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): void {
const epoch = state.epochCtx.currentShuffling.epoch;
const epochBoundarySlot = computeStartSlotAtEpoch(epoch);
const epochBoundaryRoot =
epochBoundarySlot === state.slot ? blockRootHex : toHexString(getBlockRootAtSlot(state, epochBoundarySlot));

const index = this.items.findIndex((item) => item.epoch === epoch && item.rootHex == epochBoundaryRoot);
if (index === -1) {
if (this.items.length === MAX_BALANCE_CACHE_SIZE) {
this.items.shift();
}
// expect to reach this once per epoch
this.items.push({epoch, rootHex: epochBoundaryRoot, balances: getEffectiveBalanceIncrementsZeroInactive(state)});
}
}

get(checkpoint: CheckpointWithHex): EffectiveBalanceIncrements | undefined {
const {rootHex, epoch} = checkpoint;
return this.items.find((item) => item.epoch === epoch && item.rootHex === rootHex)?.balances;
}
}
120 changes: 10 additions & 110 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import {altair, allForks, ssz} from "@lodestar/types";
import {altair, ssz} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {
CachedBeaconStateAllForks,
CachedBeaconStateAltair,
computeStartSlotAtEpoch,
getEffectiveBalanceIncrementsZeroInactive,
computeEpochAtSlot,
RootCache,
} from "@lodestar/state-transition";
import {IForkChoice, OnBlockPrecachedData, ForkChoiceError, ForkChoiceErrorCode} from "@lodestar/fork-choice";
import {CachedBeaconStateAltair, computeEpochAtSlot, RootCache} from "@lodestar/state-transition";
import {IForkChoice, ForkChoiceError, ForkChoiceErrorCode} from "@lodestar/fork-choice";
import {ILogger} from "@lodestar/utils";
import {IChainForkConfig} from "@lodestar/config";
import {IMetrics} from "../../metrics/index.js";
Expand All @@ -26,6 +19,7 @@ import {IEth1ForBlockProduction} from "../../eth1/index.js";
import {BeaconProposerCache} from "../beaconProposerCache.js";
import {IBeaconClock} from "../clock/index.js";
import {ReprocessController, REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {CheckpointBalancesCache} from "../balancesCache.js";
import {FullyVerifiedBlock} from "./types.js";
import {PendingEvents} from "./utils/pendingEvents.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
Expand All @@ -43,6 +37,7 @@ export type ImportBlockModules = {
seenAggregatedAttestations: SeenAggregatedAttestations;
seenBlockAttesters: SeenBlockAttesters;
beaconProposerCache: BeaconProposerCache;
checkpointBalancesCache: CheckpointBalancesCache;
reprocessController: ReprocessController;
lightClientServer: LightClientServer;
eth1: IEth1ForBlockProduction;
Expand Down Expand Up @@ -93,19 +88,12 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
//
// current justified checkpoint should be prev epoch or current epoch if it's just updated
// it should always have epochBalances there bc it's a checkpoint state, ie got through processEpoch
const justifiedCheckpoint = postState.currentJustifiedCheckpoint;

const onBlockPrecachedData: OnBlockPrecachedData = {
executionStatus,
blockDelaySec: (Math.floor(Date.now() / 1000) - postState.genesisTime) % chain.config.SECONDS_PER_SLOT,
};
if (justifiedCheckpoint.epoch > chain.forkChoice.getJustifiedCheckpoint().epoch) {
const state = getStateForJustifiedBalances(chain, postState, block);
onBlockPrecachedData.justifiedBalances = getEffectiveBalanceIncrementsZeroInactive(state);
}

const prevFinalizedEpoch = chain.forkChoice.getFinalizedCheckpoint().epoch;
chain.forkChoice.onBlock(block.message, postState, onBlockPrecachedData);
const blockDelaySec = (Math.floor(Date.now() / 1000) - postState.genesisTime) % chain.config.SECONDS_PER_SLOT;
const blockRoot = toHexString(chain.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
// Should compute checkpoint balances before forkchoice.onBlock
chain.checkpointBalancesCache.processState(blockRoot, postState);
chain.forkChoice.onBlock(block.message, postState, blockDelaySec, executionStatus);

// - Register state and block to the validator monitor
// TODO
Expand Down Expand Up @@ -312,8 +300,6 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
chain.metrics?.proposerBalanceDiffAny.observe(fullyVerifiedBlock.proposerBalanceDiff);
chain.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);

// Note: in-lined from previous handler of ChainEvent.block
const blockRoot = toHexString(chain.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
const advancedSlot = chain.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);

chain.reprocessController.onBlockImported({slot: block.message.slot, root: blockRoot}, advancedSlot);
Expand All @@ -324,89 +310,3 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
delaySec: chain.clock.secFromSlot(block.message.slot),
});
}

/**
* Returns the closest state to postState.currentJustifiedCheckpoint in the same fork as postState
*
* From the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#get_latest_attesting_balance
* The state from which to read balances is:
*
* ```python
* state = store.checkpoint_states[store.justified_checkpoint]
* ```
*
* ```python
* def store_target_checkpoint_state(store: Store, target: Checkpoint) -> None:
* # Store target checkpoint state if not yet seen
* if target not in store.checkpoint_states:
* base_state = copy(store.block_states[target.root])
* if base_state.slot < compute_start_slot_at_epoch(target.epoch):
* process_slots(base_state, compute_start_slot_at_epoch(target.epoch))
* store.checkpoint_states[target] = base_state
* ```
*
* So the state to get justified balances is the post state of `checkpoint.root` dialed forward to the first slot in
* `checkpoint.epoch` if that block is not in `checkpoint.epoch`.
*/
function getStateForJustifiedBalances(
chain: ImportBlockModules,
postState: CachedBeaconStateAllForks,
block: allForks.SignedBeaconBlock
): CachedBeaconStateAllForks {
const justifiedCheckpoint = postState.currentJustifiedCheckpoint;
const checkpointHex = toCheckpointHex(justifiedCheckpoint);
const checkpointSlot = computeStartSlotAtEpoch(checkpointHex.epoch);

// First, check if the checkpoint block in the checkpoint epoch, by getting the block summary from the fork-choice
const checkpointBlock = chain.forkChoice.getBlockHex(checkpointHex.rootHex);
if (!checkpointBlock) {
// Should never happen
return postState;
}

// NOTE: The state of block checkpointHex.rootHex may be prior to the justified checkpoint if it was a skipped slot.
if (checkpointBlock.slot >= checkpointSlot) {
const checkpointBlockState = chain.stateCache.get(checkpointBlock.stateRoot);
if (checkpointBlockState) {
return checkpointBlockState;
}
}

// If here, the first slot of `checkpoint.epoch` is a skipped slot. Check if the state is in the checkpoint cache.
// NOTE: This state and above are correct with the spec.
// NOTE: If the first slot of the epoch was skipped and the node is syncing, this state won't be in the cache.
const state = chain.checkpointStateCache.get(checkpointHex);
if (state) {
return state;
}

// If it's not found, then find the oldest state in the same chain as this one
// NOTE: If `block.message.parentRoot` is not in the fork-choice, `iterateAncestorBlocks()` returns `[]`
// NOTE: This state is not be correct with the spec, it may have extra modifications from multiple blocks.
// However, it's a best effort before triggering an async regen process. In the future this should be fixed
// to use regen and get the correct state.
let oldestState = postState;
for (const parentBlock of chain.forkChoice.iterateAncestorBlocks(toHexString(block.message.parentRoot))) {
// We want at least a state at the slot 0 of checkpoint.epoch
if (parentBlock.slot < checkpointSlot) {
break;
}

const parentBlockState = chain.stateCache.get(parentBlock.stateRoot);
if (parentBlockState) {
oldestState = parentBlockState;
}
}

// TODO: Use regen to get correct state. Note that making this function async can break the import flow.
// Also note that it can dead lock regen and block processing since both have a concurrency of 1.

chain.logger.error("State for currentJustifiedCheckpoint not available, using closest state", {
checkpointEpoch: checkpointHex.epoch,
checkpointRoot: checkpointHex.rootHex,
stateSlot: oldestState.slot,
stateRoot: toHexString(oldestState.hashTreeRoot()),
});

return oldestState;
}
8 changes: 4 additions & 4 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ export type FullyVerifiedBlockFlags = {
* Used by range sync.
*/
ignoreIfFinalized?: boolean;
/**
* If the execution payload couldnt be verified because of EL syncing status, used in optimistic sync or for merge block
*/
executionStatus?: ExecutionStatus;
};

export type PartiallyVerifiedBlockFlags = FullyVerifiedBlockFlags & {
Expand Down Expand Up @@ -52,6 +48,10 @@ export type FullyVerifiedBlock = FullyVerifiedBlockFlags & {
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
proposerBalanceDiff: number;
/**
* If the execution payload couldnt be verified because of EL syncing status, used in optimistic sync or for merge block
*/
executionStatus: ExecutionStatus;
};

/**
Expand Down
97 changes: 97 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import path from "node:path";
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
computeEpochAtSlot,
computeStartSlotAtEpoch,
createCachedBeaconState,
EffectiveBalanceIncrements,
getEffectiveBalanceIncrementsZeroInactive,
Index2PubkeyCache,
PubkeyIndexMap,
} from "@lodestar/state-transition";
Expand Down Expand Up @@ -53,6 +56,7 @@ import {ReprocessController} from "./reprocess.js";
import {SeenAggregatedAttestations} from "./seenCache/seenAggregateAndProof.js";
import {SeenBlockAttesters} from "./seenCache/seenBlockAttesters.js";
import {BeaconProposerCache} from "./beaconProposerCache.js";
import {CheckpointBalancesCache} from "./balancesCache.js";
import {ChainEvent} from "./index.js";

export class BeaconChain implements IBeaconChain {
Expand Down Expand Up @@ -97,6 +101,7 @@ export class BeaconChain implements IBeaconChain {
readonly index2pubkey: Index2PubkeyCache;

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;

protected readonly blockProcessor: BlockProcessor;
protected readonly db: IBeaconDb;
Expand Down Expand Up @@ -159,6 +164,7 @@ export class BeaconChain implements IBeaconChain {
this.index2pubkey = [];

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();

// Restore state caches
const cachedState = createCachedBeaconState(anchorState, {
Expand All @@ -176,6 +182,7 @@ export class BeaconChain implements IBeaconChain {
clock.currentSlot,
cachedState,
opts.proposerBoostEnabled,
this.justifiedBalancesGetter.bind(this),
metrics
);
const regen = new QueuedStateRegenerator({
Expand Down Expand Up @@ -208,6 +215,7 @@ export class BeaconChain implements IBeaconChain {
seenAggregatedAttestations: this.seenAggregatedAttestations,
seenBlockAttesters: this.seenBlockAttesters,
beaconProposerCache: this.beaconProposerCache,
checkpointBalancesCache: this.checkpointBalancesCache,
reprocessController: this.reprocessController,
emitter,
config,
Expand Down Expand Up @@ -353,6 +361,95 @@ export class BeaconChain implements IBeaconChain {
}
}

/**
* `ForkChoice.onBlock` must never throw for a block that is valid with respect to the network
* `justifiedBalancesGetter()` must never throw and it should always return a state.
* @param blockState state that declares justified checkpoint `checkpoint`
*/
private justifiedBalancesGetter(
checkpoint: CheckpointWithHex,
blockState: CachedBeaconStateAllForks
): EffectiveBalanceIncrements {
this.metrics?.balancesCache.requests.inc();

const effectiveBalances = this.checkpointBalancesCache.get(checkpoint);
if (effectiveBalances) {
return effectiveBalances;
} else {
// not expected, need metrics
this.metrics?.balancesCache.misses.inc();
this.logger.debug("checkpointBalances cache miss", {
epoch: checkpoint.epoch,
root: checkpoint.rootHex,
});

const {state, stateId, shouldWarn} = this.closestJustifiedBalancesStateToCheckpoint(checkpoint, blockState);
this.metrics?.balancesCache.closestStateResult.inc({stateId});
if (shouldWarn) {
this.logger.warn("currentJustifiedCheckpoint state not avail, using closest state", {
checkpointEpoch: checkpoint.epoch,
checkpointRoot: checkpoint.rootHex,
stateId,
stateSlot: state.slot,
stateRoot: toHex(state.hashTreeRoot()),
});
}

return getEffectiveBalanceIncrementsZeroInactive(state);
}
}

/**
* - Assumptions + invariant this function is based on:
* - Our cache can only persist X states at once to prevent OOM
* - Some old states (including to-be justified checkpoint) may / must be dropped from the cache
* - Thus, there is no guarantee that the state for a justified checkpoint will be available in the cache
* @param blockState state that declares justified checkpoint `checkpoint`
*/
private closestJustifiedBalancesStateToCheckpoint(
checkpoint: CheckpointWithHex,
blockState: CachedBeaconStateAllForks
): {state: CachedBeaconStateAllForks; stateId: string; shouldWarn: boolean} {
const state = this.checkpointStateCache.get(checkpoint);
if (state) {
return {state, stateId: "checkpoint_state", shouldWarn: false};
}

// Check if blockState is in the same epoch, not need to iterate the fork-choice then
if (computeEpochAtSlot(blockState.slot) === checkpoint.epoch) {
return {state: blockState, stateId: "block_state_same_epoch", shouldWarn: true};
}

// Find a state in the same branch of checkpoint at same epoch. Balances should exactly the same
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) === checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: descendantBlockState, stateId: "descendant_state_same_epoch", shouldWarn: true};
}
}
}

// Check if blockState is in the next epoch, not need to iterate the fork-choice then
if (computeEpochAtSlot(blockState.slot) === checkpoint.epoch + 1) {
return {state: blockState, stateId: "block_state_next_epoch", shouldWarn: true};
}

// Find a state in the same branch of checkpoint at a latter epoch. Balances are not the same, but should be close
// Note: must call .forwardIterateDescendants() again since nodes are not sorted
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) > checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: blockState, stateId: "descendant_state_latter_epoch", shouldWarn: true};
}
}
}

// If there's no state available in the same branch of checkpoint use blockState regardless of its epoch
return {state: blockState, stateId: "block_state_any_epoch", shouldWarn: true};
}

private async persistInvalidSszObject(
typeName: string,
bytes: Uint8Array,
Expand Down

0 comments on commit 3ca87fd

Please sign in to comment.