Skip to content

Commit

Permalink
feat: metrics for epoch transition by callers and log block replays
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Jan 18, 2024
1 parent 2b45190 commit e70e4ab
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 15 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export async function importBlock(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());

// Note: in-lined code from previous handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
Expand Down
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16;

type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
logger: Logger;
};

type RegenRequestKey = keyof IStateRegeneratorInternal;
Expand Down
19 changes: 13 additions & 6 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
stateTransition,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {sleep} from "@lodestar/utils";
import {Logger, sleep} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {Metrics} from "../../metrics/index.js";
Expand All @@ -28,6 +28,7 @@ export type RegenModules = {
checkpointStateCache: CheckpointStateCache;
config: ChainForkConfig;
emitter: ChainEventEmitter;
logger: Logger;
metrics: Metrics | null;
};

Expand Down Expand Up @@ -127,14 +128,14 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// If a checkpoint state exists with the given checkpoint root, it either is in requested epoch
// or needs to have empty slots processed until the requested epoch
if (latestCheckpointStateCtx) {
return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, opts);
return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, rCaller, opts);
}

// Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root
// regenerate that state,
// then process empty slots until the requested epoch
const blockStateCtx = await this.getState(block.stateRoot, rCaller, shouldReload);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, opts);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, rCaller, opts);
}

/**
Expand Down Expand Up @@ -189,6 +190,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}

const replaySlots = blocksToReplay.map((b) => b.slot).join(",");
this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots});
for (const b of blocksToReplay.reverse()) {
const block = await this.modules.db.block.get(fromHexString(b.blockRoot));
if (!block) {
Expand All @@ -212,7 +215,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
verifyProposer: false,
verifySignatures: false,
},
null
this.modules.metrics
);

const stateRoot = toHexString(state.hashTreeRoot());
Expand All @@ -239,6 +242,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}
}
this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots});

return state;
}
Expand Down Expand Up @@ -266,9 +270,10 @@ async function processSlotsByCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
preState: CachedBeaconStateAllForks,
slot: Slot,
rCaller: RegenCaller,
opts: StateCloneOpts
): Promise<CachedBeaconStateAllForks> {
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, opts);
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, rCaller, opts);
if (postState.slot < slot) {
postState = processSlots(postState, slot, opts, modules.metrics);
}
Expand All @@ -286,6 +291,7 @@ async function processSlotsToNearestCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
preState: CachedBeaconStateAllForks,
slot: Slot,
rCaller: RegenCaller,
opts: StateCloneOpts
): Promise<CachedBeaconStateAllForks> {
const preSlot = preState.slot;
Expand All @@ -301,6 +307,7 @@ async function processSlotsToNearestCheckpoint(
) {
// processSlots calls .clone() before mutating
postState = processSlots(postState, nextEpochSlot, opts, metrics);
modules.metrics?.epochTransitionByCaller.inc({caller: rCaller});

// this is usually added when we prepare for next slot or validate gossip block
// then when we process the 1st block of epoch, we don't have to do state transition again
Expand All @@ -309,7 +316,7 @@ async function processSlotsToNearestCheckpoint(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());

// this avoids keeping our node busy processing blocks
await sleep(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData;
return stateOrStateBytesData?.clone() ?? null;
}
const {persistedKey, stateBytes} = stateOrStateBytesData;
const logMeta = {persistedKey: toHexString(persistedKey)};
Expand Down Expand Up @@ -237,7 +237,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey});
this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
return newCachedState;
return newCachedState.clone();
} catch (e) {
this.logger.debug("Reload: error loading cached state", logMeta, e as Error);
return null;
Expand Down Expand Up @@ -311,7 +311,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (isInMemoryCacheItem(cacheItem)) {
const {state} = cacheItem;
this.metrics?.stateClonedCount.observe(state.clonedCount);
return state;
return state.clone();
}

return null;
Expand Down Expand Up @@ -353,7 +353,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
const inMemoryState = this.get({rootHex, epoch});
if (inMemoryState) {
return inMemoryState;
return inMemoryState.clone();
}
}
}
Expand All @@ -377,7 +377,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
try {
const state = await this.getOrReload({rootHex, epoch});
if (state) {
return state;
return state.clone();
}
} catch (e) {
this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error);
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,11 @@ export function createLodestarMetrics(
},

// Beacon state transition metrics

epochTransitionByCaller: register.gauge<{caller: RegenCaller}>({
name: "lodestar_epoch_transition_by_caller_total",
help: "Total count of epoch transition by caller",
labelNames: ["caller"],
}),
epochTransitionTime: register.histogram({
name: "lodestar_stfn_epoch_transition_seconds",
help: "Time to process a single epoch transition in seconds",
Expand Down

0 comments on commit e70e4ab

Please sign in to comment.