Skip to content

Commit

Permalink
Merge ef5022c into d10ed38
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Mar 4, 2024
2 parents d10ed38 + ef5022c commit aabcbe2
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 28 deletions.
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-trans
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";

/**
* Minimum number of epochs between single temp archived states
Expand Down Expand Up @@ -83,13 +84,26 @@ export class StatesArchiver {
* Only the new finalized state is stored to disk
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
const finalizedState = this.regen.getCheckpointStateSync(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
}
await this.db.stateArchive.put(finalizedState.slot, finalizedState);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {finalizedEpoch: finalized.epoch});
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ export async function importBlock(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
this.regen.addCheckpointState(cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
this.emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// Note: in-lined code from previos handler of ChainEvent.checkpoint
this.logger.verbose("Checkpoint processed", toCheckpointHex(cp));
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/chain/regen/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface IStateRegenerator extends IStateRegeneratorInternal {
dumpCacheSummary(): routes.lodestar.StateCacheItem[];
getStateSync(stateRoot: RootHex): CachedBeaconStateAllForks | null;
getPreStateSync(block: allForks.BeaconBlock): CachedBeaconStateAllForks | null;
getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null>;
getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null;
getClosestHeadState(head: ProtoBlock): CachedBeaconStateAllForks | null;
pruneOnCheckpoint(finalizedEpoch: Epoch, justifiedEpoch: Epoch, headStateRoot: RootHex): void;
Expand Down
16 changes: 14 additions & 2 deletions packages/beacon-node/src/chain/regen/queued.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16;

type QueuedStateRegeneratorModules = RegenModules & {
signal: AbortSignal;
logger: Logger;
};

type RegenRequestKey = keyof IStateRegeneratorInternal;
Expand Down Expand Up @@ -54,6 +53,12 @@ export class QueuedStateRegenerator implements IStateRegenerator {
this.logger = modules.logger;
}

async init(): Promise<void> {
if (this.checkpointStateCache.init) {
return this.checkpointStateCache.init();
}
}

canAcceptWork(): boolean {
return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD;
}
Expand Down Expand Up @@ -105,6 +110,10 @@ export class QueuedStateRegenerator implements IStateRegenerator {
return null;
}

async getCheckpointStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
return this.checkpointStateCache.getStateOrBytes(cp);
}

getCheckpointStateSync(cp: CheckpointHex): CachedBeaconStateAllForks | null {
return this.checkpointStateCache.get(cp);
}
Expand Down Expand Up @@ -145,10 +154,13 @@ export class QueuedStateRegenerator implements IStateRegenerator {
} else {
// Trigger regen on head change if necessary
this.logger.warn("Head state not available, triggering regen", {stateRoot: newHeadStateRoot});
// it's important to reload state to regen head state here
const allowDiskReload = true;
// head has changed, so the existing cached head state is no longer useful. Set strong reference to null to free
// up memory for regen step below. During regen, node won't be functional but eventually head will be available
// for legacy StateContextCache only
this.stateCache.setHeadState(null);
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock).then(
this.regen.getState(newHeadStateRoot, RegenCaller.processBlock, allowDiskReload).then(
(headStateRegen) => this.stateCache.setHeadState(headStateRegen),
(e) => this.logger.error("Error on head state regen", {}, e)
);
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async function processSlotsToNearestCheckpoint(
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// this avoids keeping our node busy processing blocks
await sleep(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class FIFOBlockStateCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone(true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
const stateOrStateBytesData = await this.getStateOrLoadDb(cp);
if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) {
return stateOrStateBytesData;
return stateOrStateBytesData?.clone(true) ?? null;
}
const {persistedKey, stateBytes} = stateOrStateBytesData;
const logMeta = {persistedKey: toHexString(persistedKey)};
Expand Down Expand Up @@ -242,7 +242,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey});
this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex);
// don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch
return newCachedState;
return newCachedState.clone(true);
} catch (e) {
this.logger.debug("Reload: error loading cached state", logMeta, e as Error);
return null;
Expand Down Expand Up @@ -312,7 +312,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
if (isInMemoryCacheItem(cacheItem)) {
const {state} = cacheItem;
this.metrics?.stateClonedCount.observe(state.clonedCount);
return state;
return state.clone(true);
}

return null;
Expand Down Expand Up @@ -352,9 +352,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
.filter((e) => e <= maxEpoch);
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
const inMemoryState = this.get({rootHex, epoch});
if (inMemoryState) {
return inMemoryState;
const inMemoryClonedState = this.get({rootHex, epoch});
if (inMemoryClonedState) {
return inMemoryClonedState;
}
}
}
Expand All @@ -376,9 +376,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
for (const epoch of epochs) {
if (this.epochIndex.get(epoch)?.has(rootHex)) {
try {
const state = await this.getOrReload({rootHex, epoch});
if (state) {
return state;
const clonedState = await this.getOrReload({rootHex, epoch});
if (clonedState) {
return clonedState;
}
} catch (e) {
this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class StateContextCache implements BlockStateCache {
this.metrics?.hits.inc();
this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone(true);
}

add(item: CachedBeaconStateAllForks): void {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class CheckpointStateCache implements CheckpointStateCacheInterface {

this.metrics?.stateClonedCount.observe(item.clonedCount);

return item;
return item.clone(true);
}

add(cp: phase0.Checkpoint, item: CachedBeaconStateAllForks): void {
Expand Down
8 changes: 6 additions & 2 deletions packages/beacon-node/src/util/multifork.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {ChainForkConfig} from "@lodestar/config";
import {allForks} from "@lodestar/types";
import {Slot, allForks} from "@lodestar/types";
import {bytesToInt} from "@lodestar/utils";
import {getSlotFromSignedBeaconBlockSerialized} from "./sszBytes.js";

Expand Down Expand Up @@ -36,10 +36,14 @@ export function getStateTypeFromBytes(
config: ChainForkConfig,
bytes: Buffer | Uint8Array
): allForks.AllForksSSZTypes["BeaconState"] {
const slot = bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT));
const slot = getStateSlotFromBytes(bytes);
return config.getForkTypes(slot).BeaconState;
}

export function getStateSlotFromBytes(bytes: Uint8Array): Slot {
return bytesToInt(bytes.subarray(SLOT_BYTES_POSITION_IN_STATE, SLOT_BYTES_POSITION_IN_STATE + SLOT_BYTE_COUNT));
}

/**
* First field in update is beacon, first field in beacon is slot
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ describe("PersistentCheckpointStateCache", function () {

it("pruneFinalized and getStateOrBytes", async function () {
cache.add(cp2, states["cp2"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);
expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1);
// cp0 is persisted
expect(fileApisBuffer.size).toEqual(1);
Expand Down Expand Up @@ -484,7 +486,9 @@ describe("PersistentCheckpointStateCache", function () {

// regen needs to reload cp0b
cache.add(cp0b, states["cp0b"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);

// regen generates cp1b
const cp1b = {epoch: 21, root: root0b};
Expand Down Expand Up @@ -670,7 +674,9 @@ describe("PersistentCheckpointStateCache", function () {

// simulate regen
cache.add(cp0b, states["cp0b"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);
// root2, regen cp0b
const cp1bState = states["cp0b"].clone();
cp1bState.slot = 21 * SLOTS_PER_EPOCH;
Expand Down Expand Up @@ -847,7 +853,9 @@ describe("PersistentCheckpointStateCache", function () {

// simulate reload cp1b
cache.add(cp0b, states["cp0b"]);
expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]);
expect(((await cache.getStateOrBytes(cp0bHex)) as CachedBeaconStateAllForks).hashTreeRoot()).toEqual(
states["cp0b"].hashTreeRoot()
);
const root1b = Buffer.alloc(32, 101);
const state1b = states["cp0b"].clone();
state1b.slot = state1a.slot + 1;
Expand Down

0 comments on commit aabcbe2

Please sign in to comment.