Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add clone option to state caches #6512

Merged
merged 2 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ export async function importBlock(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
// consumers should not mutate or get the transfered cache
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// Note: in-lined code from previos handler of ChainEvent.checkpoint
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export async function verifyBlocksInEpoch(
// TODO: Skip in process chain segment
// Retrieve preState from cache (regen)
const preState0 = await this.regen
// transfer cache to process faster, postState will be in block state cache
.getPreState(block0.message, {dontTransferCache: false}, RegenCaller.processBlocksInEpoch)
.catch((e) => {
throw new BlockError(block0, {code: BlockErrorCode.PRESTATE_MISSING, error: e as Error});
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export class PrepareNextSlotScheduler {
headRoot,
prepareSlot,
// the slot 0 of next epoch will likely use this Previous Root Checkpoint state for state transition so we transfer cache here
// the resulting state with cache will be cached in Checkpoint State Cache which is used for the upcoming block processing
// for other slots dontTransferCached=true because we don't run state transition on this state
{dontTransferCache: !isEpochTransition},
RegenCaller.precomputeEpoch
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ export interface IStateRegeneratorInternal {
/**
* Return the exact state with `stateRoot`
*/
getState(stateRoot: RootHex, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks>;
getState(stateRoot: RootHex, rCaller: RegenCaller, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks>;
}
53 changes: 40 additions & 13 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,23 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return [...this.stateCache.dumpSummary(), ...this.checkpointStateCache.dumpSummary()];
}

/**
* Get a state from block state cache.
* This is not for block processing so don't transfer cache
*/
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null {
return this.stateCache.get(stateRoot);
return this.stateCache.get(stateRoot, {dontTransferCache: true});
}

getPreStateSync(block: allForks.BeaconBlock): CachedBeaconStateAllForks | null {
/**
* Get state for block processing.
* By default, do not transfer cache except for the block at clock slot
* which is usually the gossip block.
*/
getPreStateSync(
block: allForks.BeaconBlock,
opts: StateCloneOpts = {dontTransferCache: true}
): CachedBeaconStateAllForks | null {
const parentRoot = toHexString(block.parentRoot);
const parentBlock = this.forkChoice.getBlockHex(parentRoot);
if (!parentBlock) {
Expand All @@ -91,7 +103,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {

// Check the checkpoint cache (if the pre-state is a checkpoint state)
if (parentEpoch < blockEpoch) {
const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch);
const checkpointState = this.checkpointStateCache.getLatest(parentRoot, blockEpoch, opts);
if (checkpointState && computeEpochAtSlot(checkpointState.slot) === blockEpoch) {
return checkpointState;
}
Expand All @@ -101,7 +113,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
// Otherwise the state transition may not be cached and wasted. Queue for regen since the
// work required will still be significant.
if (parentEpoch === blockEpoch) {
const state = this.stateCache.get(parentBlock.stateRoot);
const state = this.stateCache.get(parentBlock.stateRoot, opts);
if (state) {
return state;
}
Expand All @@ -114,12 +126,21 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return this.checkpointStateCache.getStateOrBytes(cp);
}

/**
* Get checkpoint state from cache, this function is not for block processing so don't transfer cache
*/
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null {
return this.checkpointStateCache.get(cp);
return this.checkpointStateCache.get(cp, {dontTransferCache: true});
}

/**
* Get state closest to head, this function is not for block processing so don't transfer cache
*/
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null {
return this.checkpointStateCache.getLatest(head.blockRoot, Infinity) || this.stateCache.get(head.stateRoot);
const opts = {dontTransferCache: true};
return (
this.checkpointStateCache.getLatest(head.blockRoot, Infinity, opts) || this.stateCache.get(head.stateRoot, opts)
);
}

pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void {
Expand All @@ -144,10 +165,12 @@ export class QueuedStateRegenerator implements IStateRegenerator {
}

updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void {
// the resulting state will be added to block state cache so we transfer the cache in this flow
const cloneOpts = {dontTransferCache: true};
const headState =
newHeadStateRoot === toHexString(maybeHeadState.hashTreeRoot())
? maybeHeadState
: this.stateCache.get(newHeadStateRoot);
: this.stateCache.get(newHeadStateRoot, cloneOpts);

if (headState) {
this.stateCache.setHeadState(headState);
Expand All @@ -160,7 +183,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
// for legacy StateContextCache only
this.stateCache.setHeadState(null);
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, allowDiskReload).then(
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, cloneOpts, allowDiskReload).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
Expand All @@ -183,7 +206,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getPreState});

// First attempt to fetch the state from caches before queueing
const cachedState = this.getPreStateSync(block);
const cachedState = this.getPreStateSync(block, opts);

if (cachedState !== null) {
return cachedState;
Expand All @@ -202,7 +225,7 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getCheckpointState});

// First attempt to fetch the state from cache before queueing
const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp));
const checkpointState = this.checkpointStateCache.get(toCheckpointHex(cp), opts);
if (checkpointState) {
return checkpointState;
}
Expand Down Expand Up @@ -230,18 +253,22 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return this.jobQueue.push({key: "getBlockSlotState", args: [blockRoot, slot, opts, rCaller]});
}

async getState(stateRoot: RootHex, rCaller: RegenCaller): Promise<CachedBeaconStateAllForks> {
async getState(
stateRoot: RootHex,
rCaller: RegenCaller,
opts: StateCloneOpts = {dontTransferCache: true}
): Promise<CachedBeaconStateAllForks> {
this.metrics?.regenFnCallTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});

// First attempt to fetch the state from cache before queueing
const state = this.stateCache.get(stateRoot);
const state = this.stateCache.get(stateRoot, opts);
if (state) {
return state;
}

// The state is not immediately available in the cache, enqueue the job
this.metrics?.regenFnQueuedTotal.inc({caller: rCaller, entrypoint: RegenFnName.getState});
return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller]});
return this.jobQueue.push({key: "getState", args: [stateRoot, rCaller, opts]});
}

private jobQueueProcessor = async (regenRequest: RegenRequest): Promise<CachedBeaconStateAllForks> => {
Expand Down
19 changes: 11 additions & 8 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
}

// Otherwise, get the state normally.
return this.getState(parentBlock.stateRoot, regenCaller, allowDiskReload);
return this.getState(parentBlock.stateRoot, regenCaller, opts, allowDiskReload);
}

/**
Expand Down Expand Up @@ -121,8 +121,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const {checkpointStateCache} = this.modules;
const epoch = computeEpochAtSlot(slot);
const latestCheckpointStateCtx = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch)
: checkpointStateCache.getLatest(blockRoot, epoch);
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch, opts)
: checkpointStateCache.getLatest(blockRoot, epoch, opts);

// If a checkpoint state exists with the given checkpoint root, it either is in requested epoch
// or needs to have empty slots processed until the requested epoch
Expand All @@ -133,7 +133,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root
// regenerate that state,
// then process empty slots until the requested epoch
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, allowDiskReload);
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, opts, allowDiskReload);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, regenCaller, opts);
}

Expand All @@ -145,10 +145,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
opts?: StateCloneOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
// Trivial case, state at stateRoot is already cached
const cachedStateCtx = this.modules.stateCache.get(stateRoot);
const cachedStateCtx = this.modules.stateCache.get(stateRoot, opts);
if (cachedStateCtx) {
return cachedStateCtx;
}
Expand All @@ -165,14 +167,14 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const {checkpointStateCache} = this.modules;
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.stateCache.get(b.stateRoot);
state = this.modules.stateCache.get(b.stateRoot, opts);
if (state) {
break;
}
const epoch = computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1);
state = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch)
: checkpointStateCache.getLatest(b.blockRoot, epoch);
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch, opts)
: checkpointStateCache.getLatest(b.blockRoot, epoch, opts);
if (state) {
break;
}
Expand Down Expand Up @@ -319,6 +321,7 @@ async function processSlotsToNearestCheckpoint(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
// consumers should not mutate or get the transfered cache
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// this avoids keeping our node busy processing blocks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {StateCloneOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -70,7 +71,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
/**
* Get a state from this cache given a state root hex.
*/
get(rootHex: RootHex): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.cache.get(rootHex);
if (!item) {
Expand All @@ -80,7 +81,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item.clone(true);
return item.clone(opts?.dontTransferCache);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {Metrics} from "../../metrics/index.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {StateCloneOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
import {CheckpointHex, CacheItemType, CheckpointStateCache} from "./types.js";
Expand Down Expand Up @@ -184,10 +185,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - Get block for processing
* - Regen head state
*/
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp);
async getOrReload(cp: CheckpointHex, opts?: StateCloneOpts): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp, opts);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData?.clone(true) ?? null;
return stateOrStateBytesData?.clone(opts?.dontTransferCache) ?? null;
}
const {persistedKey, stateBytes} = stateOrStateBytesData;
const logMeta = {persistedKey: toHexString(persistedKey)};
Expand Down Expand Up @@ -242,7 +243,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey});
this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
return newCachedState.clone(true);
return newCachedState.clone(opts?.dontTransferCache);
} catch (e) {
this.logger.debug("Reload: error loading cached state", logMeta, e as Error);
return null;
Expand All @@ -253,7 +254,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* Return either state or state bytes loaded from db.
*/
async getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
const stateOrLoadedState = await this.getStateOrLoadDb(cp);
// don't have to transfer cache for this specific api
const stateOrLoadedState = await this.getStateOrLoadDb(cp, {dontTransferCache: true});
if (stateOrLoadedState === null || isCachedBeaconState(stateOrLoadedState)) {
return stateOrLoadedState;
}
Expand All @@ -263,9 +265,12 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Return either state or state bytes with persisted key loaded from db.
*/
async getStateOrLoadDb(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
async getStateOrLoadDb(
cp: CheckpointHex,
opts?: StateCloneOpts
): Promise<CachedBeaconStateAllForks | LoadedStateBytesData | null> {
const cpKey = toCacheKey(cp);
const inMemoryState = this.get(cpKey);
const inMemoryState = this.get(cpKey, opts);
if (inMemoryState) {
return inMemoryState;
}
Expand Down Expand Up @@ -294,7 +299,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Similar to get() api without reloading from disk
*/
get(cpOrKey: CheckpointHex | string): CachedBeaconStateAllForks | null {
get(cpOrKey: CheckpointHex | string, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey);
const cacheItem = this.cache.get(cpKey);
Expand All @@ -312,7 +317,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (isInMemoryCacheItem(cacheItem)) {
const {state} = cacheItem;
this.metrics?.stateClonedCount.observe(state.clonedCount);
return state.clone(true);
return state.clone(opts?.dontTransferCache);
}

return null;
Expand Down Expand Up @@ -345,14 +350,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
/**
* Searches in-memory state for the latest cached state with a `root` without reload, starting with `epoch` and descending
*/
getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null {
getLatest(rootHex: RootHex, maxEpoch: Epoch, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
.filter((e) => e <= maxEpoch);
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
const inMemoryClonedState = this.get({rootHex, epoch});
const inMemoryClonedState = this.get({rootHex, epoch}, opts);
if (inMemoryClonedState) {
return inMemoryClonedState;
}
Expand All @@ -368,15 +373,19 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
* - Get block for processing
* - Regen head state
*/
async getOrReloadLatest(rootHex: RootHex, maxEpoch: Epoch): Promise<CachedBeaconStateAllForks | null> {
async getOrReloadLatest(
rootHex: RootHex,
maxEpoch: Epoch,
opts?: StateCloneOpts
): Promise<CachedBeaconStateAllForks | null> {
// sort epochs in descending order, only consider epochs lte `epoch`
const epochs = Array.from(this.epochIndex.keys())
.sort((a, b) => b - a)
.filter((e) => e <= maxEpoch);
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
try {
const clonedState = await this.getOrReload({rootHex, epoch});
const clonedState = await this.getOrReload({rootHex, epoch}, opts);
if (clonedState) {
return clonedState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {Epoch, RootHex} from "@lodestar/types";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {Metrics} from "../../metrics/index.js";
import {StateCloneOpts} from "../regen/interface.js";
import {MapTracker} from "./mapMetrics.js";
import {BlockStateCache} from "./types.js";

Expand Down Expand Up @@ -38,7 +39,7 @@ export class StateContextCache implements BlockStateCache {
}
}

get(rootHex: RootHex): CachedBeaconStateAllForks | null {
get(rootHex: RootHex, opts?: StateCloneOpts): CachedBeaconStateAllForks | null {
this.metrics?.lookups.inc();
const item = this.head?.stateRoot === rootHex ? this.head.state : this.cache.get(rootHex);
if (!item) {
Expand All @@ -48,7 +49,7 @@ export class StateContextCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item.clone(true);
return item.clone(opts?.dontTransferCache);
}

add(item: CachedBeaconStateAllForks): void {
Expand Down