Skip to content

Commit

Permalink
Merge 0f363ae into 918924e
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Mar 6, 2024
2 parents 918924e + 0f363ae commit 839606c
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 53 deletions.
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ export async function importBlock(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
// consumers should not mutate or get the transfered cache
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// Note: in-lined code from previos handler of ChainEvent.checkpoint
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export async function verifyBlocksInEpoch(
// TODO: Skip in process chain segment
// Retrieve preState from cache (regen)
const preState0 = await this.regen
// transfer cache to process faster, postState will be in block state cache
.getPreState(block0.message, {dontTransferCache: false}, RegenCaller.processBlocksInEpoch)
.catch((e) => {
throw new BlockError(block0, {code: BlockErrorCode.PRESTATE_MISSING, error: e as Error});
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export class PrepareNextSlotScheduler {
headRoot,
prepareSlot,
// the slot 0 of next epoch will likely use this Previous Root Checkpoint state for state transition so we transfer cache here
// the resulting state with cache will be cached in Checkpoint State Cache which is used for the upcoming block processing
// for other slots dontTransferCached=true because we don't run state transition on this state
{dontTransferCache: !isEpochTransition},
RegenCaller.precomputeEpoch
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ export interface IStateRegeneratorInternal {
/**
* Return the exact state with `stateRoot`
*/
getState(stateRoot: RootHex, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks>;
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks>;
}
53 changes: 40 additions & 13 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,23 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return [...this.stateCache.dumpSummary(), ...this.checkpointStateCache.dumpSummary()];
}

/**
* Get a state from block state cache.
* This is not for block processing so don't transfer cache
*/
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null {
return this.stateCache.get(stateRoot);
return this.stateCache.get(stateRoot, {dontTransferCache: true});
}

getPreStateSync(block: allForks.BeaconBlock): CachedBeaconStateAllForks | null {
/**
* Get state for block processing.
* By default, do not transfer cache except for the block at clock slot
* which is usually the gossip block.
*/
getPreStateSync(
block: allForks.BeaconBlock,
opts: StateCloneOpts = {dontTransferCache: true}
): CachedBeaconStateAllForks | null {
const parentRoot = toHexString(block.parentRoot);
const parentBlock = this.forkChoice.getBlockHex(parentRoot);
if (!parentBlock) {
Expand All @@ -91,7 +103,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {

// Check the checkpoint cache (if the pre-state is a checkpoint state)
if (parentEpoch < blockEpoch) {
const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch);
const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch, opts);
if (checkpointState && computeEpochAtSlot(checkpointState.slot) === blockEpoch) {
return checkpointState;
}
Expand All @@ -101,7 +113,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
// Otherwise the state transition may not be cached and wasted. Queue for regen since the
// work required will still be significant.
if (parentEpoch === blockEpoch) {
const state = this.stateCache.get(parentBlock.stateRoot);
const state = this.stateCache.get(parentBlock.stateRoot, opts);
if (state) {
return state;
}
Expand All @@ -114,12 +126,21 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return this.checkpointStateCache.getStateOrBytes(cp);
}

/**
* Get checkpoint state from cache, this function is not for block processing so don't transfer cache
*/
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null {
return this.checkpointStateCache.get(cp);
return this.checkpointStateCache.get(cp, {dontTransferCache: true});
}

/**
* Get state closest to head, this function is not for block processing so don't transfer cache
*/
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null {
return this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);
const opts = {dontTransferCache: true};
return (
this.checkpointStateCache.getLatest(head.blockRoot, Infinity, opts) || this.stateCache.get(head.stateRoot, opts)
);
}

pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void {
Expand All @@ -144,10 +165,12 @@ export class QueuedStateRegenerator implements IStateRegenerator {
}

updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void {
// the resulting state will be added to block state cache so we transfer the cache in this flow
const cloneOpts = {dontTransferCache: true};
const headState =
newHeadStateRoot === toHexString(maybeHeadState.hashTreeRoot())
? maybeHeadState
: this.stateCache.get(newHeadStateRoot);
: this.stateCache.get(newHeadStateRoot, cloneOpts);

if (headState) {
this.stateCache.setHeadState(headState);
Expand All @@ -160,7 +183,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
// for legacy StateContextCache only
this.stateCache.setHeadState(null);
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, allowDiskReload).then(
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, cloneOpts, allowDiskReload).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
Expand All @@ -183,7 +206,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState});

// First attempt to fetch the state from caches before queueing
const cachedState = this.getPreStateSync(block);
const cachedState = this.getPreStateSync(block, opts);

if (cachedState !== null) {
return cachedState;
Expand All @@ -202,7 +225,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState});

// First attempt to fetch the state from cache before queueing
const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp));
const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp), opts);
if (checkpointState) {
return checkpointState;
}
Expand Down Expand Up @@ -230,18 +253,22 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return this.jobQueue.push({key: "getBlockSlotState", args: [blockRoot, slot, opts, rCaller]});
}

async getState(stateRoot: RootHex, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks> {
async getState(
stateRoot: RootHex,
rCaller: RegenCaller,
opts: StateCloneOpts = {dontTransferCache: true}
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});

// First attempt to fetch the state from cache before queueing
const state = this.stateCache.get(stateRoot);
const state = this.stateCache.get(stateRoot, opts);
if (state) {
return state;
}

// The state is not immediately available in the cache, enqueue the job
this.metrics?.regenFnQueuedTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});
return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller]});
return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller, opts]});
}

private jobQueueProcessor = async (regenRequest: RegenRequest): Promise<CachedBeaconStateAllForks> => {
Expand Down
19 changes: 11 additions & 8 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
}

// Otherwise, get the state normally.
return this.getState(parentBlock.stateRoot, regenCaller, allowDiskReload);
return this.getState(parentBlock.stateRoot, regenCaller, opts, allowDiskReload);
}

/**
Expand Down Expand Up @@ -121,8 +121,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const {checkpointStateCache} = this.modules;
const epoch = computeEpochAtSlot(slot);
const latestCheckpointStateCtx = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch)
: checkpointStateCache.getLatest(blockRoot, epoch);
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch, opts)
: checkpointStateCache.getLatest(blockRoot, epoch, opts);

// If a checkpoint state exists with the given checkpoint root, it either is in requested epoch
// or needs to have empty slots processed until the requested epoch
Expand All @@ -133,7 +133,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root
// regenerate that state,
// then process empty slots until the requested epoch
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, allowDiskReload);
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, opts, allowDiskReload);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, regenCaller, opts);
}

Expand All @@ -145,10 +145,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
opts?: StateCloneOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
// Trivial case, state at stateRoot is already cached
const cachedStateCtx = this.modules.stateCache.get(stateRoot);
const cachedStateCtx = this.modules.stateCache.get(stateRoot, opts);
if (cachedStateCtx) {
return cachedStateCtx;
}
Expand All @@ -165,14 +167,14 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const {checkpointStateCache} = this.modules;
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.stateCache.get(b.stateRoot);
state = this.modules.stateCache.get(b.stateRoot, opts);
if (state) {
break;
}
const epoch = computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1);
state = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch)
: checkpointStateCache.getLatest(b.blockRoot, epoch);
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch, opts)
: checkpointStateCache.getLatest(b.blockRoot, epoch, opts);
if (state) {
break;
}
Expand Down Expand Up @@ -319,6 +321,7 @@ async function processSlotsToNearestCheckpoint(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
// consumers should not mutate or get the transfered cache
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// this avoids keeping our node busy processing blocks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {StateCloneOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -70,7 +71,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
/**
* Get a state from this cache given a state root hex.
*/
get(rootHex: RootHex): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.cache.get(rootHex);
if (!item) {
Expand All @@ -80,7 +81,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item.clone(true);
return item.clone(opts?.dontTransferCache);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {Metrics} from "../../metrics/index.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {StateCloneOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
import {CheckpointHex, CacheItemType, CheckpointStateCache} from "./types.js";
Expand Down Expand Up @@ -184,10 +185,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - Get block for processing
* - Regen head state
*/
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp);
async getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp, opts);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData?.clone(true) ?? null;
return stateOrStateBytesData?.clone(opts?.dontTransferCache) ?? null;
}
const {persistedKey, stateBytes} = stateOrStateBytesData;
const logMeta = {persistedKey: toHexString(persistedKey)};
Expand Down Expand Up @@ -242,7 +243,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey});
this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
return newCachedState.clone(true);
return newCachedState.clone(opts?.dontTransferCache);
} catch (e) {
this.logger.debug("Reload: error loading cached state", logMeta, e as Error);
return null;
Expand All @@ -253,7 +254,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* Return either state or state bytes loaded from db.
*/
async getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
const stateOrLoadedState = await this.getStateOrLoadDb(cp);
// don't have to transfer cache for this specific api
const stateOrLoadedState = await this.getStateOrLoadDb(cp, {dontTransferCache: true});
if (stateOrLoadedState === null || isCachedBeaconState(stateOrLoadedState)) {
return stateOrLoadedState;
}
Expand All @@ -263,9 +265,12 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Return either state or state bytes with persisted key loaded from db.
*/
async getStateOrLoadDb(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
async getStateOrLoadDb(
cp: CheckpointHex,
opts?: StateCloneOpts
): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
const cpKey = toCacheKey(cp);
const inMemoryState = this.get(cpKey);
const inMemoryState = this.get(cpKey, opts);
if (inMemoryState) {
return inMemoryState;
}
Expand Down Expand Up @@ -294,7 +299,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Similar to get() api without reloading from disk
*/
get(cpOrKey: CheckpointHex | string): CachedBeaconStateAllForks | null {
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey);
const cacheItem = this.cache.get(cpKey);
Expand All @@ -312,7 +317,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (isInMemoryCacheItem(cacheItem)) {
const {state} = cacheItem;
this.metrics?.stateClonedCount.observe(state.clonedCount);
return state.clone(true);
return state.clone(opts?.dontTransferCache);
}

return null;
Expand Down Expand Up @@ -345,14 +350,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Searches in-memory state for the latest cached state with a `root` without reload, starting with `epoch` and descending
*/
getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null {
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
.filter((e) => e <= maxEpoch);
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
const inMemoryClonedState = this.get({rootHex, epoch});
const inMemoryClonedState = this.get({rootHex, epoch}, opts);
if (inMemoryClonedState) {
return inMemoryClonedState;
}
Expand All @@ -368,15 +373,19 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - Get block for processing
* - Regen head state
*/
async getOrReloadLatest(rootHex: RootHex, maxEpoch: Epoch): Promise<CachedBeaconStateAllForks | null> {
async getOrReloadLatest(
rootHex: RootHex,
maxEpoch: Epoch,
opts?: StateCloneOpts
): Promise<CachedBeaconStateAllForks | null> {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
.filter((e) => e <= maxEpoch);
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
try {
const clonedState = await this.getOrReload({rootHex, epoch});
const clonedState = await this.getOrReload({rootHex, epoch}, opts);
if (clonedState) {
return clonedState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Epoch, RootHex} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {StateCloneOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -38,7 +39,7 @@ export class StateContextCache implements BlockStateCache {
}
}

get(rootHex: RootHex): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.head?.stateRoot === rootHex ? this.head.state : this.cache.get(rootHex);
if (!item) {
Expand All @@ -48,7 +49,7 @@ export class StateContextCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item.clone(true);
return item.clone(opts?.dontTransferCache);
}

add(item: CachedBeaconStateAllForks): void {
Expand Down

0 comments on commit 839606c

Please sign in to comment.