diff --git a/packages/beacon-node/src/chain/archiver/archiveStates.ts b/packages/beacon-node/src/chain/archiver/archiveStates.ts index e3ff48b02355..151f82008d42 100644 --- a/packages/beacon-node/src/chain/archiver/archiveStates.ts +++ b/packages/beacon-node/src/chain/archiver/archiveStates.ts @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans import {CheckpointWithHex} from "@lodestar/fork-choice"; import {IBeaconDb} from "../../db/index.js"; import {IStateRegenerator} from "../regen/interface.js"; +import {getStateSlotFromBytes} from "../../util/multifork.js"; /** * Minimum number of epochs between single temp archived states @@ -83,13 +84,26 @@ export class StatesArchiver { * Only the new finalized state is stored to disk */ async archiveState(finalized: CheckpointWithHex): Promise { - const finalizedState = this.regen.getCheckpointStateSync(finalized); - if (!finalizedState) { - throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch); + // starting from Jan 2024, the finalized state could be from disk or in memory + const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized); + const {rootHex} = finalized; + if (!finalizedStateOrBytes) { + throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`); + } + if (finalizedStateOrBytes instanceof Uint8Array) { + const slot = getStateSlotFromBytes(finalizedStateOrBytes); + await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes); + this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex}); + } else { + // state + await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes); + // don't delete states before the finalized state, auto-prune will take care of it + this.logger.verbose("Archived finalized state", { + epoch: finalized.epoch, + slot: finalizedStateOrBytes.slot, + root: rootHex, + }); } - 
await this.db.stateArchive.put(finalizedState.slot, finalizedState); - // don't delete states before the finalized state, auto-prune will take care of it - this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch}); } } diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index 12b43359fa4e..b92446945777 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -59,10 +59,11 @@ export async function importBlock( ): Promise { const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock; const {block, source} = blockInput; - const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message); + const {slot: blockSlot} = block.message; + const blockRoot = this.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message); const blockRootHex = toHexString(blockRoot); const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime()); - const blockEpoch = computeEpochAtSlot(block.message.slot); + const blockEpoch = computeEpochAtSlot(blockSlot); const parentEpoch = computeEpochAtSlot(parentBlockSlot); const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch; const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT; @@ -87,17 +88,16 @@ export async function importBlock( // This adds the state necessary to process the next block // Some block event handlers require state being in state cache so need to do this before emitting EventType.block - this.regen.addPostState(postState); + this.regen.processState(blockRootHex, postState); this.metrics?.importBlock.bySource.inc({source}); - this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex}); + this.logger.verbose("Added block to forkchoice and state cache", {slot: 
blockSlot, root: blockRootHex}); // We want to import block asap so call all event handler in the next event loop setTimeout(() => { - const slot = block.message.slot; this.emitter.emit(routes.events.EventType.block, { block: blockRootHex, - slot, + slot: blockSlot, executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary), }); @@ -106,7 +106,7 @@ export async function importBlock( const {index, kzgCommitment} = blobSidecar; this.emitter.emit(routes.events.EventType.blobSidecar, { blockRoot: blockRootHex, - slot, + slot: blockSlot, index, kzgCommitment: toHexString(kzgCommitment), versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)), @@ -171,7 +171,7 @@ export async function importBlock( correctHead, missedSlotVote, blockRootHex, - block.message.slot + blockSlot ); } catch (e) { // a block has a lot of attestations and it may has same error, we don't want to log all of them @@ -185,7 +185,7 @@ export async function importBlock( } } else { // always log other errors - this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error); + this.logger.warn("Error processing attestation from block", {slot: blockSlot}, e as Error); } } } @@ -193,7 +193,7 @@ export async function importBlock( for (const {error, count} of invalidAttestationErrorsByCode.values()) { this.logger.warn( "Error processing attestations from block", - {slot: block.message.slot, erroredAttestations: count}, + {slot: blockSlot, erroredAttestations: count}, error ); } @@ -214,7 +214,7 @@ export async function importBlock( // all AttesterSlashings are valid before reaching this this.forkChoice.onAttesterSlashing(slashing); } catch (e) { - this.logger.warn("Error processing AttesterSlashing from block", {slot: block.message.slot}, e as Error); + this.logger.warn("Error processing AttesterSlashing from block", {slot: blockSlot}, e as Error); } } } @@ -297,7 +297,7 @@ export async function importBlock( parentBlockSlot ); } catch (e) { 
- this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error); + this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error); } }, 0); } @@ -351,15 +351,15 @@ export async function importBlock( if (parentEpoch < blockEpoch) { // current epoch and previous epoch are likely cached in previous states this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch); - this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot}); + this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot}); } - if (block.message.slot % SLOTS_PER_EPOCH === 0) { + if (blockSlot % SLOTS_PER_EPOCH === 0) { // Cache state to preserve epoch transition work const checkpointState = postState; const cp = getCheckpointFromState(checkpointState); this.regen.addCheckpointState(cp, checkpointState); - this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState); + this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone()); // Note: in-lined code from previos handler of ChainEvent.checkpoint this.logger.verbose("Checkpoint processed", toCheckpointHex(cp)); @@ -397,7 +397,7 @@ export async function importBlock( // Send block events, only for recent enough blocks - if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) { + if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) { // NOTE: Skip looping if there are no listeners from the API if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) { for (const voluntaryExit of block.message.body.voluntaryExits) { @@ -417,10 +417,10 @@ export async function importBlock( } // Register stat metrics about the block after importing it - this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot); + this.metrics?.parentBlockDistance.observe(blockSlot - parentBlockSlot); 
this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta); this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock); - if (this.config.getForkSeq(block.message.slot) >= ForkSeq.altair) { + if (this.config.getForkSeq(blockSlot) >= ForkSeq.altair) { this.metrics?.registerSyncAggregateInBlock( blockEpoch, (block as altair.SignedBeaconBlock).message.body.syncAggregate, @@ -433,18 +433,18 @@ export async function importBlock( // Gossip blocks need to be imported as soon as possible, waiting attestations could be processed // in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789 setTimeout(() => { - this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRootHex}, advancedSlot); + this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot); }, 0); if (opts.seenTimestampSec !== undefined) { const recvToImportedBlock = Date.now() / 1000 - opts.seenTimestampSec; this.metrics?.gossipBlock.receivedToBlockImport.observe(recvToImportedBlock); - this.logger.verbose("Imported block", {slot: block.message.slot, recvToImportedBlock}); + this.logger.verbose("Imported block", {slot: blockSlot, recvToImportedBlock}); } this.logger.verbose("Block processed", { - slot: block.message.slot, + slot: blockSlot, root: blockRootHex, - delaySec: this.clock.secFromSlot(block.message.slot), + delaySec: this.clock.secFromSlot(blockSlot), }); } diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index f20bc0dbffa2..ef47ebafbece 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -41,6 +41,7 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js"; import {Clock, ClockEvent, IClock} from "../util/clock.js"; import {ensureDir, writeIfNotExist} from "../util/file.js"; import {isOptimisticBlock} from "../util/forkChoice.js"; +import {BufferPool} from 
"../util/bufferPool.js"; import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js"; import {ChainEventEmitter, ChainEvent} from "./emitter.js"; import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts, CommonBlockBody} from "./interface.js"; @@ -79,7 +80,11 @@ import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js"; import {ShufflingCache} from "./shufflingCache.js"; import {StateContextCache} from "./stateCache/stateContextCache.js"; import {SeenGossipBlockInput} from "./seenCache/index.js"; -import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js"; +import {InMemoryCheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js"; +import {FIFOBlockStateCache} from "./stateCache/fifoBlockStateCache.js"; +import {PersistentCheckpointStateCache} from "./stateCache/persistentCheckpointsCache.js"; +import {DbCPStateDatastore} from "./stateCache/datastore/db.js"; +import {FileCPStateDatastore} from "./stateCache/datastore/file.js"; /** * Arbitrary constants, blobs and payloads should be consumed immediately in the same slot @@ -237,9 +242,28 @@ export class BeaconChain implements IBeaconChain { this.pubkey2index = cachedState.epochCtx.pubkey2index; this.index2pubkey = cachedState.epochCtx.index2pubkey; - const stateCache = new StateContextCache({metrics}); - const checkpointStateCache = new CheckpointStateCache({metrics}); - + const fileDataStore = opts.nHistoricalStatesFileDataStore ?? false; + const stateCache = this.opts.nHistoricalStates + ? new FIFOBlockStateCache(this.opts, {metrics}) + : new StateContextCache({metrics}); + const checkpointStateCache = this.opts.nHistoricalStates + ? new PersistentCheckpointStateCache( + { + metrics, + logger, + clock, + shufflingCache: this.shufflingCache, + getHeadState: this.getHeadState.bind(this), + bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics), + datastore: fileDataStore + ? 
// debug option if we want to investigate any issues with the DB + new FileCPStateDatastore() + : // production option + new DbCPStateDatastore(this.db), + }, + this.opts + ) + : new InMemoryCheckpointStateCache({metrics}); const {checkpoint} = computeAnchorCheckpoint(config, anchorState); stateCache.add(cachedState); stateCache.setHeadState(cachedState); @@ -333,6 +357,7 @@ export class BeaconChain implements IBeaconChain { /** Populate in-memory caches with persisted data. Call at least once on startup */ async loadFromDisk(): Promise { + await this.regen.init(); await this.opPool.fromPersisted(this.db); } diff --git a/packages/beacon-node/src/chain/forkChoice/index.ts b/packages/beacon-node/src/chain/forkChoice/index.ts index 7e195a84922d..b032159f2119 100644 --- a/packages/beacon-node/src/chain/forkChoice/index.ts +++ b/packages/beacon-node/src/chain/forkChoice/index.ts @@ -7,7 +7,7 @@ import { ForkChoiceStore, ExecutionStatus, JustifiedBalancesGetter, - ForkChoiceOpts, + ForkChoiceOpts as RealForkChoiceOpts, } from "@lodestar/fork-choice"; import { CachedBeaconStateAllForks, @@ -21,7 +21,10 @@ import {ChainEventEmitter} from "../emitter.js"; import {ChainEvent} from "../emitter.js"; import {GENESIS_SLOT} from "../../constants/index.js"; -export type {ForkChoiceOpts}; +export type ForkChoiceOpts = RealForkChoiceOpts & { + // for testing only + forkchoiceConstructor?: typeof ForkChoice; +}; /** * Fork Choice extended with a ChainEventEmitter @@ -47,7 +50,11 @@ export function initializeForkChoice( const justifiedBalances = getEffectiveBalanceIncrementsZeroInactive(state); - return new ForkChoice( + // forkchoiceConstructor is only used for some test cases + // production code use ForkChoice constructor directly + const forkchoiceConstructor = opts.forkchoiceConstructor ?? 
ForkChoice; + + return new forkchoiceConstructor( config, new ForkChoiceStore( diff --git a/packages/beacon-node/src/chain/options.ts b/packages/beacon-node/src/chain/options.ts index cc7795ade0a1..53aef52a19ad 100644 --- a/packages/beacon-node/src/chain/options.ts +++ b/packages/beacon-node/src/chain/options.ts @@ -4,12 +4,17 @@ import {ArchiverOpts} from "./archiver/index.js"; import {ForkChoiceOpts} from "./forkChoice/index.js"; import {LightClientServerOpts} from "./lightClient/index.js"; import {ShufflingCacheOpts} from "./shufflingCache.js"; +import {DEFAULT_MAX_BLOCK_STATES, FIFOBlockStateCacheOpts} from "./stateCache/fifoBlockStateCache.js"; +import {PersistentCheckpointStateCacheOpts} from "./stateCache/persistentCheckpointsCache.js"; +import {DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY} from "./stateCache/persistentCheckpointsCache.js"; export type IChainOptions = BlockProcessOpts & PoolOpts & SeenCacheOpts & ForkChoiceOpts & ArchiverOpts & + FIFOBlockStateCacheOpts & + PersistentCheckpointStateCacheOpts & ShufflingCacheOpts & LightClientServerOpts & { blsVerifyAllMainThread?: boolean; @@ -30,6 +35,8 @@ export type IChainOptions = BlockProcessOpts & trustedSetup?: string; broadcastValidationStrictness?: string; minSameMessageSignatureSetsToBatch: number; + nHistoricalStates?: boolean; + nHistoricalStatesFileDataStore?: boolean; }; export type BlockProcessOpts = { @@ -102,4 +109,8 @@ export const defaultChainOptions: IChainOptions = { // batching too much may block the I/O thread so if useWorker=false, suggest this value to be 32 // since this batch attestation work is designed to work with useWorker=true, make this the lowest value minSameMessageSignatureSetsToBatch: 2, + nHistoricalStates: false, + nHistoricalStatesFileDataStore: false, + maxBlockStates: DEFAULT_MAX_BLOCK_STATES, + maxCPStateEpochsInMemory: DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY, }; diff --git a/packages/beacon-node/src/chain/regen/errors.ts b/packages/beacon-node/src/chain/regen/errors.ts 
index 85d43d1a4fe8..7c1573b415f8 100644 --- a/packages/beacon-node/src/chain/regen/errors.ts +++ b/packages/beacon-node/src/chain/regen/errors.ts @@ -8,6 +8,7 @@ export enum RegenErrorCode { TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED", BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB", STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR", + INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT", } export type RegenErrorType = @@ -17,7 +18,8 @@ export type RegenErrorType = | {code: RegenErrorCode.NO_SEED_STATE} | {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root} | {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root} - | {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}; + | {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error} + | {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex}; export class RegenError extends Error { type: RegenErrorType; diff --git a/packages/beacon-node/src/chain/regen/interface.ts b/packages/beacon-node/src/chain/regen/interface.ts index e7be64d0eecb..f1845a574f54 100644 --- a/packages/beacon-node/src/chain/regen/interface.ts +++ b/packages/beacon-node/src/chain/regen/interface.ts @@ -35,11 +35,12 @@ export interface IStateRegenerator extends IStateRegeneratorInternal { dropCache(): void; dumpCacheSummary(): routes.lodestar.StateCacheItem[]; getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null; + getCheckpointStateOrBytes(cp: CheckpointHex): Promise; getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null; getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null; pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void; pruneOnFinalized(finalizedEpoch: Epoch): void; - addPostState(postState: CachedBeaconStateAllForks): void; + processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void; addCheckpointState(cp: 
phase0.Checkpoint, item: CachedBeaconStateAllForks): void; updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void; updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null; diff --git a/packages/beacon-node/src/chain/regen/queued.ts b/packages/beacon-node/src/chain/regen/queued.ts index dfda56cc1eea..446f77d76458 100644 --- a/packages/beacon-node/src/chain/regen/queued.ts +++ b/packages/beacon-node/src/chain/regen/queued.ts @@ -4,9 +4,10 @@ import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition"; import {Logger} from "@lodestar/utils"; import {routes} from "@lodestar/api"; -import {CheckpointHex, CheckpointStateCache, StateContextCache, toCheckpointHex} from "../stateCache/index.js"; +import {CheckpointHex, toCheckpointHex} from "../stateCache/index.js"; import {Metrics} from "../../metrics/index.js"; import {JobItemQueue} from "../../util/queue/index.js"; +import {BlockStateCache, CheckpointStateCache} from "../stateCache/types.js"; import {IStateRegenerator, IStateRegeneratorInternal, RegenCaller, RegenFnName, StateCloneOpts} from "./interface.js"; import {StateRegenerator, RegenModules} from "./regen.js"; import {RegenError, RegenErrorCode} from "./errors.js"; @@ -17,7 +18,6 @@ const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16; type QueuedStateRegeneratorModules = RegenModules & { signal: AbortSignal; - logger: Logger; }; type RegenRequestKey = keyof IStateRegeneratorInternal; @@ -34,7 +34,7 @@ export class QueuedStateRegenerator implements IStateRegenerator { private readonly regen: StateRegenerator; private readonly forkChoice: IForkChoice; - private readonly stateCache: StateContextCache; + private readonly stateCache: BlockStateCache; private readonly checkpointStateCache: CheckpointStateCache; private readonly metrics: Metrics | null; private readonly logger: Logger; @@ -53,6 +53,12 @@ export class 
QueuedStateRegenerator implements IStateRegenerator { this.logger = modules.logger; } + async init(): Promise { + if (this.checkpointStateCache.init) { + return this.checkpointStateCache.init(); + } + } + canAcceptWork(): boolean { return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD; } @@ -70,6 +76,10 @@ export class QueuedStateRegenerator implements IStateRegenerator { return this.stateCache.get(stateRoot); } + async getCheckpointStateOrBytes(cp: CheckpointHex): Promise { + return this.checkpointStateCache.getStateOrBytes(cp); + } + getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null { return this.checkpointStateCache.get(cp); } @@ -88,8 +98,11 @@ export class QueuedStateRegenerator implements IStateRegenerator { this.stateCache.deleteAllBeforeEpoch(finalizedEpoch); } - addPostState(postState: CachedBeaconStateAllForks): void { + processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void { this.stateCache.add(postState); + this.checkpointStateCache.processState(blockRootHex, postState).catch((e) => { + this.logger.debug("Error processing block state", {blockRootHex, slot: postState.slot}, e); + }); } addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void { @@ -107,10 +120,13 @@ export class QueuedStateRegenerator implements IStateRegenerator { } else { // Trigger regen on head change if necessary this.logger.warn("Head state not available, triggering regen", {stateRoot: newHeadStateRoot}); + // it's important to reload state to regen head state here + const shouldReload = true; // head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free // up memory for regen step below. 
During regen, node won't be functional but eventually head will be available + // for legacy StateContextCache only this.stateCache.setHeadState(null); - this.regen.getState(newHeadStateRoot, RegenCaller.processBlock).then( + this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, shouldReload).then( (headStateRegen) => this.stateCache.setHeadState(headStateRegen), (e) => this.logger.error("Error on head state regen", {}, e) ); diff --git a/packages/beacon-node/src/chain/regen/regen.ts b/packages/beacon-node/src/chain/regen/regen.ts index 0d6bd89d8ce7..0f0c52b93cad 100644 --- a/packages/beacon-node/src/chain/regen/regen.ts +++ b/packages/beacon-node/src/chain/regen/regen.ts @@ -10,29 +10,34 @@ import { stateTransition, } from "@lodestar/state-transition"; import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; -import {sleep} from "@lodestar/utils"; +import {Logger, sleep} from "@lodestar/utils"; import {SLOTS_PER_EPOCH} from "@lodestar/params"; import {ChainForkConfig} from "@lodestar/config"; import {Metrics} from "../../metrics/index.js"; import {IBeaconDb} from "../../db/index.js"; -import {CheckpointStateCache, StateContextCache} from "../stateCache/index.js"; import {getCheckpointFromState} from "../blocks/utils/checkpoint.js"; import {ChainEvent, ChainEventEmitter} from "../emitter.js"; +import {CheckpointStateCache, BlockStateCache} from "../stateCache/types.js"; import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js"; import {RegenError, RegenErrorCode} from "./errors.js"; export type RegenModules = { db: IBeaconDb; forkChoice: IForkChoice; - stateCache: StateContextCache; + stateCache: BlockStateCache; checkpointStateCache: CheckpointStateCache; config: ChainForkConfig; emitter: ChainEventEmitter; + logger: Logger; metrics: Metrics | null; }; /** * Regenerates states that have already been processed by the fork choice + * Since Jan 2024, we support reloading checkpoint state from disk via shouldReload flag. 
Due to its performance impact + * this flag is only used in this case: + * - getPreState: this is for block processing, this is important for long unfinalized chain + * - updateHeadState: rarely happen, but it's important to make sure we always can regen head state */ export class StateRegenerator implements IStateRegeneratorInternal { constructor(private readonly modules: RegenModules) {} @@ -41,6 +46,8 @@ export class StateRegenerator implements IStateRegeneratorInternal { * Get the state to run with `block`. May be: * - If parent is in same epoch -> Exact state at `block.parentRoot` * - If parent is in prev epoch -> State after `block.parentRoot` dialed forward through epoch transition + * - It's important to reload state if needed in this flow + * TODO: refactor to getStateForBlockProcessing, this getPreState() api is used to regen state for processed block only */ async getPreState( block: allForks.BeaconBlock, @@ -57,6 +64,7 @@ export class StateRegenerator implements IStateRegeneratorInternal { const parentEpoch = computeEpochAtSlot(parentBlock.slot); const blockEpoch = computeEpochAtSlot(block.slot); + const shouldReload = true; // This may save us at least one epoch transition. // If the requested state crosses an epoch boundary @@ -64,11 +72,11 @@ export class StateRegenerator implements IStateRegeneratorInternal { // We may have the checkpoint state with parent root inside the checkpoint state cache // through gossip validation. if (parentEpoch < blockEpoch) { - return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller); + return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller, shouldReload); } // Otherwise, get the state normally. 
- return this.getState(parentBlock.stateRoot, rCaller); + return this.getState(parentBlock.stateRoot, rCaller, shouldReload); } /** @@ -77,20 +85,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { async getCheckpointState( cp: phase0.Checkpoint, opts: StateCloneOpts, - rCaller: RegenCaller + rCaller: RegenCaller, + shouldReload = false ): Promise { const checkpointStartSlot = computeStartSlotAtEpoch(cp.epoch); - return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller); + return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller, shouldReload); } /** * Get state after block `blockRoot` dialed forward to `slot` + * - shouldReload should be used with care, as it will cause the state to be reloaded from disk */ async getBlockSlotState( blockRoot: RootHex, slot: Slot, opts: StateCloneOpts, - rCaller: RegenCaller + rCaller: RegenCaller, + shouldReload = false ): Promise { const block = this.modules.forkChoice.getBlockHex(blockRoot); if (!block) { @@ -108,26 +119,31 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } - const latestCheckpointStateCtx = this.modules.checkpointStateCache.getLatest(blockRoot, computeEpochAtSlot(slot)); + const {checkpointStateCache} = this.modules; + const getLatestApi = shouldReload + ? 
checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) + : checkpointStateCache.getLatest.bind(checkpointStateCache); + const latestCheckpointStateCtx = await getLatestApi(blockRoot, computeEpochAtSlot(slot)); // If a checkpoint state exists with the given checkpoint root, it either is in requested epoch // or needs to have empty slots processed until the requested epoch if (latestCheckpointStateCtx) { - return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, opts); + return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, rCaller, opts); } // Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root // regenerate that state, // then process empty slots until the requested epoch - const blockStateCtx = await this.getState(block.stateRoot, rCaller); - return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, opts); + const blockStateCtx = await this.getState(block.stateRoot, rCaller, shouldReload); + return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, rCaller, opts); } /** * Get state by exact root. If not in cache directly, requires finding the block that references the state from the * forkchoice and replaying blocks to get to it. 
+ * - shouldReload should be used with care, as it will cause the state to be reloaded from disk */ - async getState(stateRoot: RootHex, _rCaller: RegenCaller): Promise { + async getState(stateRoot: RootHex, _rCaller: RegenCaller, shouldReload = false): Promise { // Trivial case, state at stateRoot is already cached const cachedStateCtx = this.modules.stateCache.get(stateRoot); if (cachedStateCtx) { @@ -143,15 +159,17 @@ export class StateRegenerator implements IStateRegeneratorInternal { // gets reversed when replayed const blocksToReplay = [block]; let state: CachedBeaconStateAllForks | null = null; - for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.parentRoot)) { + const {checkpointStateCache} = this.modules; + const getLatestApi = shouldReload + ? checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) + : checkpointStateCache.getLatest.bind(checkpointStateCache); + // iterateAncestorBlocks only returns ancestor blocks, not the block itself + for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) { state = this.modules.stateCache.get(b.stateRoot); if (state) { break; } - state = this.modules.checkpointStateCache.getLatest( - b.blockRoot, - computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1) - ); + state = await getLatestApi(b.blockRoot, computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1)); if (state) { break; } @@ -172,6 +190,8 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } + const replaySlots = blocksToReplay.map((b) => b.slot).join(","); + this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots}); for (const b of blocksToReplay.reverse()) { const block = await this.modules.db.block.get(fromHexString(b.blockRoot)); if (!block) { @@ -195,11 +215,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { verifyProposer: false, verifySignatures: false, }, - null + this.modules.metrics ); - // TODO: 
Persist states, note that regen could be triggered by old states. - // Should those take a place in the cache? + const stateRoot = toHexString(state.hashTreeRoot()); + if (b.stateRoot !== stateRoot) { + throw new RegenError({ + slot: b.slot, + code: RegenErrorCode.INVALID_STATE_ROOT, + actual: stateRoot, + expected: b.stateRoot, + }); + } + + if (shouldReload) { + // also with shouldReload flag, we "reload" it to the state cache too + this.modules.stateCache.add(state); + } // this avoids keeping our node busy processing blocks await sleep(0); @@ -210,13 +242,14 @@ export class StateRegenerator implements IStateRegeneratorInternal { }); } } + this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots}); return state; } private findFirstStateBlock(stateRoot: RootHex): ProtoBlock { for (const block of this.modules.forkChoice.forwarditerateAncestorBlocks()) { - if (block !== undefined) { + if (block.stateRoot === stateRoot) { return block; } } @@ -237,9 +270,10 @@ async function processSlotsByCheckpoint( modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, preState: CachedBeaconStateAllForks, slot: Slot, + rCaller: RegenCaller, opts: StateCloneOpts ): Promise { - let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, opts); + let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, rCaller, opts); if (postState.slot < slot) { postState = processSlots(postState, slot, opts, modules.metrics); } @@ -257,6 +291,7 @@ async function processSlotsToNearestCheckpoint( modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, preState: CachedBeaconStateAllForks, slot: Slot, + rCaller: RegenCaller, opts: StateCloneOpts ): Promise { const preSlot = preState.slot; @@ -272,12 +307,16 @@ async function processSlotsToNearestCheckpoint( ) { // processSlots calls .clone() before mutating postState = 
processSlots(postState, nextEpochSlot, opts, metrics); + modules.metrics?.epochTransitionByCaller.inc({caller: rCaller}); - // Cache state to preserve epoch transition work + // this is usually added when we prepare for next slot or validate gossip block + // then when we process the 1st block of epoch, we don't have to do state transition again + // This adds Previous Root Checkpoint State to the checkpoint state cache + // This may becomes the "official" checkpoint state if the 1st block of epoch is skipped const checkpointState = postState; const cp = getCheckpointFromState(checkpointState); checkpointStateCache.add(cp, checkpointState); - emitter.emit(ChainEvent.checkpoint, cp, checkpointState); + emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone()); // this avoids keeping our node busy processing blocks await sleep(0); diff --git a/packages/beacon-node/src/chain/stateCache/datastore/db.ts b/packages/beacon-node/src/chain/stateCache/datastore/db.ts index fef38a7f8dd2..c6c9a3ee924b 100644 --- a/packages/beacon-node/src/chain/stateCache/datastore/db.ts +++ b/packages/beacon-node/src/chain/stateCache/datastore/db.ts @@ -1,4 +1,3 @@ -import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {phase0, ssz} from "@lodestar/types"; import {IBeaconDb} from "../../../db/interface.js"; import {CPStateDatastore, DatastoreKey} from "./types.js"; @@ -9,9 +8,8 @@ import {CPStateDatastore, DatastoreKey} from "./types.js"; export class DbCPStateDatastore implements CPStateDatastore { constructor(private readonly db: IBeaconDb) {} - async write(cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks): Promise { + async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise { const serializedCheckpoint = checkpointToDatastoreKey(cpKey); - const stateBytes = state.serialize(); await this.db.checkpointState.putBinary(serializedCheckpoint, stateBytes); return serializedCheckpoint; } diff --git 
a/packages/beacon-node/src/chain/stateCache/datastore/file.ts b/packages/beacon-node/src/chain/stateCache/datastore/file.ts new file mode 100644 index 000000000000..17de7c4b7d73 --- /dev/null +++ b/packages/beacon-node/src/chain/stateCache/datastore/file.ts @@ -0,0 +1,52 @@ +import path from "node:path"; +import {toHexString, fromHexString} from "@chainsafe/ssz"; +import {phase0, ssz} from "@lodestar/types"; +import {ensureDir, readFile, readFileNames, removeFile, writeIfNotExist} from "../../../util/file.js"; +import {CPStateDatastore, DatastoreKey} from "./types.js"; + +const CHECKPOINT_STATES_FOLDER = "checkpoint_states"; +const CHECKPOINT_FILE_NAME_LENGTH = 82; + +/** + * Implementation of CPStatePersistentApis using file system, this is beneficial for debugging. + */ +export class FileCPStateDatastore implements CPStateDatastore { + private readonly folderPath: string; + + constructor(parentDir: string = ".") { + // by default use the beacon folder `/beacon/checkpoint_states` + this.folderPath = path.join(parentDir, CHECKPOINT_STATES_FOLDER); + } + + async init(): Promise { + try { + await ensureDir(this.folderPath); + } catch (_) { + // do nothing + } + } + + async write(cpKey: phase0.Checkpoint, stateBytes: Uint8Array): Promise { + const serializedCheckpoint = ssz.phase0.Checkpoint.serialize(cpKey); + const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint)); + await writeIfNotExist(filePath, stateBytes); + return serializedCheckpoint; + } + + async remove(serializedCheckpoint: DatastoreKey): Promise { + const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint)); + await removeFile(filePath); + } + + async read(serializedCheckpoint: DatastoreKey): Promise { + const filePath = path.join(this.folderPath, toHexString(serializedCheckpoint)); + return readFile(filePath); + } + + async readKeys(): Promise { + const fileNames = await readFileNames(this.folderPath); + return fileNames + .filter((fileName) => 
fileName.startsWith("0x") && fileName.length === CHECKPOINT_FILE_NAME_LENGTH) + .map((fileName) => fromHexString(fileName)); + } +} diff --git a/packages/beacon-node/src/chain/stateCache/datastore/types.ts b/packages/beacon-node/src/chain/stateCache/datastore/types.ts index 66ea67f93500..0f81e6ae1e75 100644 --- a/packages/beacon-node/src/chain/stateCache/datastore/types.ts +++ b/packages/beacon-node/src/chain/stateCache/datastore/types.ts @@ -1,4 +1,3 @@ -import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {phase0} from "@lodestar/types"; // With db implementation, persistedKey is serialized data of a checkpoint @@ -6,8 +5,9 @@ export type DatastoreKey = Uint8Array; // Make this generic to support testing export interface CPStateDatastore { - write: (cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks) => Promise; + write: (cpKey: phase0.Checkpoint, stateBytes: Uint8Array) => Promise; remove: (key: DatastoreKey) => Promise; read: (key: DatastoreKey) => Promise; readKeys: () => Promise; + init?: () => Promise; } diff --git a/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts b/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts index 854983101c04..63cd806d1451 100644 --- a/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts +++ b/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts @@ -80,7 +80,7 @@ export class FIFOBlockStateCache implements BlockStateCache { this.metrics?.hits.inc(); this.metrics?.stateClonedCount.observe(item.clonedCount); - return item; + return item.clone(); } /** diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts index 8ad5c5098118..57121a61bf5e 100644 --- a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts @@ -1,15 +1,24 @@ import {fromHexString, 
toHexString} from "@chainsafe/ssz"; import {phase0, Epoch, RootHex} from "@lodestar/types"; import {CachedBeaconStateAllForks, computeStartSlotAtEpoch, getBlockRootAtSlot} from "@lodestar/state-transition"; -import {Logger, MapDef} from "@lodestar/utils"; +import {Logger, MapDef, sleep} from "@lodestar/utils"; import {routes} from "@lodestar/api"; import {loadCachedBeaconState} from "@lodestar/state-transition"; +import {INTERVALS_PER_SLOT} from "@lodestar/params"; import {Metrics} from "../../metrics/index.js"; import {IClock} from "../../util/clock.js"; import {ShufflingCache} from "../shufflingCache.js"; +import {BufferPool} from "../../util/bufferPool.js"; import {MapTracker} from "./mapMetrics.js"; -import {CheckpointHex, CheckpointStateCache, CacheItemType} from "./types.js"; import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js"; +import {CheckpointHex, CacheItemType, CheckpointStateCache} from "./types.js"; + +export type PersistentCheckpointStateCacheOpts = { + // Keep max n states in memory, persist the rest to disk + maxCPStateEpochsInMemory?: number; + // for test only + processLateBlock?: boolean; +}; type GetHeadStateFn = () => CachedBeaconStateAllForks; @@ -17,14 +26,11 @@ type PersistentCheckpointStateCacheModules = { metrics?: Metrics | null; logger: Logger; clock?: IClock | null; + signal?: AbortSignal; shufflingCache: ShufflingCache; datastore: CPStateDatastore; getHeadState?: GetHeadStateFn; -}; - -type PersistentCheckpointStateCacheOpts = { - // Keep max n states in memory, persist the rest to disk - maxCPStateEpochsInMemory?: number; + bufferPool?: BufferPool; }; /** checkpoint serialized as a string */ @@ -90,15 +96,27 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { private readonly metrics: Metrics["cpStateCache"] | null | undefined; private readonly logger: Logger; private readonly clock: IClock | null | undefined; + private readonly signal: AbortSignal | undefined; 
private preComputedCheckpoint: string | null = null; private preComputedCheckpointHits: number | null = null; private readonly maxEpochsInMemory: number; + private readonly processLateBlock: boolean; private readonly datastore: CPStateDatastore; private readonly shufflingCache: ShufflingCache; private readonly getHeadState?: GetHeadStateFn; + private readonly bufferPool?: BufferPool; constructor( - {metrics, logger, clock, shufflingCache, datastore, getHeadState}: PersistentCheckpointStateCacheModules, + { + metrics, + logger, + clock, + signal, + shufflingCache, + datastore, + getHeadState, + bufferPool, + }: PersistentCheckpointStateCacheModules, opts: PersistentCheckpointStateCacheOpts ) { this.cache = new MapTracker(metrics?.cpStateCache); @@ -127,20 +145,26 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { } this.logger = logger; this.clock = clock; + this.signal = signal; if (opts.maxCPStateEpochsInMemory !== undefined && opts.maxCPStateEpochsInMemory < 0) { throw new Error("maxEpochsInMemory must be >= 0"); } this.maxEpochsInMemory = opts.maxCPStateEpochsInMemory ?? DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY; + this.processLateBlock = opts.processLateBlock ?? false; // Specify different datastore for testing this.datastore = datastore; this.shufflingCache = shufflingCache; this.getHeadState = getHeadState; + this.bufferPool = bufferPool; } /** * Reload checkpoint state keys from the last run. 
*/ async init(): Promise { + if (this.datastore?.init) { + await this.datastore.init(); + } const persistedKeys = await this.datastore.readKeys(); for (const persistedKey of persistedKeys) { const cp = datastoreKeyToCheckpoint(persistedKey); @@ -163,7 +187,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { async getOrReload(cp: CheckpointHex): Promise { const stateOrStateBytesData = await this.getStateOrLoadDb(cp); if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) { - return stateOrStateBytesData; + return stateOrStateBytesData?.clone() ?? null; } const {persistedKey, stateBytes} = stateOrStateBytesData; const logMeta = {persistedKey: toHexString(persistedKey)}; @@ -176,14 +200,31 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { this.metrics?.stateReloadEpochDiff.observe(Math.abs(seedState.epochCtx.epoch - cp.epoch)); this.logger.debug("Reload: found seed state", {...logMeta, seedSlot: seedState.slot}); + let bufferPoolKey: number | undefined = undefined; try { - const timer = this.metrics?.stateReloadDuration.startTimer(); - const newCachedState = loadCachedBeaconState(seedState, stateBytes, { - shufflingGetter: this.shufflingCache.getSync.bind(this.shufflingCache), - }); + const validatorsSszTimer = this.metrics?.stateReloadValidatorsSszDuration.startTimer(); + // 80% of validators serialization time comes from memory allocation, this is to avoid it + const bytesWithKey = this.serializeStateValidators(seedState); + bufferPoolKey = bytesWithKey.key; + validatorsSszTimer?.(); + const reloadTimer = this.metrics?.stateReloadDuration.startTimer(); + const newCachedState = loadCachedBeaconState( + seedState, + stateBytes, + { + shufflingGetter: (shufflingEpoch, decisionRootHex) => { + const shuffling = this.shufflingCache.getSync(shufflingEpoch, decisionRootHex); + if (shuffling == null) { + this.metrics?.stateReloadShufflingCacheMiss.inc(); + } + return shuffling; + }, + }, 
+ bytesWithKey.data + ); newCachedState.commit(); const stateRoot = toHexString(newCachedState.hashTreeRoot()); - timer?.(); + reloadTimer?.(); this.logger.debug("Reload: cached state load successful", { ...logMeta, stateSlot: newCachedState.slot, @@ -196,10 +237,14 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey}); this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex); // don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch - return newCachedState; + return newCachedState.clone(); } catch (e) { this.logger.debug("Reload: error loading cached state", logMeta, e as Error); return null; + } finally { + if (bufferPoolKey !== undefined) { + this.bufferPool?.free(bufferPoolKey); + } } } @@ -266,7 +311,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { if (isInMemoryCacheItem(cacheItem)) { const {state} = cacheItem; this.metrics?.stateClonedCount.observe(state.clonedCount); - return state; + return state.clone(); } return null; @@ -308,7 +353,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { if (this.epochIndex.get(epoch)?.has(rootHex)) { const inMemoryState = this.get({rootHex, epoch}); if (inMemoryState) { - return inMemoryState; + return inMemoryState.clone(); } } } @@ -332,7 +377,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { try { const state = await this.getOrReload({rootHex, epoch}); if (state) { - return state; + return state.clone(); } } catch (e) { this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error); @@ -421,7 +466,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { * - 1 then we'll persist {root: b1, epoch n-1} checkpoint state to disk. 
Note that at epoch n there is both {root: b0, epoch: n} and {root: c0, epoch: n} checkpoint states in memory * - 2 then we'll persist {root: b2, epoch n-2} checkpoint state to disk, there are also 2 checkpoint states in memory at epoch n, same to the above (maxEpochsInMemory=1) * - * As of Nov 2023, it takes 1.3s to 1.5s to persist a state on holesky on fast server. TODO: + * As of Jan 2024, it takes 1.2s to persist a holesky state on fast server. TODO: * - improve state serialization time * - or research how to only store diff against the finalized state */ @@ -433,65 +478,36 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { return 0; } - for (const lowestEpoch of sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory)) { - const epochBoundarySlot = computeStartSlotAtEpoch(lowestEpoch); - const epochBoundaryRoot = - epochBoundarySlot === state.slot ? fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot); - const epochBoundaryHex = toHexString(epochBoundaryRoot); - - // for each epoch, usually there are 2 rootHex respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State - for (const rootHex of this.epochIndex.get(lowestEpoch) ?? []) { - const cpKey = toCacheKey({epoch: lowestEpoch, rootHex}); - const cacheItem = this.cache.get(cpKey); + const blockSlot = state.slot; + const twoThirdsSlot = (2 * state.config.SECONDS_PER_SLOT) / INTERVALS_PER_SLOT; + // we always have clock in production, fallback value is only for test + const secFromSlot = this.clock?.secFromSlot(blockSlot) ?? 
twoThirdsSlot; + const secToTwoThirdsSlot = twoThirdsSlot - secFromSlot; + if (secToTwoThirdsSlot > 0) { + // 2/3 of slot is the most free time of every slot, take that chance to persist checkpoint states + // normally it should only persist checkpoint states at 2/3 of slot 0 of epoch + await sleep(secToTwoThirdsSlot * 1000, this.signal); + } else if (!this.processLateBlock) { + // normally the block persist happens at 2/3 of slot 0 of epoch, if it's already late then just skip to allow other tasks to run + // there are plenty of chances in the same epoch to persist checkpoint states, also if block is late it could be reorged + this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex}); + return 0; + } - if (cacheItem !== undefined && isInMemoryCacheItem(cacheItem)) { - // this is state in memory, we don't care if the checkpoint state is already persisted - let {persistedKey} = cacheItem; - const {state} = cacheItem; - const logMeta = { - stateSlot: state.slot, - rootHex, - epochBoundaryHex, - persistedKey: persistedKey ? toHexString(persistedKey) : "", - }; - - if (rootHex === epochBoundaryHex) { - if (persistedKey) { - // no need to persist - this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta); - } else { - // persist and do not update epochIndex - this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 
0); - const timer = this.metrics?.statePersistDuration.startTimer(); - const cpPersist = {epoch: lowestEpoch, root: epochBoundaryRoot}; - persistedKey = await this.datastore.write(cpPersist, state); - timer?.(); - persistCount++; - this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { - ...logMeta, - persistedKey: toHexString(persistedKey), - }); - } - // overwrite cpKey, this means the state is deleted from memory - this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); - } else { - if (persistedKey) { - // persisted file will be eventually deleted by the archive task - // this also means the state is deleted from memory - this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); - // do not update epochIndex - } else { - // delete the state from memory - this.cache.delete(cpKey); - this.epochIndex.get(lowestEpoch)?.delete(rootHex); - } - this.metrics?.statePruneFromMemoryCount.inc(); - this.logger.verbose("Pruned checkpoint state from memory", logMeta); - } - } - } + const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory); + for (const lowestEpoch of persistEpochs) { + // usually there is only 0 or 1 epoch to persist in this loop + persistCount += await this.processPastEpoch(blockRootHex, state, lowestEpoch); } + if (persistCount > 0) { + this.logger.verbose("Persisted checkpoint states", { + slot: blockSlot, + root: blockRootHex, + persistCount, + persistEpochs: persistEpochs.length, + }); + } return persistCount; } @@ -575,6 +591,81 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { return Array.from(this.cache.keys()); } + private async processPastEpoch( + blockRootHex: RootHex, + state: CachedBeaconStateAllForks, + epoch: Epoch + ): Promise { + let persistCount = 0; + const epochBoundarySlot = computeStartSlotAtEpoch(epoch); + const epochBoundaryRoot = + epochBoundarySlot === state.slot ? 
fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot); + const epochBoundaryHex = toHexString(epochBoundaryRoot); + + // for each epoch, usually there are 2 rootHex respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State + for (const rootHex of this.epochIndex.get(epoch) ?? []) { + const cpKey = toCacheKey({epoch: epoch, rootHex}); + const cacheItem = this.cache.get(cpKey); + + if (cacheItem !== undefined && isInMemoryCacheItem(cacheItem)) { + // this is state in memory, we don't care if the checkpoint state is already persisted + let {persistedKey} = cacheItem; + const {state} = cacheItem; + const logMeta = { + stateSlot: state.slot, + rootHex, + epochBoundaryHex, + persistedKey: persistedKey ? toHexString(persistedKey) : "", + }; + + if (rootHex === epochBoundaryHex) { + if (persistedKey) { + // no need to persist + this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta); + } else { + // persist and do not update epochIndex + this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 
0); + const cpPersist = {epoch: epoch, root: epochBoundaryRoot}; + let bufferPoolKey: number | undefined = undefined; + try { + const timer = this.metrics?.statePersistDuration.startTimer(); + const stateBytesWithKey = this.serializeState(state); + bufferPoolKey = stateBytesWithKey.key; + persistedKey = await this.datastore.write(cpPersist, stateBytesWithKey.data); + timer?.(); + } finally { + if (bufferPoolKey !== undefined) { + this.bufferPool?.free(bufferPoolKey); + } + } + persistCount++; + this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { + ...logMeta, + persistedKey: toHexString(persistedKey), + }); + } + // overwrite cpKey, this means the state is deleted from memory + this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); + } else { + if (persistedKey) { + // persisted file will be eventually deleted by the archive task + // this also means the state is deleted from memory + this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); + // do not update epochIndex + } else { + // delete the state from memory + this.cache.delete(cpKey); + this.epochIndex.get(epoch)?.delete(rootHex); + } + this.metrics?.statePruneFromMemoryCount.inc(); + this.logger.verbose("Pruned checkpoint state from memory", logMeta); + } + } + } + + return persistCount; + } + /** * Delete all items of an epoch from disk and memory */ @@ -602,9 +693,57 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache { rootHexes: Array.from(rootHexes).join(","), }); } + + /* + * It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory. 
+ * As monitored on holesky as of Jan 2024: + * - This does not increase heap allocation while gc time is the same + * - It helps stabilize persist time and save ~300ms on average (1.5s vs 1.2s) + * - It also helps the state reload to save ~500ms on average (4.3s vs 3.8s) + * - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization + */ + private serializeState(state: CachedBeaconStateAllForks): {data: Uint8Array; key?: number} { + const size = state.type.tree_serializedSize(state.node); + if (this.bufferPool) { + const bufferWithKey = this.bufferPool.alloc(size); + if (bufferWithKey) { + const stateBytes = bufferWithKey.buffer; + const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); + state.type.tree_serializeToBytes({uint8Array: stateBytes, dataView}, 0, state.node); + return {data: stateBytes, key: bufferWithKey.key}; + } + } + + this.metrics?.persistedStateAllocCount.inc(); + return {data: state.serialize()}; + } + + /** + * Serialize validators to bytes leveraging the buffer pool to save memory allocation. + * - As monitored on holesky as of Jan 2024, it helps save ~500ms state reload time (4.3s vs 3.8s) + * - Also `serializeState.test.ts` perf test shows a lot of differences allocating validators bytes once vs every time, + * This is 2x - 3x faster than allocating memory every time.
+ * TODO: consider serializing validators manually like in `serializeState.test.ts` perf test, this could be 3x faster than this + */ + private serializeStateValidators(state: CachedBeaconStateAllForks): {data: Uint8Array; key?: number} { + const type = state.type.fields.validators; + const size = type.tree_serializedSize(state.validators.node); + if (this.bufferPool) { + const bufferWithKey = this.bufferPool.alloc(size); + if (bufferWithKey) { + const validatorsBytes = bufferWithKey.buffer; + const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength); + type.tree_serializeToBytes({uint8Array: validatorsBytes, dataView}, 0, state.validators.node); + return {data: validatorsBytes, key: bufferWithKey.key}; + } + } + + this.metrics?.stateReloadValidatorsSszAllocCount.inc(); + return {data: state.validators.serialize()}; + } } -function toCheckpointHex(checkpoint: phase0.Checkpoint): CheckpointHex { +export function toCheckpointHex(checkpoint: phase0.Checkpoint): CheckpointHex { return { epoch: checkpoint.epoch, rootHex: toHexString(checkpoint.root), diff --git a/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts index a177db9b7c87..00613b7feae4 100644 --- a/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts @@ -5,7 +5,7 @@ import {MapDef} from "@lodestar/utils"; import {routes} from "@lodestar/api"; import {Metrics} from "../../metrics/index.js"; import {MapTracker} from "./mapMetrics.js"; -import {CheckpointStateCache as CheckpointStateCacheInterface, CacheItemType} from "./types.js"; +import {CheckpointStateCache, CacheItemType} from "./types.js"; export type CheckpointHex = {epoch: Epoch; rootHex: RootHex}; const MAX_EPOCHS = 10; @@ -15,9 +15,8 @@ const MAX_EPOCHS = 10; * belonging to checkpoint * * Similar API to 
Repository - * TODO: rename to MemoryCheckpointStateCache in the next PR of n-historical states */ -export class CheckpointStateCache implements CheckpointStateCacheInterface { +export class InMemoryCheckpointStateCache implements CheckpointStateCache { private readonly cache: MapTracker; /** Epoch -> Set */ private readonly epochIndex = new MapDef>(() => new Set()); diff --git a/packages/beacon-node/src/chain/validation/block.ts b/packages/beacon-node/src/chain/validation/block.ts index daf99cf5365c..63f0748f7021 100644 --- a/packages/beacon-node/src/chain/validation/block.ts +++ b/packages/beacon-node/src/chain/validation/block.ts @@ -111,13 +111,13 @@ export async function validateGossipBlock( }); } - // getBlockSlotState also checks for whether the current finalized checkpoint is an ancestor of the block. + // getPreState also checks for whether the current finalized checkpoint is an ancestor of the block. // As a result, we throw an IGNORE (whereas the spec says we should REJECT for this scenario). // this is something we should change this in the future to make the code airtight to the spec. // [IGNORE] The block's parent (defined by block.parent_root) has been seen (via both gossip and non-gossip sources) (a client MAY queue blocks for processing once the parent block is retrieved). // [REJECT] The block's parent (defined by block.parent_root) passes validation. 
const blockState = await chain.regen - .getBlockSlotState(parentRoot, blockSlot, {dontTransferCache: true}, RegenCaller.validateGossipBlock) + .getPreState(block, {dontTransferCache: true}, RegenCaller.validateGossipBlock) .catch(() => { throw new BlockGossipError(GossipAction.IGNORE, {code: BlockErrorCode.PARENT_UNKNOWN, parentRoot}); }); diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index f6b143913346..db3c2b37515c 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -290,7 +290,11 @@ export function createLodestarMetrics( }, // Beacon state transition metrics - + epochTransitionByCaller: register.gauge<{caller: RegenCaller}>({ + name: "lodestar_epoch_transition_by_caller_total", + help: "Total count of epoch transition by caller", + labelNames: ["caller"], + }), epochTransitionTime: register.histogram({ name: "lodestar_stfn_epoch_transition_seconds", help: "Time to process a single epoch transition in seconds", @@ -1086,6 +1090,25 @@ export function createLodestarMetrics( }), }, + bufferPool: { + length: register.gauge({ + name: "lodestar_buffer_pool_length", + help: "Buffer pool length", + }), + hits: register.counter({ + name: "lodestar_buffer_pool_hits_total", + help: "Total number of buffer pool hits", + }), + misses: register.counter({ + name: "lodestar_buffer_pool_misses_total", + help: "Total number of buffer pool misses", + }), + grows: register.counter({ + name: "lodestar_buffer_pool_grows_total", + help: "Total number of buffer pool length increases", + }), + }, + cpStateCache: { lookups: register.gauge({ name: "lodestar_cp_state_cache_lookups_total", @@ -1136,6 +1159,19 @@ export function createLodestarMetrics( help: "Histogram of time to persist state to db since the clock slot", buckets: [0, 2, 4, 6, 8, 10, 12], }), + stateReloadValidatorsSszDuration: register.histogram({ + name: 
"lodestar_cp_state_cache_state_reload_validators_ssz_seconds", + help: "Histogram of time to serialize validators", + buckets: [0.1, 0.2, 0.5, 1], + }), + stateReloadValidatorsSszAllocCount: register.counter({ + name: "lodestar_cp_state_cache_state_reload_validators_ssz_alloc_count", + help: "Total number time to allocate memory for validators serialization", + }), + stateReloadShufflingCacheMiss: register.counter({ + name: "lodestar_cp_state_cache_state_reload_shuffling_cache_miss_count", + help: "Total number of shuffling cache misses when loading a state", + }), stateReloadDuration: register.histogram({ name: "lodestar_cp_state_cache_state_reload_seconds", help: "Histogram of time to load state from db", @@ -1160,6 +1196,10 @@ export function createLodestarMetrics( name: "lodestar_cp_state_cache_persisted_state_remove_count", help: "Total number of persisted states removed", }), + persistedStateAllocCount: register.counter({ + name: "lodestar_cp_state_cache_persisted_state_alloc_count", + help: "Total number time to allocate memory for persisted state", + }), }, balancesCache: { diff --git a/packages/beacon-node/src/util/bufferPool.ts b/packages/beacon-node/src/util/bufferPool.ts new file mode 100644 index 000000000000..c637b036df33 --- /dev/null +++ b/packages/beacon-node/src/util/bufferPool.ts @@ -0,0 +1,77 @@ +import {Metrics} from "../metrics/metrics.js"; + +/** + * If consumer wants more memory than available, we grow the buffer by this ratio. + */ +const GROW_RATIO = 1.1; + +/** + * A simple implementation to manage a single buffer. + * This is initially used for state serialization at every epoch and for state reload. + * We can enhance and use this for other purposes in the future. 
+ */ +export class BufferPool { + private buffer: Uint8Array; + private inUse = false; + private currentKey: number; + private readonly metrics: Metrics["bufferPool"] | null = null; + + constructor(size: number, metrics: Metrics | null = null) { + this.buffer = new Uint8Array(Math.floor(size * GROW_RATIO)); + this.currentKey = 0; + if (metrics) { + this.metrics = metrics.bufferPool; + metrics.bufferPool.length.addCollect(() => { + metrics.bufferPool.length.set(this.buffer.length); + }); + } + } + + get length(): number { + return this.buffer.length; + } + + /** + * Returns a buffer of the given size with all 0. + * If the buffer is already in use, return null. + * Grow the buffer if the requested size is larger than the current buffer. + */ + alloc(size: number): {buffer: Uint8Array; key: number} | null { + return this.doAlloc(size, false); + } + + /** + * Same to alloc() but the buffer is not zeroed. + */ + allocUnsafe(size: number): {buffer: Uint8Array; key: number} | null { + return this.doAlloc(size, true); + } + + private doAlloc(size: number, isUnsafe = false): {buffer: Uint8Array; key: number} | null { + if (this.inUse) { + this.metrics?.misses.inc(); + return null; + } + this.inUse = true; + this.metrics?.hits.inc(); + this.currentKey += 1; + if (size > this.buffer.length) { + this.metrics?.grows.inc(); + this.buffer = new Uint8Array(Math.floor(size * GROW_RATIO)); + } + const bytes = this.buffer.subarray(0, size); + if (!isUnsafe) { + bytes.fill(0); + } + return {buffer: bytes, key: this.currentKey}; + } + + /** + * Marks the buffer as free. 
+ */ + free(key: number): void { + if (key === this.currentKey) { + this.inUse = false; + } + } +} diff --git a/packages/beacon-node/src/util/file.ts b/packages/beacon-node/src/util/file.ts index af78ca8b6126..c194fb07a3b8 100644 --- a/packages/beacon-node/src/util/file.ts +++ b/packages/beacon-node/src/util/file.ts @@ -23,3 +23,30 @@ export async function writeIfNotExist(filepath: string, bytes: Uint8Array): Prom return true; } } + +/** Remove a file if it exists */ +export async function removeFile(path: string): Promise { + try { + await promisify(fs.unlink)(path); + return true; + } catch (_) { + // may not exists + return false; + } +} + +export async function readFile(path: string): Promise { + try { + return await fs.promises.readFile(path); + } catch (_) { + return null; + } +} + +export async function readFileNames(folderPath: string): Promise { + try { + return await fs.promises.readdir(folderPath); + } catch (_) { + return []; + } +} diff --git a/packages/beacon-node/src/util/multifork.ts b/packages/beacon-node/src/util/multifork.ts index 81b4921a0a4a..2b84fd86861c 100644 --- a/packages/beacon-node/src/util/multifork.ts +++ b/packages/beacon-node/src/util/multifork.ts @@ -1,8 +1,9 @@ import {ChainForkConfig} from "@lodestar/config"; -import {allForks} from "@lodestar/types"; +import {Slot, allForks} from "@lodestar/types"; import {bytesToInt} from "@lodestar/utils"; import {getSlotFromSignedBeaconBlockSerialized} from "./sszBytes.js"; +// TODO: merge to sszBytes.ts util /** * Slot uint64 */ @@ -36,10 +37,14 @@ export function getStateTypeFromBytes( config: ChainForkConfig, bytes: Buffer | Uint8Array ): allForks.AllForksSSZTypes["BeaconState"] { - const slot = bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT)); + const slot = getStateSlotFromBytes(bytes); return config.getForkTypes(slot).BeaconState; } +export function getStateSlotFromBytes(bytes: Uint8Array): Slot { + return 
bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT)); +} + /** * First field in update is beacon, first field in beacon is slot * diff --git a/packages/beacon-node/test/e2e/chain/stateCache/nHistoricalStates.test.ts b/packages/beacon-node/test/e2e/chain/stateCache/nHistoricalStates.test.ts new file mode 100644 index 000000000000..96f688facfcf --- /dev/null +++ b/packages/beacon-node/test/e2e/chain/stateCache/nHistoricalStates.test.ts @@ -0,0 +1,420 @@ +import {describe, it, afterEach, expect} from "vitest"; +import {Gauge, Histogram} from "prom-client"; +import {ChainConfig} from "@lodestar/config"; +import {Slot, phase0} from "@lodestar/types"; +import {TimestampFormatCode} from "@lodestar/logger"; +import {SLOTS_PER_EPOCH} from "@lodestar/params"; +import {routes} from "@lodestar/api"; +import {LogLevel, TestLoggerOpts, testLogger} from "../../../utils/logger.js"; +import {getDevBeaconNode} from "../../../utils/node/beacon.js"; +import {getAndInitDevValidators} from "../../../utils/node/validator.js"; +import {waitForEvent} from "../../../utils/events/resolver.js"; +import {ChainEvent, ReorgEventData} from "../../../../src/chain/emitter.js"; +import {ReorgedForkChoice} from "../../../utils/mocks/forkchoice.js"; +import {connect} from "../../../utils/network.js"; +import {CacheItemType} from "../../../../src/chain/stateCache/types.js"; + +/** + * Test different reorg scenarios to make sure the StateCache implementations are correct. 
+ */ +describe( + "chain/stateCache/n-historical states", + function () { + const validatorCount = 8; + const testParams: Pick<ChainConfig, "SECONDS_PER_SLOT"> = { + // eslint-disable-next-line @typescript-eslint/naming-convention + SECONDS_PER_SLOT: 2, + }; + + const afterEachCallbacks: (() => Promise<unknown> | void)[] = []; + afterEach(async () => { + while (afterEachCallbacks.length > 0) { + const callback = afterEachCallbacks.pop(); + if (callback) await callback(); + } + }); + + // all tests run until this slot + const LAST_SLOT = 33; + + /** + * (n+1) + * -----------------| + * / + * |---------|---------| + * ^ ^ + * (n+1-x) reorgedSlot n + * ^ + * commonAncestor + * |<--reorgDistance-->| + */ + const testCases: { + name: string; + reorgedSlot: number; + reorgDistance: number; + maxBlockStates: number; + maxCPStateEpochsInMemory: number; + reloadCount: number; + persistCount: number; + numStatesInMemory: number; + numStatesPersisted: number; + numEpochsInMemory: number; + numEpochsPersisted: number; + }[] = [ + /** + * Block slot 28 has parent slot 25, block slot 26 and 27 are reorged + * --------------------|--- + * / ^ ^ ^ ^ + * / 28 29 32 33 + * |----------------|---------- + * ^ ^ ^ ^ + * 24 25 26 27 + * */ + { + name: "0 historical state, reorg in same epoch", + reorgedSlot: 27, + reorgDistance: 3, + maxBlockStates: 1, + maxCPStateEpochsInMemory: 0, + // reload at cp epoch 3 once to regen state 25 (28 - 3) + reloadCount: 1, + // persist for epoch 0 to 4, no need to persist cp epoch 3 again + persistCount: 5, + // run through slot 33, no state in memory + numStatesInMemory: 0, + // epoch 0 1 2 3 4 but finalized at epoch 2 so store checkpoint states for epoch 2 3 4 + numStatesPersisted: 3, + numEpochsInMemory: 0, + // epoch 0 1 2 3 4 but finalized at epoch 2 so store checkpoint states for epoch 2 3 4 + numEpochsPersisted: 3, + // chain is finalized at epoch 2 end of test + }, + /** + * Block slot 28 has parent slot 23, block slot 24 25 26 and 27 are reorged + * --------------------------|--- + * 
/ | ^ ^ ^ ^ + * / | 28 29 32 33 + * |----------------|---------- + * 16 ^ ^ ^ ^ ^ + * ^ 23 24 25 26 27 + * reload ^ + * 2 checkpoint states at epoch 3 are persisted + */ + { + name: "0 historical state, reorg 1 epoch", + reorgedSlot: 27, + reorgDistance: 5, + maxBlockStates: 1, + maxCPStateEpochsInMemory: 0, + // reload at cp epoch 2 once to regen state 23 (28 - 5) + reloadCount: 1, + // 1 cp state for epoch 0 1 2 4, and 2 cp states for epoch 3 (different roots) + persistCount: 6, + numStatesInMemory: 0, + // epoch 0 1 2 4 has 1 cp state, epoch 3 has 2 checkpoint states + numStatesPersisted: 6, + numEpochsInMemory: 0, + // epoch 0 1 2 3 4 + numEpochsPersisted: 5, + // chain is not finalized end of test + }, + /** + * Block slot 28 has parent slot 25, block slot 26 and 27 are reorged + * --------------------|--- + * / ^ ^ ^ ^ + * / 28 29 32 33 + * |----------------|---------- + * ^ ^ ^ ^ + * 24 25 26 27 + * */ + { + name: "maxCPStateEpochsInMemory=1, reorg in same epoch", + reorgedSlot: 27, + reorgDistance: 3, + maxBlockStates: 1, + maxCPStateEpochsInMemory: 1, + // no need to reload as cp state epoch 3 is available in memory + reloadCount: 0, + // 1 time for epoch 0 1 2 3, cp state epoch 4 is in memory + persistCount: 4, + // epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State + numStatesInMemory: 2, + // epoch 2 3, epoch 4 is in-memory + numStatesPersisted: 2, + // epoch 3 + numEpochsInMemory: 1, + // epoch 2 3, epoch 4 is in-memory + numEpochsPersisted: 2, + // chain is finalized at epoch 2 end of test + }, + /** + * Block slot 28 has parent slot 23, block slot 24 25 26 and 27 are reorged + * --------------------------|--- + * / | ^ ^ ^ ^ + * / | 28 29 32 33 + * |----------------|---------- + * 16 ^ ^ ^ ^ ^ + * 23 24 25 26 27 + * ^ + * PRCS at epoch 3 is persisted, CRCS is pruned + */ + { + name: "maxCPStateEpochsInMemory=1, reorg last slot of previous epoch", + reorgedSlot: 27, + reorgDistance: 5, + maxBlockStates: 1, + 
maxCPStateEpochsInMemory: 1, + // PRCS at epoch 3 is available in memory so no need to reload + reloadCount: 0, + // 1 cp state for epoch 0 1 2 3 + persistCount: 4, + // epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State + numStatesInMemory: 2, + // chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted + numStatesPersisted: 4, + // epoch 4 + numEpochsInMemory: 1, + // chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted + numEpochsPersisted: 4, + // chain is NOT finalized end of test + }, + /** + * Block slot 28 has parent slot 19, block slots 20 to 27 are reorged + * --------------------------------|--- + * / | ^ ^ ^ ^ + * / | 28 29 32 33 + * |----------------|---------- + * 16 ^ ^ ^ ^ ^ ^ + * 19 23 24 25 26 27 + * ^ + * PRCS at epoch 3 is persisted, CRCS is pruned + */ + { + name: "maxCPStateEpochsInMemory=1, reorg middle slot of previous epoch", + reorgedSlot: 27, + reorgDistance: 9, + maxBlockStates: 1, + maxCPStateEpochsInMemory: 1, + // reload CP state epoch 2 (slot = 16) + reloadCount: 1, + // 1 cp state for epoch 0 1 2 3 + persistCount: 4, + // epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State + numStatesInMemory: 2, + // chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted + numStatesPersisted: 4, + // epoch 4 + numEpochsInMemory: 1, + // chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted + numEpochsPersisted: 4, + // chain is NOT finalized end of test + }, + /** + * Block slot 28 has parent slot 15, block slots 16 to 27 are reorged + * --------------------------------------------|--- + * / | ^ ^ ^ ^ + * / | 28 29 32 33 + * |----------------|----------------|---------- + * ^ ^ 16 ^ ^ ^ ^ ^ ^ + * 8 15 19 23 24 25 26 27 + *reload ^ + * PRCS at epoch 3 is persisted, CRCS is pruned + */ + { + name: "maxCPStateEpochsInMemory=1, 
reorg 2 epochs", + reorgedSlot: 27, + reorgDistance: 13, + maxBlockStates: 1, + maxCPStateEpochsInMemory: 1, + // reload CP state epoch 2 (slot = 16) + reloadCount: 1, + // 1 cp state for epoch 0 1, 2 CP states for epoch 2, 1 cp state for epoch 3 + persistCount: 5, + // epoch 4, one for Current Root Checkpoint State and one for Previous Root Checkpoint State + numStatesInMemory: 2, + // chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted, epoch 2 has 2 CP states + numStatesPersisted: 5, + // epoch 4 + numEpochsInMemory: 1, + // chain is not finalized, epoch 4 is in-memory so CP state at epoch 0 1 2 3 are persisted + numEpochsPersisted: 4, + // chain is NOT finalized end of test + }, + ]; + + for (const { + name, + reorgedSlot, + reorgDistance, + maxBlockStates, + maxCPStateEpochsInMemory, + reloadCount, + persistCount, + numStatesInMemory, + numStatesPersisted, + numEpochsInMemory, + numEpochsPersisted, + } of testCases) { + it(`${name} reorgedSlot=${reorgedSlot} reorgDistance=${reorgDistance}`, async function () { + // the node needs time to transpile/initialize bls worker threads + const genesisSlotsDelay = 7; + const genesisTime = Math.floor(Date.now() / 1000) + genesisSlotsDelay * testParams.SECONDS_PER_SLOT; + const testLoggerOpts: TestLoggerOpts = { + level: LogLevel.debug, + timestampFormat: { + format: TimestampFormatCode.EpochSlot, + genesisTime, + slotsPerEpoch: SLOTS_PER_EPOCH, + secondsPerSlot: testParams.SECONDS_PER_SLOT, + }, + }; + + const loggerNodeA = testLogger("Reorg-Node-A", testLoggerOpts); + const loggerNodeB = testLogger("FollowUp-Node-B", {...testLoggerOpts, level: LogLevel.debug}); + + const reorgedBn = await getDevBeaconNode({ + params: testParams, + options: { + sync: {isSingleNode: true}, + network: {allowPublishToZeroPeers: true}, + // run the first bn with ReorgedForkChoice, no nHistoricalStates flag so it does not have to reload + chain: { + blsVerifyAllMainThread: true, + forkchoiceConstructor: 
ReorgedForkChoice, + proposerBoostEnabled: true, + }, + }, + validatorCount, + logger: loggerNodeA, + }); + + // stop bn after validators + afterEachCallbacks.push(() => reorgedBn.close()); + + const {validators} = await getAndInitDevValidators({ + node: reorgedBn, + logPrefix: "bn-a", + validatorsPerClient: validatorCount, + validatorClientCount: 1, + startIndex: 0, + useRestApi: false, + testLoggerOpts, + }); + + afterEachCallbacks.push(() => Promise.all(validators.map((v) => v.close()))); + + const followupBn = await getDevBeaconNode({ + params: testParams, + options: { + api: {rest: {enabled: false}}, + // run the 2nd bn with nHistoricalStates flag and the configured maxBlockStates, maxCPStateEpochsInMemory + chain: { + blsVerifyAllMainThread: true, + forkchoiceConstructor: ReorgedForkChoice, + nHistoricalStates: true, + maxBlockStates, + maxCPStateEpochsInMemory, + proposerBoostEnabled: true, + }, + metrics: {enabled: true}, + }, + validatorCount, + genesisTime: reorgedBn.chain.getHeadState().genesisTime, + logger: loggerNodeB, + }); + + afterEachCallbacks.push(() => followupBn.close()); + + await connect(followupBn.network, reorgedBn.network); + + // wait for checkpoint 3 at slot 24, both nodes should reach same checkpoint + const checkpoints = await Promise.all( + [reorgedBn, followupBn].map((bn) => + waitForEvent(bn.chain.emitter, ChainEvent.checkpoint, 240000, (cp) => cp.epoch === 3) + ) + ); + expect(checkpoints[0]).toEqual(checkpoints[1]); + expect(checkpoints[0].epoch).toEqual(3); + const head = reorgedBn.chain.forkChoice.getHead(); + loggerNodeA.info("Node A emitted checkpoint event, head slot: " + head.slot); + + // setup reorg data for both bns + for (const bn of [reorgedBn, followupBn]) { + (bn.chain.forkChoice as ReorgedForkChoice).reorgedSlot = reorgedSlot; + (bn.chain.forkChoice as ReorgedForkChoice).reorgDistance = reorgDistance; + } + + // both nodes see the reorg event + const reorgDatas = await Promise.all( + [reorgedBn, followupBn].map((bn) 
=> + waitForEvent( + bn.chain.emitter, + routes.events.EventType.chainReorg, + 240000, + (reorgData) => reorgData.slot === reorgedSlot + 1 + ) + ) + ); + for (const reorgData of reorgDatas) { + expect(reorgData.slot).toEqual(reorgedSlot + 1); + expect(reorgData.depth).toEqual(reorgDistance); + } + + // make sure both nodes can reach another checkpoint + const checkpoints2 = await Promise.all( + [reorgedBn, followupBn].map((bn) => + waitForEvent(bn.chain.emitter, ChainEvent.checkpoint, 240000, (cp) => cp.epoch === 4) + ) + ); + expect(checkpoints2[0]).toEqual(checkpoints2[1]); + expect(checkpoints2[0].epoch).toEqual(4); + + // wait for 1 more slot to persist states + await waitForEvent<{slot: Slot}>( + reorgedBn.chain.emitter, + routes.events.EventType.block, + 240000, + ({slot}) => slot === LAST_SLOT + ); + + const reloadMetricValues = await (followupBn.metrics?.cpStateCache.stateReloadDuration as Histogram).get(); + expect( + reloadMetricValues?.values.find( + (value) => value.metricName === "lodestar_cp_state_cache_state_reload_seconds_count" + )?.value + ).toEqual(reloadCount); + + const persistMetricValues = await (followupBn.metrics?.cpStateCache.statePersistDuration as Histogram).get(); + expect( + persistMetricValues?.values.find( + (value) => value.metricName === "lodestar_cp_state_cache_state_persist_seconds_count" + )?.value + ).toEqual(persistCount); + + // assert number of persisted/in-memory states + const stateSizeMetricValues = await (followupBn.metrics?.cpStateCache.size as unknown as Gauge).get(); + const numStateInMemoryItem = stateSizeMetricValues?.values.find( + (value) => value.labels.type === CacheItemType.inMemory + ); + const numStatePersistedItem = stateSizeMetricValues?.values.find( + (value) => value.labels.type === CacheItemType.persisted + ); + expect(numStateInMemoryItem?.value).toEqual(numStatesInMemory); + expect(numStatePersistedItem?.value).toEqual(numStatesPersisted); + + // assert number of epochs persisted/in-memory + const 
epochSizeMetricValues = await (followupBn.metrics?.cpStateCache.epochSize as unknown as Gauge).get(); + const numEpochsInMemoryItem = epochSizeMetricValues?.values.find( + (value) => value.labels.type === CacheItemType.inMemory + ); + const numEpochsPersistedItem = epochSizeMetricValues?.values.find( + (value) => value.labels.type === CacheItemType.persisted + ); + expect(numEpochsInMemoryItem?.value).toEqual(numEpochsInMemory); + expect(numEpochsPersistedItem?.value).toEqual(numEpochsPersisted); + }); + } + }, + // on local environment, it takes around 70s for 2 checkpoints so make it 96s for CI + {timeout: 96_000} +); diff --git a/packages/beacon-node/test/perf/chain/stateCache/stateContextCheckpointsCache.test.ts b/packages/beacon-node/test/perf/chain/stateCache/stateContextCheckpointsCache.test.ts index cf0ab1fa16b2..1ebb21b324ea 100644 --- a/packages/beacon-node/test/perf/chain/stateCache/stateContextCheckpointsCache.test.ts +++ b/packages/beacon-node/test/perf/chain/stateCache/stateContextCheckpointsCache.test.ts @@ -2,17 +2,17 @@ import {itBench, setBenchOpts} from "@dapplion/benchmark"; import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {ssz, phase0} from "@lodestar/types"; import {generateCachedState} from "../../../utils/state.js"; -import {CheckpointStateCache, toCheckpointHex} from "../../../../src/chain/stateCache/index.js"; +import {InMemoryCheckpointStateCache, toCheckpointHex} from "../../../../src/chain/stateCache/index.js"; describe("CheckpointStateCache perf tests", function () { setBenchOpts({noThreshold: true}); let state: CachedBeaconStateAllForks; let checkpoint: phase0.Checkpoint; - let checkpointStateCache: CheckpointStateCache; + let checkpointStateCache: InMemoryCheckpointStateCache; before(() => { - checkpointStateCache = new CheckpointStateCache({}); + checkpointStateCache = new InMemoryCheckpointStateCache({}); state = generateCachedState(); checkpoint = ssz.phase0.Checkpoint.defaultValue(); }); diff --git 
a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts index 83a2dddd65dd..18bdfac89793 100644 --- a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts +++ b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts @@ -88,7 +88,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 2} + {maxCPStateEpochsInMemory: 2, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -156,7 +156,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 2} + {maxCPStateEpochsInMemory: 2, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -228,7 +228,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 2} + {maxCPStateEpochsInMemory: 2, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); @@ -527,7 +527,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 1} + {maxCPStateEpochsInMemory: 1, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); 
@@ -792,7 +792,7 @@ describe("PersistentCheckpointStateCache", function () { const datastore = getTestDatastore(fileApisBuffer); cache = new PersistentCheckpointStateCache( {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, - {maxCPStateEpochsInMemory: 0} + {maxCPStateEpochsInMemory: 0, processLateBlock: true} ); cache.add(cp0a, states["cp0a"]); cache.add(cp0b, states["cp0b"]); diff --git a/packages/beacon-node/test/unit/chain/validation/block.test.ts b/packages/beacon-node/test/unit/chain/validation/block.test.ts index f1aca0a43cf7..d2a681a14edf 100644 --- a/packages/beacon-node/test/unit/chain/validation/block.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/block.test.ts @@ -132,7 +132,7 @@ describe("gossip block validation", function () { // Returned parent block is latter than proposed block forkChoice.getBlockHex.mockReturnValueOnce({slot: clockSlot - 1} as ProtoBlock); // Regen not able to get the parent block state - regen.getBlockSlotState.mockRejectedValue(undefined); + regen.getPreState.mockRejectedValue(undefined); await expectRejectedWithLodestarError( validateGossipBlock(config, chain, job, ForkName.phase0), @@ -146,7 +146,7 @@ describe("gossip block validation", function () { // Returned parent block is latter than proposed block forkChoice.getBlockHex.mockReturnValueOnce({slot: clockSlot - 1} as ProtoBlock); // Regen returns some state - regen.getBlockSlotState.mockResolvedValue(generateCachedState()); + regen.getPreState.mockResolvedValue(generateCachedState()); // BLS signature verifier returns invalid verifySignature.mockResolvedValue(false); @@ -163,7 +163,7 @@ describe("gossip block validation", function () { forkChoice.getBlockHex.mockReturnValueOnce({slot: clockSlot - 1} as ProtoBlock); // Regen returns some state const state = generateCachedState(); - regen.getBlockSlotState.mockResolvedValue(state); + regen.getPreState.mockResolvedValue(state); // BLS signature verifier returns valid 
verifySignature.mockResolvedValue(true); // Force proposer shuffling cache to return wrong value @@ -182,7 +182,7 @@ describe("gossip block validation", function () { forkChoice.getBlockHex.mockReturnValueOnce({slot: clockSlot - 1} as ProtoBlock); // Regen returns some state const state = generateCachedState(); - regen.getBlockSlotState.mockResolvedValue(state); + regen.getPreState.mockResolvedValue(state); // BLS signature verifier returns valid verifySignature.mockResolvedValue(true); // Force proposer shuffling cache to return wrong value diff --git a/packages/beacon-node/test/unit/util/bufferPool.test.ts b/packages/beacon-node/test/unit/util/bufferPool.test.ts new file mode 100644 index 000000000000..2f8004e8315e --- /dev/null +++ b/packages/beacon-node/test/unit/util/bufferPool.test.ts @@ -0,0 +1,25 @@ +import {describe, it, expect} from "vitest"; +import {BufferPool} from "../../../src/util/bufferPool.js"; + +describe("BufferPool", () => { + const pool = new BufferPool(100); + + it("should increase length", () => { + expect(pool.length).toEqual(110); + const mem = pool.alloc(200); + if (mem === null) { + throw Error("Expected non-null mem"); + } + expect(pool.length).toEqual(220); + pool.free(mem.key); + }); + + it("should not allow alloc if in use", () => { + const mem = pool.alloc(20); + if (mem === null) { + throw Error("Expected non-null mem"); + } + expect(pool.alloc(20)).toEqual(null); + pool.free(mem.key); + }); +}); diff --git a/packages/beacon-node/test/utils/chain/stateCache/datastore.ts b/packages/beacon-node/test/utils/chain/stateCache/datastore.ts index 8a944f4c2d88..73eab697de96 100644 --- a/packages/beacon-node/test/utils/chain/stateCache/datastore.ts +++ b/packages/beacon-node/test/utils/chain/stateCache/datastore.ts @@ -3,11 +3,11 @@ import {CPStateDatastore, checkpointToDatastoreKey} from "../../../../src/chain/ export function getTestDatastore(fileApisBuffer: Map): CPStateDatastore { const datastore: CPStateDatastore = { - write: (cp, 
state) => { + write: (cp, stateBytes) => { const persistentKey = checkpointToDatastoreKey(cp); const stringKey = toHexString(persistentKey); if (!fileApisBuffer.has(stringKey)) { - fileApisBuffer.set(stringKey, state.serialize()); + fileApisBuffer.set(stringKey, stateBytes); } return Promise.resolve(persistentKey); }, diff --git a/packages/beacon-node/test/utils/mocks/forkchoice.ts b/packages/beacon-node/test/utils/mocks/forkchoice.ts new file mode 100644 index 000000000000..5ec594e4ed81 --- /dev/null +++ b/packages/beacon-node/test/utils/mocks/forkchoice.ts @@ -0,0 +1,108 @@ +import {ChainForkConfig} from "@lodestar/config"; +import {ForkChoice, ForkChoiceOpts, IForkChoiceStore, ProtoArray, ProtoBlock} from "@lodestar/fork-choice"; +import {Slot} from "@lodestar/types"; + +/** + * Specific implementation of ForkChoice that reorg at a given slot and distance. + * (n+1) + * -----------------| + * / + * |---------|---------| + * ^ ^ + * (n+1-x) reorgedSlot n + * ^ + * commonAncestor + * |<--reorgDistance-->| + **/ +export class ReorgedForkChoice extends ForkChoice { + /** + * These need to be in the constructor, however we want to keep the constructor signature the same. + * So they are set after construction in the test instead. + */ + reorgedSlot: Slot | undefined; + reorgDistance: number | undefined; + private readonly _fcStore: IForkChoiceStore; + // these flags to mark if the current call of getHead() is to produce a block + // the other way to check this is to check the n-th call of getHead() in the same slot, but this is easier + private calledUpdateHead = false; + private calledUpdateTime = false; + + constructor( + config: ChainForkConfig, + fcStore: IForkChoiceStore, + /** The underlying representation of the block DAG. */ + protoArray: ProtoArray, + opts?: ForkChoiceOpts + ) { + super(config, fcStore, protoArray, opts); + this._fcStore = fcStore; + } + + /** + * Override the getHead() method + * - produceBlock: to reorg at a given slot and distance. 
+ * - produceAttestation: to build on the latest node after the reorged slot + * - importBlock: to return the old branch at the reorged slot to produce the reorg event + */ + getHead = (): ProtoBlock => { + const currentSlot = this._fcStore.currentSlot; + const producingBlock = this.calledUpdateHead && this.calledUpdateTime; + if (this.reorgedSlot === undefined || this.reorgDistance === undefined) { + return super.getHead(); + } + + this.calledUpdateTime = false; + this.calledUpdateHead = false; + + // produceBlock: at reorgedSlot + 1, build new branch + if (currentSlot === this.reorgedSlot + 1 && producingBlock) { + const nodes = super.getAllNodes(); + const headSlot = currentSlot - this.reorgDistance; + const headNode = nodes.find((node) => node.slot === headSlot); + if (headNode !== undefined) { + return headNode; + } + } + + // this is mainly for producing attestations + produceBlock for latter slots + if (currentSlot > this.reorgedSlot + 1) { + // from now on build on latest node which reorged at the given slot + const nodes = super.getAllNodes(); + return nodes[nodes.length - 1]; + } + + // importBlock flow at "this.reorgedSlot + 1" returns the old branch for oldHead computation which trigger reorg event + return super.getHead(); + }; + + updateTime(currentSlot: Slot): void { + // set flag to signal produceBlock flow + this.calledUpdateTime = true; + super.updateTime(currentSlot); + } + + /** + * Override this function to: + * - produceBlock flow: mark flags to indicate that the current call of getHead() is to produce a block + * - importBlock: return the new branch after the reorged slot, this is for newHead computation + */ + updateHead = (): ProtoBlock => { + if (this.reorgedSlot === undefined || this.reorgDistance === undefined) { + return super.updateHead(); + } + // in all produce blocks flow, it always call updateTime() first then recomputeForkChoiceHead() + if (this.calledUpdateTime) { + this.calledUpdateHead = true; + } + const currentSlot = 
this._fcStore.currentSlot; + if (currentSlot <= this.reorgedSlot) { + return super.updateHead(); + } + + // since reorgSlot, always return the latest node + const nodes = super.getAllNodes(); + const head = nodes[nodes.length - 1]; + super.updateHead(); + return head; + }; +} diff --git a/packages/cli/src/options/beaconNodeOptions/chain.ts b/packages/cli/src/options/beaconNodeOptions/chain.ts index 0c9280f5330c..331619846b33 100644 --- a/packages/cli/src/options/beaconNodeOptions/chain.ts +++ b/packages/cli/src/options/beaconNodeOptions/chain.ts @@ -26,6 +26,10 @@ export type ChainArgs = { broadcastValidationStrictness?: string; "chain.minSameMessageSignatureSetsToBatch"?: number; "chain.maxShufflingCacheEpochs"?: number; + "chain.nHistoricalStates"?: boolean; + "chain.nHistoricalStatesFileDataStore"?: boolean; + "chain.maxBlockStates"?: number; + "chain.maxCPStateEpochsInMemory"?: number; }; export function parseArgs(args: ChainArgs): IBeaconNodeOptions["chain"] { @@ -53,6 +57,11 @@ export function parseArgs(args: ChainArgs): IBeaconNodeOptions["chain"] { minSameMessageSignatureSetsToBatch: args["chain.minSameMessageSignatureSetsToBatch"] ?? defaultOptions.chain.minSameMessageSignatureSetsToBatch, maxShufflingCacheEpochs: args["chain.maxShufflingCacheEpochs"] ?? defaultOptions.chain.maxShufflingCacheEpochs, + nHistoricalStates: args["chain.nHistoricalStates"] ?? defaultOptions.chain.nHistoricalStates, + nHistoricalStatesFileDataStore: + args["chain.nHistoricalStatesFileDataStore"] ?? defaultOptions.chain.nHistoricalStatesFileDataStore, + maxBlockStates: args["chain.maxBlockStates"] ?? defaultOptions.chain.maxBlockStates, + maxCPStateEpochsInMemory: args["chain.maxCPStateEpochsInMemory"] ?? defaultOptions.chain.maxCPStateEpochsInMemory, }; } @@ -212,4 +221,37 @@ Will double processing times. 
Use only for debugging purposes.", default: defaultOptions.chain.maxShufflingCacheEpochs, group: "chain", }, + + "chain.nHistoricalStates": { + hidden: true, + description: + "Use the new FIFOBlockStateCache and PersistentCheckpointStateCache or not which make lodestar heap size bounded instead of unbounded as before", + type: "boolean", + default: defaultOptions.chain.nHistoricalStates, + group: "chain", + }, + + "chain.nHistoricalStatesFileDataStore": { + hidden: true, + description: "Use fs to store checkpoint state for PersistentCheckpointStateCache or not", + type: "boolean", + default: defaultOptions.chain.nHistoricalStatesFileDataStore, + group: "chain", + }, + + "chain.maxBlockStates": { + hidden: true, + description: "Max block states to cache in memory, used for FIFOBlockStateCache", + type: "number", + default: defaultOptions.chain.maxBlockStates, + group: "chain", + }, + + "chain.maxCPStateEpochsInMemory": { + hidden: true, + description: "Max epochs to cache checkpoint states in memory, used for PersistentCheckpointStateCache", + type: "number", + default: defaultOptions.chain.maxCPStateEpochsInMemory, + group: "chain", + }, }; diff --git a/packages/cli/test/unit/options/beaconNodeOptions.test.ts b/packages/cli/test/unit/options/beaconNodeOptions.test.ts index 8a9ccff5a917..35247d1e165f 100644 --- a/packages/cli/test/unit/options/beaconNodeOptions.test.ts +++ b/packages/cli/test/unit/options/beaconNodeOptions.test.ts @@ -36,6 +36,10 @@ describe("options / beaconNodeOptions", () => { "chain.trustedSetup": "", "chain.minSameMessageSignatureSetsToBatch": 32, "chain.maxShufflingCacheEpochs": 100, + "chain.nHistoricalStates": true, + "chain.nHistoricalStatesFileDataStore": true, + "chain.maxBlockStates": 100, + "chain.maxCPStateEpochsInMemory": 100, emitPayloadAttributes: false, eth1: true, @@ -139,6 +143,10 @@ describe("options / beaconNodeOptions", () => { trustedSetup: "", minSameMessageSignatureSetsToBatch: 32, maxShufflingCacheEpochs: 100, + 
nHistoricalStates: true, + nHistoricalStatesFileDataStore: true, + maxBlockStates: 100, + maxCPStateEpochsInMemory: 100, }, eth1: { enabled: true, diff --git a/packages/state-transition/src/cache/stateCache.ts b/packages/state-transition/src/cache/stateCache.ts index b01ca0c409b2..8b45152a3646 100644 --- a/packages/state-transition/src/cache/stateCache.ts +++ b/packages/state-transition/src/cache/stateCache.ts @@ -164,9 +164,15 @@ export function createCachedBeaconState( export function loadCachedBeaconState( cachedSeedState: T, stateBytes: Uint8Array, - opts?: EpochCacheOpts + opts?: EpochCacheOpts, + seedValidatorsBytes?: Uint8Array ): T { - const {state: migratedState, modifiedValidators} = loadState(cachedSeedState.config, cachedSeedState, stateBytes); + const {state: migratedState, modifiedValidators} = loadState( + cachedSeedState.config, + cachedSeedState, + stateBytes, + seedValidatorsBytes + ); const {pubkey2index, index2pubkey} = cachedSeedState.epochCtx; // Get the validators sub tree once for all the loop const validators = migratedState.validators; diff --git a/packages/state-transition/src/util/index.ts b/packages/state-transition/src/util/index.ts index 3f2e91da9a77..f4c97453e5c7 100644 --- a/packages/state-transition/src/util/index.ts +++ b/packages/state-transition/src/util/index.ts @@ -23,3 +23,4 @@ export * from "./slot.js"; export * from "./syncCommittee.js"; export * from "./validator.js"; export * from "./weakSubjectivity.js"; +export * from "./sszBytes.js"; diff --git a/packages/state-transition/src/util/loadState/loadState.ts b/packages/state-transition/src/util/loadState/loadState.ts index 83377101609d..dc9f8fe4fcab 100644 --- a/packages/state-transition/src/util/loadState/loadState.ts +++ b/packages/state-transition/src/util/loadState/loadState.ts @@ -20,7 +20,8 @@ type MigrateStateOutput = {state: BeaconStateAllForks; modifiedValidators: numbe export function loadState( config: ChainForkConfig, seedState: BeaconStateAllForks, - 
stateBytes: Uint8Array + stateBytes: Uint8Array, + seedValidatorsBytes?: Uint8Array ): MigrateStateOutput { // casting only to make typescript happy const stateType = getStateTypeFromBytes(config, stateBytes) as typeof ssz.capella.BeaconState; @@ -42,7 +43,8 @@ export function loadState( const modifiedValidators = loadValidators( migratedState, seedState, - stateBytes.subarray(validatorsRange.start, validatorsRange.end) + stateBytes.subarray(validatorsRange.start, validatorsRange.end), + seedValidatorsBytes ); // inactivityScores are rarely changed @@ -128,7 +130,7 @@ function loadInactivityScores( } /** - * As of Sep 2021, common validators of 2 mainnet states are rarely changed. However, the benchmark shows that + * As of Sep 2023, common validators of 2 mainnet states are rarely changed. However, the benchmark shows that * 10k modified validators is not an issue. (see packages/state-transition/test/perf/util/loadState/findModifiedValidators.test.ts) * * This method loads validators from bytes given a seed state so that they share the same base tree. 
This gives some benefits: @@ -159,7 +161,8 @@ function loadInactivityScores( function loadValidators( migratedState: BeaconStateAllForks, seedState: BeaconStateAllForks, - newValidatorsBytes: Uint8Array + newValidatorsBytes: Uint8Array, + seedStateValidatorsBytes?: Uint8Array ): number[] { const seedValidatorCount = seedState.validators.length; const newValidatorCount = Math.floor(newValidatorsBytes.length / VALIDATOR_BYTES_SIZE); @@ -167,7 +170,9 @@ function loadValidators( const minValidatorCount = Math.min(seedValidatorCount, newValidatorCount); // migrated state starts with the same validators to seed state migratedState.validators = seedState.validators.clone(); - const seedValidatorsBytes = seedState.validators.serialize(); + // 80% of validators serialization time comes from memory allocation + // seedStateValidatorsBytes is an optimization at beacon-node side to avoid memory allocation here + const seedValidatorsBytes = seedStateValidatorsBytes ?? seedState.validators.serialize(); const modifiedValidators: number[] = []; findModifiedValidators( isMoreValidator ? seedValidatorsBytes : seedValidatorsBytes.subarray(0, minValidatorCount * VALIDATOR_BYTES_SIZE), diff --git a/packages/state-transition/test/perf/util/serializeState.test.ts b/packages/state-transition/test/perf/util/serializeState.test.ts new file mode 100644 index 000000000000..5bd6c6b38e6a --- /dev/null +++ b/packages/state-transition/test/perf/util/serializeState.test.ts @@ -0,0 +1,122 @@ +import {itBench, setBenchOpts} from "@dapplion/benchmark"; +import {ssz} from "@lodestar/types"; +import {generatePerfTestCachedStateAltair} from "../util.js"; + +/** + * This shows different statistics between allocating memory once vs every time. + * Due to gc, the test is not consistent so skipping it for CI. 
+ */ +describe.skip("serialize state and validators", function () { + this.timeout(0); + + setBenchOpts({ + // increasing this may have different statistics due to gc time + minMs: 60_000, + }); + const valicatorCount = 1_500_000; + const seedState = generatePerfTestCachedStateAltair({vc: 1_500_000, goBackOneSlot: false}); + + /** + * Allocate memory every time, on a Mac M1: + * - 700ms to 750ms + * - Used to see 2.8s + * Allocate memory once, may test multiple times but seems consistent: + * - 430ms to 480ms + */ + const stateType = ssz.altair.BeaconState; + const rootNode = seedState.node; + const stateBytes = new Uint8Array(stateType.tree_serializedSize(rootNode)); + const stateDataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength); + itBench({ + id: `serialize state ${valicatorCount} validators, alloc once`, + fn: () => { + stateBytes.fill(0); + stateType.tree_serializeToBytes({uint8Array: stateBytes, dataView: stateDataView}, 0, rootNode); + }, + }); + + itBench({ + id: `serialize altair state ${valicatorCount} validators`, + fn: () => { + seedState.serialize(); + }, + }); + + /** + * Allocate memory once, this takes 450ms - 500ms on a Mac M1. + */ + const validatorsType = seedState.type.fields.validators; + const validatorsSize = validatorsType.tree_serializedSize(seedState.validators.node); + const validatorsBytes = new Uint8Array(validatorsSize); + const validatorsDataView = new DataView( + validatorsBytes.buffer, + validatorsBytes.byteOffset, + validatorsBytes.byteLength + ); + itBench({ + id: `serialize state validators ${valicatorCount} validators, alloc once`, + fn: () => { + validatorsBytes.fill(0); + validatorsType.tree_serializeToBytes( + {uint8Array: validatorsBytes, dataView: validatorsDataView}, + 0, + seedState.validators.node + ); + }, + }); + + /** + * Allocate memory every time, this takes 640ms to more than 1s on a Mac M1. 
+ */ + itBench({ + id: `serialize state validators ${valicatorCount} validators`, + fn: () => { + seedState.validators.serialize(); + }, + }); + + /** + * Allocating once and populating validator nodes once takes 120ms - 150ms on a Mac M1, + * which is 3x faster than the previous approach. + */ + const NUMBER_2_POW_32 = 2 ** 32; + const output = new Uint8Array(121 * 1_500_000); + const dataView = new DataView(output.buffer, output.byteOffset, output.byteLength); + // this caches validator nodes, which is what happens after we run a state transition + const validators = seedState.validators.getAllReadonlyValues(); + itBench({ + id: `serialize ${valicatorCount} validators manually`, + fn: () => { + let offset = 0; + for (const validator of validators) { + output.set(validator.pubkey, offset); + offset += 48; + output.set(validator.withdrawalCredentials, offset); + offset += 32; + const {effectiveBalance, activationEligibilityEpoch, activationEpoch, exitEpoch, withdrawableEpoch} = validator; + dataView.setUint32(offset, effectiveBalance & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (effectiveBalance / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + output[offset] = validator.slashed ?
1 : 0; + offset += 1; + dataView.setUint32(offset, activationEligibilityEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (activationEligibilityEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, activationEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (activationEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, exitEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (exitEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, withdrawableEpoch & 0xffffffff, true); + offset += 4; + dataView.setUint32(offset, (withdrawableEpoch / NUMBER_2_POW_32) & 0xffffffff, true); + offset += 4; + } + }, + }); +});