Skip to content

Commit

Permalink
Merge e70e4ab into 551fd8e
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Jan 18, 2024
2 parents 551fd8e + e70e4ab commit 58a2800
Show file tree
Hide file tree
Showing 33 changed files with 1,367 additions and 178 deletions.
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";

/**
* Minimum number of epochs between single temp archived states
Expand Down Expand Up @@ -83,13 +84,26 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
// starting from Jan 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
}
await this.db.stateArchive.put(finalizedState.slot, finalizedState);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch});
}
}

Expand Down
44 changes: 22 additions & 22 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ export async function importBlock(
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, source} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const {slot: blockSlot} = block.message;
const blockRoot = this.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const blockEpoch = computeEpochAtSlot(blockSlot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
Expand All @@ -87,17 +88,16 @@ export async function importBlock(

// This adds the state necessary to process the next block
// Some block event handlers require state being in state cache so need to do this before emitting EventType.block
this.regen.addPostState(postState);
this.regen.processState(blockRootHex, postState);

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
const slot = block.message.slot;
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

Expand All @@ -106,7 +106,7 @@ export async function importBlock(
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
Expand Down Expand Up @@ -171,7 +171,7 @@ export async function importBlock(
correctHead,
missedSlotVote,
blockRootHex,
block.message.slot
blockSlot
);
} catch (e) {
// a block has a lot of attestations and it may has same error, we don't want to log all of them
Expand All @@ -185,15 +185,15 @@ export async function importBlock(
}
} else {
// always log other errors
this.logger.warn("Error processing attestation from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing attestation from block", {slot: blockSlot}, e as Error);
}
}
}

for (const {error, count} of invalidAttestationErrorsByCode.values()) {
this.logger.warn(
"Error processing attestations from block",
{slot: block.message.slot, erroredAttestations: count},
{slot: blockSlot, erroredAttestations: count},
error
);
}
Expand All @@ -214,7 +214,7 @@ export async function importBlock(
// all AttesterSlashings are valid before reaching this
this.forkChoice.onAttesterSlashing(slashing);
} catch (e) {
this.logger.warn("Error processing AttesterSlashing from block", {slot: block.message.slot}, e as Error);
this.logger.warn("Error processing AttesterSlashing from block", {slot: blockSlot}, e as Error);
}
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ export async function importBlock(
parentBlockSlot
);
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error);
}
}, 0);
}
Expand Down Expand Up @@ -351,15 +351,15 @@ export async function importBlock(
if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: blockSlot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
if (blockSlot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());

// Note: in-lined code from previos handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
Expand Down Expand Up @@ -397,7 +397,7 @@ export async function importBlock(

// Send block events, only for recent enough blocks

if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
// NOTE: Skip looping if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
Expand All @@ -417,10 +417,10 @@ export async function importBlock(
}

// Register stat metrics about the block after importing it
this.metrics?.parentBlockDistance.observe(block.message.slot - parentBlockSlot);
this.metrics?.parentBlockDistance.observe(blockSlot - parentBlockSlot);
this.metrics?.proposerBalanceDeltaAny.observe(fullyVerifiedBlock.proposerBalanceDelta);
this.metrics?.registerImportedBlock(block.message, fullyVerifiedBlock);
if (this.config.getForkSeq(block.message.slot) >= ForkSeq.altair) {
if (this.config.getForkSeq(blockSlot) >= ForkSeq.altair) {
this.metrics?.registerSyncAggregateInBlock(
blockEpoch,
(block as altair.SignedBeaconBlock).message.body.syncAggregate,
Expand All @@ -433,18 +433,18 @@ export async function importBlock(
// Gossip blocks need to be imported as soon as possible, waiting attestations could be processed
// in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789
setTimeout(() => {
this.reprocessController.onBlockImported({slot: block.message.slot, root: blockRootHex}, advancedSlot);
this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot);
}, 0);

if (opts.seenTimestampSec !== undefined) {
const recvToImportedBlock = Date.now() / 1000 - opts.seenTimestampSec;
this.metrics?.gossipBlock.receivedToBlockImport.observe(recvToImportedBlock);
this.logger.verbose("Imported block", {slot: block.message.slot, recvToImportedBlock});
this.logger.verbose("Imported block", {slot: blockSlot, recvToImportedBlock});
}

this.logger.verbose("Block processed", {
slot: block.message.slot,
slot: blockSlot,
root: blockRootHex,
delaySec: this.clock.secFromSlot(block.message.slot),
delaySec: this.clock.secFromSlot(blockSlot),
});
}
33 changes: 29 additions & 4 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {BufferPool} from "../util/bufferPool.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts, CommonBlockBody} from "./interface.js";
Expand Down Expand Up @@ -79,7 +80,11 @@ import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {SeenGossipBlockInput} from "./seenCache/index.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
import {InMemoryCheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";
import {FIFOBlockStateCache} from "./stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCache} from "./stateCache/persistentCheckpointsCache.js";
import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -237,9 +242,28 @@ export class BeaconChain implements IBeaconChain {
this.pubkey2index = cachedState.epochCtx.pubkey2index;
this.index2pubkey = cachedState.epochCtx.index2pubkey;

const stateCache = new StateContextCache({metrics});
const checkpointStateCache = new CheckpointStateCache({metrics});

const fileDataStore = opts.nHistoricalStatesFileDataStore ?? false;
const stateCache = this.opts.nHistoricalStates
? new FIFOBlockStateCache(this.opts, {metrics})
: new StateContextCache({metrics});
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
metrics,
logger,
clock,
shufflingCache: this.shufflingCache,
getHeadState: this.getHeadState.bind(this),
bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics),
datastore: fileDataStore
? // debug option if we want to investigate any issues with the DB
new FileCPStateDatastore()
: // production option
new DbCPStateDatastore(this.db),
},
this.opts
)
: new InMemoryCheckpointStateCache({metrics});
const {checkpoint} = computeAnchorCheckpoint(config, anchorState);
stateCache.add(cachedState);
stateCache.setHeadState(cachedState);
Expand Down Expand Up @@ -333,6 +357,7 @@ export class BeaconChain implements IBeaconChain {

/** Populate in-memory caches with persisted data. Call at least once on startup */
async loadFromDisk(): Promise<void> {
await this.regen.init();
await this.opPool.fromPersisted(this.db);
}

Expand Down
13 changes: 10 additions & 3 deletions packages/beacon-node/src/chain/forkChoice/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
ForkChoiceStore,
ExecutionStatus,
JustifiedBalancesGetter,
ForkChoiceOpts,
ForkChoiceOpts as RealForkChoiceOpts,
} from "@lodestar/fork-choice";
import {
CachedBeaconStateAllForks,
Expand All @@ -21,7 +21,10 @@ import {ChainEventEmitter} from "../emitter.js";
import {ChainEvent} from "../emitter.js";
import {GENESIS_SLOT} from "../../constants/index.js";

export type {ForkChoiceOpts};
export type ForkChoiceOpts = RealForkChoiceOpts & {
// for testing only
forkchoiceConstructor?: typeof ForkChoice;
};

/**
* Fork Choice extended with a ChainEventEmitter
Expand All @@ -47,7 +50,11 @@ export function initializeForkChoice(

const justifiedBalances = getEffectiveBalanceIncrementsZeroInactive(state);

return new ForkChoice(
// forkchoiceConstructor is only used for some test cases
// production code use ForkChoice constructor directly
const forkchoiceConstructor = opts.forkchoiceConstructor ?? ForkChoice;

return new forkchoiceConstructor(
config,

new ForkChoiceStore(
Expand Down
11 changes: 11 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";
import {DEFAULT_MAX_BLOCK_STATES, FIFOBlockStateCacheOpts} from "./stateCache/fifoBlockStateCache.js";
import {PersistentCheckpointStateCacheOpts} from "./stateCache/persistentCheckpointsCache.js";
import {DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY} from "./stateCache/persistentCheckpointsCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
FIFOBlockStateCacheOpts &
PersistentCheckpointStateCacheOpts &
ShufflingCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
Expand All @@ -30,6 +35,8 @@ export type IChainOptions = BlockProcessOpts &
trustedSetup?: string;
broadcastValidationStrictness?: string;
minSameMessageSignatureSetsToBatch: number;
nHistoricalStates?: boolean;
nHistoricalStatesFileDataStore?: boolean;
};

export type BlockProcessOpts = {
Expand Down Expand Up @@ -102,4 +109,8 @@ export const defaultChainOptions: IChainOptions = {
// batching too much may block the I/O thread so if useWorker=false, suggest this value to be 32
// since this batch attestation work is designed to work with useWorker=true, make this the lowest value
minSameMessageSignatureSetsToBatch: 2,
nHistoricalStates: false,
nHistoricalStatesFileDataStore: false,
maxBlockStates: DEFAULT_MAX_BLOCK_STATES,
maxCPStateEpochsInMemory: DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY,
};
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/regen/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum RegenErrorCode {
TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED",
BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB",
STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR",
INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT",
}

export type RegenErrorType =
Expand All @@ -17,7 +18,8 @@ export type RegenErrorType =
| {code: RegenErrorCode.NO_SEED_STATE}
| {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root}
| {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error};
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}
| {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex};

export class RegenError extends Error {
type: RegenErrorType;
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
dropCache(): void;
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
pruneOnFinalized(finalizedEpoch: Epoch): void;
addPostState(postState: CachedBeaconStateAllForks): void;
processState(blockRootHex: RootHex, postState: CachedBeaconStateAllForks): void;
addCheckpointState(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void;
updateHeadState(newHeadStateRoot: RootHex, maybeHeadState: CachedBeaconStateAllForks): void;
updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null;
Expand Down
Loading

0 comments on commit 58a2800

Please sign in to comment.