Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint balances cache #4290

Merged
merged 7 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions packages/beacon-node/src/chain/balancesCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import {
CachedBeaconStateAllForks,
computeStartSlotAtEpoch,
EffectiveBalanceIncrements,
getBlockRootAtSlot,
getEffectiveBalanceIncrementsZeroInactive,
} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {Epoch, RootHex} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";

/** The number of validator balance sets that are cached within `CheckpointBalancesCache`. */
const MAX_BALANCE_CACHE_SIZE = 4;

type BalancesCacheItem = {
rootHex: RootHex;
epoch: Epoch;
balances: EffectiveBalanceIncrements;
};

/**
* Cache EffectiveBalanceIncrements of checkpoint blocks
*/
export class CheckpointBalancesCache {
private readonly items: BalancesCacheItem[] = [];

/**
* Inspect the given `state` and determine the root of the block at the first slot of
* `state.current_epoch`. If there is not already some entry for the given block root, then
* add the effective balances from the `state` to the cache.
*/
processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): void {
twoeths marked this conversation as resolved.
Show resolved Hide resolved
const epoch = state.epochCtx.currentShuffling.epoch;
dapplion marked this conversation as resolved.
Show resolved Hide resolved
const epochBoundarySlot = computeStartSlotAtEpoch(epoch);
const epochBoundaryRoot =
epochBoundarySlot === state.slot ? blockRootHex : toHexString(getBlockRootAtSlot(state, epochBoundarySlot));

const index = this.items.findIndex((item) => item.epoch === epoch && item.rootHex == epochBoundaryRoot);
if (index === -1) {
if (this.items.length === MAX_BALANCE_CACHE_SIZE) {
this.items.shift();
}
// expect to reach this once per epoch
this.items.push({epoch, rootHex: epochBoundaryRoot, balances: getEffectiveBalanceIncrementsZeroInactive(state)});
}
}

get(checkpoint: CheckpointWithHex): EffectiveBalanceIncrements | undefined {
const {rootHex, epoch} = checkpoint;
return this.items.find((item) => item.epoch === epoch && item.rootHex === rootHex)?.balances;
}
}
120 changes: 10 additions & 110 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import {altair, allForks, ssz} from "@lodestar/types";
import {altair, ssz} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {
CachedBeaconStateAllForks,
CachedBeaconStateAltair,
computeStartSlotAtEpoch,
getEffectiveBalanceIncrementsZeroInactive,
computeEpochAtSlot,
RootCache,
} from "@lodestar/state-transition";
import {IForkChoice, OnBlockPrecachedData, ForkChoiceError, ForkChoiceErrorCode} from "@lodestar/fork-choice";
import {CachedBeaconStateAltair, computeEpochAtSlot, RootCache} from "@lodestar/state-transition";
import {IForkChoice, ForkChoiceError, ForkChoiceErrorCode} from "@lodestar/fork-choice";
import {ILogger} from "@lodestar/utils";
import {IChainForkConfig} from "@lodestar/config";
import {IMetrics} from "../../metrics/index.js";
Expand All @@ -26,6 +19,7 @@ import {IEth1ForBlockProduction} from "../../eth1/index.js";
import {BeaconProposerCache} from "../beaconProposerCache.js";
import {IBeaconClock} from "../clock/index.js";
import {ReprocessController, REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {CheckpointBalancesCache} from "../balancesCache.js";
import {FullyVerifiedBlock} from "./types.js";
import {PendingEvents} from "./utils/pendingEvents.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
Expand All @@ -43,6 +37,7 @@ export type ImportBlockModules = {
seenAggregatedAttestations: SeenAggregatedAttestations;
seenBlockAttesters: SeenBlockAttesters;
beaconProposerCache: BeaconProposerCache;
checkpointBalancesCache: CheckpointBalancesCache;
reprocessController: ReprocessController;
lightClientServer: LightClientServer;
eth1: IEth1ForBlockProduction;
Expand Down Expand Up @@ -93,19 +88,12 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
//
// current justified checkpoint should be prev epoch or current epoch if it's just updated
// it should always have epochBalances there bc it's a checkpoint state, ie got through processEpoch
const justifiedCheckpoint = postState.currentJustifiedCheckpoint;

const onBlockPrecachedData: OnBlockPrecachedData = {
executionStatus,
blockDelaySec: (Math.floor(Date.now() / 1000) - postState.genesisTime) % chain.config.SECONDS_PER_SLOT,
};
if (justifiedCheckpoint.epoch > chain.forkChoice.getJustifiedCheckpoint().epoch) {
const state = getStateForJustifiedBalances(chain, postState, block);
onBlockPrecachedData.justifiedBalances = getEffectiveBalanceIncrementsZeroInactive(state);
}

const prevFinalizedEpoch = chain.forkChoice.getFinalizedCheckpoint().epoch;
chain.forkChoice.onBlock(block.message, postState, onBlockPrecachedData);
const blockDelaySec = (Math.floor(Date.now() / 1000) - postState.genesisTime) % chain.config.SECONDS_PER_SLOT;
const blockRoot = toHexString(chain.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
// Should compute checkpoint balances before forkchoice.onBlock
chain.checkpointBalancesCache.processState(blockRoot, postState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling fc_store.onVerifiedBlock() in a similar pattern as lighthouse sounds better than here.

chain.forkChoice.onBlock(block.message, postState, blockDelaySec, executionStatus);

// - Register state and block to the validator monitor
// TODO
Expand Down Expand Up @@ -312,8 +300,6 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
chain.metrics?.proposerBalanceDiffAny.observe(fullyVerifiedBlock.proposerBalanceDiff);
chain.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);

// Note: in-lined from previous handler of ChainEvent.block
const blockRoot = toHexString(chain.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
const advancedSlot = chain.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);

chain.reprocessController.onBlockImported({slot: block.message.slot, root: blockRoot}, advancedSlot);
Expand All @@ -324,89 +310,3 @@ export async function importBlock(chain: ImportBlockModules, fullyVerifiedBlock:
delaySec: chain.clock.secFromSlot(block.message.slot),
});
}

/**
* Returns the closest state to postState.currentJustifiedCheckpoint in the same fork as postState
*
* From the spec https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#get_latest_attesting_balance
* The state from which to read balances is:
*
* ```python
* state = store.checkpoint_states[store.justified_checkpoint]
* ```
*
* ```python
* def store_target_checkpoint_state(store: Store, target: Checkpoint) -> None:
* # Store target checkpoint state if not yet seen
* if target not in store.checkpoint_states:
* base_state = copy(store.block_states[target.root])
* if base_state.slot < compute_start_slot_at_epoch(target.epoch):
* process_slots(base_state, compute_start_slot_at_epoch(target.epoch))
* store.checkpoint_states[target] = base_state
* ```
*
* So the state to get justified balances is the post state of `checkpoint.root` dialed forward to the first slot in
* `checkpoint.epoch` if that block is not in `checkpoint.epoch`.
*/
function getStateForJustifiedBalances(
chain: ImportBlockModules,
postState: CachedBeaconStateAllForks,
block: allForks.SignedBeaconBlock
): CachedBeaconStateAllForks {
const justifiedCheckpoint = postState.currentJustifiedCheckpoint;
const checkpointHex = toCheckpointHex(justifiedCheckpoint);
const checkpointSlot = computeStartSlotAtEpoch(checkpointHex.epoch);

// First, check if the checkpoint block in the checkpoint epoch, by getting the block summary from the fork-choice
const checkpointBlock = chain.forkChoice.getBlockHex(checkpointHex.rootHex);
if (!checkpointBlock) {
// Should never happen
return postState;
}

// NOTE: The state of block checkpointHex.rootHex may be prior to the justified checkpoint if it was a skipped slot.
if (checkpointBlock.slot >= checkpointSlot) {
const checkpointBlockState = chain.stateCache.get(checkpointBlock.stateRoot);
if (checkpointBlockState) {
return checkpointBlockState;
}
}

// If here, the first slot of `checkpoint.epoch` is a skipped slot. Check if the state is in the checkpoint cache.
// NOTE: This state and above are correct with the spec.
// NOTE: If the first slot of the epoch was skipped and the node is syncing, this state won't be in the cache.
const state = chain.checkpointStateCache.get(checkpointHex);
if (state) {
return state;
}

// If it's not found, then find the oldest state in the same chain as this one
// NOTE: If `block.message.parentRoot` is not in the fork-choice, `iterateAncestorBlocks()` returns `[]`
// NOTE: This state is not be correct with the spec, it may have extra modifications from multiple blocks.
// However, it's a best effort before triggering an async regen process. In the future this should be fixed
// to use regen and get the correct state.
let oldestState = postState;
for (const parentBlock of chain.forkChoice.iterateAncestorBlocks(toHexString(block.message.parentRoot))) {
// We want at least a state at the slot 0 of checkpoint.epoch
if (parentBlock.slot < checkpointSlot) {
break;
}

const parentBlockState = chain.stateCache.get(parentBlock.stateRoot);
if (parentBlockState) {
oldestState = parentBlockState;
}
}

// TODO: Use regen to get correct state. Note that making this function async can break the import flow.
// Also note that it can dead lock regen and block processing since both have a concurrency of 1.

chain.logger.error("State for currentJustifiedCheckpoint not available, using closest state", {
checkpointEpoch: checkpointHex.epoch,
checkpointRoot: checkpointHex.rootHex,
stateSlot: oldestState.slot,
stateRoot: toHexString(oldestState.hashTreeRoot()),
});

return oldestState;
}
8 changes: 4 additions & 4 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ export type FullyVerifiedBlockFlags = {
* Used by range sync.
*/
ignoreIfFinalized?: boolean;
/**
* If the execution payload couldnt be verified because of EL syncing status, used in optimistic sync or for merge block
*/
executionStatus?: ExecutionStatus;
};

export type PartiallyVerifiedBlockFlags = FullyVerifiedBlockFlags & {
Expand Down Expand Up @@ -52,6 +48,10 @@ export type FullyVerifiedBlock = FullyVerifiedBlockFlags & {
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
proposerBalanceDiff: number;
/**
* If the execution payload couldnt be verified because of EL syncing status, used in optimistic sync or for merge block
*/
executionStatus: ExecutionStatus;
};

/**
Expand Down
97 changes: 97 additions & 0 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import path from "node:path";
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
computeEpochAtSlot,
computeStartSlotAtEpoch,
createCachedBeaconState,
EffectiveBalanceIncrements,
getEffectiveBalanceIncrementsZeroInactive,
Index2PubkeyCache,
PubkeyIndexMap,
} from "@lodestar/state-transition";
Expand Down Expand Up @@ -53,6 +56,7 @@ import {ReprocessController} from "./reprocess.js";
import {SeenAggregatedAttestations} from "./seenCache/seenAggregateAndProof.js";
import {SeenBlockAttesters} from "./seenCache/seenBlockAttesters.js";
import {BeaconProposerCache} from "./beaconProposerCache.js";
import {CheckpointBalancesCache} from "./balancesCache.js";
import {ChainEvent} from "./index.js";

export class BeaconChain implements IBeaconChain {
Expand Down Expand Up @@ -97,6 +101,7 @@ export class BeaconChain implements IBeaconChain {
readonly index2pubkey: Index2PubkeyCache;

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;

protected readonly blockProcessor: BlockProcessor;
protected readonly db: IBeaconDb;
Expand Down Expand Up @@ -159,6 +164,7 @@ export class BeaconChain implements IBeaconChain {
this.index2pubkey = [];

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();

// Restore state caches
const cachedState = createCachedBeaconState(anchorState, {
Expand All @@ -176,6 +182,7 @@ export class BeaconChain implements IBeaconChain {
clock.currentSlot,
cachedState,
opts.proposerBoostEnabled,
this.justifiedBalancesGetter.bind(this),
metrics
);
const regen = new QueuedStateRegenerator({
Expand Down Expand Up @@ -208,6 +215,7 @@ export class BeaconChain implements IBeaconChain {
seenAggregatedAttestations: this.seenAggregatedAttestations,
seenBlockAttesters: this.seenBlockAttesters,
beaconProposerCache: this.beaconProposerCache,
checkpointBalancesCache: this.checkpointBalancesCache,
reprocessController: this.reprocessController,
emitter,
config,
Expand Down Expand Up @@ -353,6 +361,95 @@ export class BeaconChain implements IBeaconChain {
}
}

/**
* `ForkChoice.onBlock` must never throw for a block that is valid with respect to the network
* `justifiedBalancesGetter()` must never throw and it should always return a state.
* @param blockState state that declares justified checkpoint `checkpoint`
*/
private justifiedBalancesGetter(
checkpoint: CheckpointWithHex,
blockState: CachedBeaconStateAllForks
): EffectiveBalanceIncrements {
this.metrics?.balancesCache.requests.inc();

const effectiveBalances = this.checkpointBalancesCache.get(checkpoint);
if (effectiveBalances) {
return effectiveBalances;
} else {
// not expected, need metrics
this.metrics?.balancesCache.misses.inc();
this.logger.debug("checkpointBalances cache miss", {
epoch: checkpoint.epoch,
root: checkpoint.rootHex,
});

const {state, stateId, shouldWarn} = this.closestJustifiedBalancesStateToCheckpoint(checkpoint, blockState);
this.metrics?.balancesCache.closestStateResult.inc({stateId});
if (shouldWarn) {
this.logger.warn("currentJustifiedCheckpoint state not avail, using closest state", {
checkpointEpoch: checkpoint.epoch,
checkpointRoot: checkpoint.rootHex,
stateId,
stateSlot: state.slot,
stateRoot: toHex(state.hashTreeRoot()),
});
}

return getEffectiveBalanceIncrementsZeroInactive(state);
}
}

/**
* - Assumptions + invariant this function is based on:
* - Our cache can only persist X states at once to prevent OOM
* - Some old states (including to-be justified checkpoint) may / must be dropped from the cache
* - Thus, there is no guarantee that the state for a justified checkpoint will be available in the cache
* @param blockState state that declares justified checkpoint `checkpoint`
*/
private closestJustifiedBalancesStateToCheckpoint(
checkpoint: CheckpointWithHex,
blockState: CachedBeaconStateAllForks
): {state: CachedBeaconStateAllForks; stateId: string; shouldWarn: boolean} {
const state = this.checkpointStateCache.get(checkpoint);
if (state) {
return {state, stateId: "checkpoint_state", shouldWarn: false};
}

// Check if blockState is in the same epoch, not need to iterate the fork-choice then
if (computeEpochAtSlot(blockState.slot) === checkpoint.epoch) {
return {state: blockState, stateId: "block_state_same_epoch", shouldWarn: true};
}

// Find a state in the same branch of checkpoint at same epoch. Balances should exactly the same
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we consume forwardIterateDescendants only once to improve performance a bit? either extract to an array, or that function could return an array itself

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's very unlikely to hit this code path I would not do that for simplicity. Metrics will clearly show if we have to run the iterator twice too often, only then optimize

if (computeEpochAtSlot(descendantBlock.slot) === checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: descendantBlockState, stateId: "descendant_state_same_epoch", shouldWarn: true};
}
}
}

// Check if blockState is in the next epoch, not need to iterate the fork-choice then
if (computeEpochAtSlot(blockState.slot) === checkpoint.epoch + 1) {
return {state: blockState, stateId: "block_state_next_epoch", shouldWarn: true};
}

// Find a state in the same branch of checkpoint at a latter epoch. Balances are not the same, but should be close
// Note: must call .forwardIterateDescendants() again since nodes are not sorted
for (const descendantBlock of this.forkChoice.forwardIterateDescendants(checkpoint.rootHex)) {
if (computeEpochAtSlot(descendantBlock.slot) > checkpoint.epoch) {
const descendantBlockState = this.stateCache.get(descendantBlock.stateRoot);
if (descendantBlockState) {
return {state: blockState, stateId: "descendant_state_latter_epoch", shouldWarn: true};
}
}
}

// If there's no state available in the same branch of checkpoint use blockState regardless of its epoch
return {state: blockState, stateId: "block_state_any_epoch", shouldWarn: true};
}

private async persistInvalidSszObject(
typeName: string,
bytes: Uint8Array,
Expand Down