Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement shuffling cache #6030

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export async function importBlock(
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

Expand Down Expand Up @@ -347,6 +348,12 @@ export async function importBlock(
this.logger.verbose("After importBlock caching postState without SSZ cache", {slot: postState.slot});
}

if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
Expand Down
53 changes: 52 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
isCachedBeaconState,
Index2PubkeyCache,
PubkeyIndexMap,
EpochShuffling,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {
Expand Down Expand Up @@ -39,7 +40,6 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {CheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts} from "./interface.js";
Expand Down Expand Up @@ -75,6 +75,9 @@ import {BlockAttributes, produceBlockBody} from "./produceBlock/produceBlockBody
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -129,6 +132,7 @@ export class BeaconChain implements IBeaconChain {

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedBlobSidecarsCache = new Map<BlockHash, deneb.BlobSidecars>();
readonly producedBlindedBlobSidecarsCache = new Map<BlockHash, deneb.BlindedBlobSidecars>();
Expand Down Expand Up @@ -209,6 +213,7 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics, this.opts);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand All @@ -223,6 +228,9 @@ export class BeaconChain implements IBeaconChain {
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.nextShuffling.epoch);

// Persist single global instance of state caches
this.pubkey2index = cachedState.epochCtx.pubkey2index;
Expand Down Expand Up @@ -640,6 +648,49 @@ export class BeaconChain implements IBeaconChain {
}
}

/**
* Regenerate state for attestation verification, this does not happen with default chain option of maxSkipSlots = 32 .
* However, need to handle just in case. Lodestar doesn't support multiple regen state requests for attestation verification
* at the same time, bounded inside "ShufflingCache.insertPromise()" function.
* Leave this function in chain instead of attestatation verification code to make sure we're aware of its performance impact.
*/
async regenStateForAttestationVerification(
attEpoch: Epoch,
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<EpochShuffling> {
// this is to prevent multiple calls to get shuffling for the same epoch and dependent root
// any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot);
const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);

let state: CachedBeaconStateAllForks;
if (blockEpoch < attEpoch - 1) {
// thanks to one epoch look ahead, we don't need to dial up to attEpoch
const targetSlot = computeStartSlotAtEpoch(attEpoch - 1);
this.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller});
state = await this.regen.getBlockSlotState(
attHeadBlock.blockRoot,
targetSlot,
{dontTransferCache: true},
regenCaller
);
} else if (blockEpoch > attEpoch) {
// should not happen, handled inside attestation verification code
throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`);
} else {
// should use either current or next shuffling of head state
// it's not likely to hit this since these shufflings are cached already
// so handle just in case
this.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller});
state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
}

// resolve the promise to unblock other calls of the same epoch and dependent root
return this.shufflingCache.processState(state, attEpoch);
}

/**
* `ForkChoice.onBlock` must never throw for a block that is valid with respect to the network
* `justifiedBalancesGetter()` must never throw and it should always return a state.
Expand Down
8 changes: 1 addition & 7 deletions packages/beacon-node/src/chain/errors/attestationError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {toHexString} from "@chainsafe/ssz";
import {CommitteeIndex, Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types";
import {Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types";
import {GossipActionError} from "./gossipValidation.js";

export enum AttestationErrorCode {
Expand Down Expand Up @@ -65,11 +65,6 @@ export enum AttestationErrorCode {
* A signature on the attestation is invalid.
*/
INVALID_SIGNATURE = "ATTESTATION_ERROR_INVALID_SIGNATURE",
/**
* There is no committee for the slot and committee index of this attestation
* and the attestation should not have been produced.
*/
NO_COMMITTEE_FOR_SLOT_AND_INDEX = "ATTESTATION_ERROR_NO_COMMITTEE_FOR_SLOT_AND_INDEX",
/**
* The unaggregated attestation doesn't have only one aggregation bit set.
*/
Expand Down Expand Up @@ -150,7 +145,6 @@ export type AttestationErrorType =
| {code: AttestationErrorCode.HEAD_NOT_TARGET_DESCENDANT}
| {code: AttestationErrorCode.UNKNOWN_TARGET_ROOT; root: Uint8Array}
| {code: AttestationErrorCode.INVALID_SIGNATURE}
| {code: AttestationErrorCode.NO_COMMITTEE_FOR_SLOT_AND_INDEX; slot: Slot; index: CommitteeIndex}
| {code: AttestationErrorCode.NOT_EXACTLY_ONE_AGGREGATION_BIT_SET}
| {code: AttestationErrorCode.PRIOR_ATTESTATION_KNOWN; validatorIndex: ValidatorIndex; epoch: Epoch}
| {code: AttestationErrorCode.FUTURE_EPOCH; attestationEpoch: Epoch; currentEpoch: Epoch}
Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex,
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
EpochShuffling,
Index2PubkeyCache,
PubkeyIndexMap,
} from "@lodestar/state-transition";
Expand Down Expand Up @@ -36,6 +37,7 @@ import {CheckpointBalancesCache} from "./balancesCache.js";
import {IChainOptions} from "./options.js";
import {AssembledBlockType, BlockAttributes, BlockType} from "./produceBlock/produceBlockBody.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";

export {BlockType, type AssembledBlockType};
export {type ProposerPreparationData};
Expand Down Expand Up @@ -96,6 +98,7 @@ export interface IBeaconChain {
readonly producedBlobSidecarsCache: Map<BlockHash, deneb.BlobSidecars>;
readonly producedBlockRoot: Map<RootHex, allForks.ExecutionPayload | null>;
readonly producedBlindedBlobSidecarsCache: Map<BlockHash, deneb.BlindedBlobSidecars>;
readonly shufflingCache: ShufflingCache;
readonly producedBlindedBlockRoot: Set<RootHex>;
readonly opts: IChainOptions;

Expand Down Expand Up @@ -160,6 +163,12 @@ export interface IBeaconChain {
persistInvalidSszBytes(type: string, sszBytes: Uint8Array, suffix?: string): void;
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
regenStateForAttestationVerification(
attEpoch: Epoch,
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<EpochShuffling>;
updateBuilderStatus(clockSlot: Slot): void;

regenCanAcceptWork(): boolean;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import {defaultOptions as defaultValidatorOptions} from "@lodestar/validator";
import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
ShufflingCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
blsVerifyAllMultiThread?: boolean;
Expand Down
193 changes: 193 additions & 0 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import {toHexString} from "@chainsafe/ssz";
import {CachedBeaconStateAllForks, EpochShuffling, getShufflingDecisionBlock} from "@lodestar/state-transition";
import {Epoch, RootHex, ssz} from "@lodestar/types";
import {MapDef, pruneSetToMax} from "@lodestar/utils";
import {GENESIS_SLOT} from "@lodestar/params";
import {Metrics} from "../metrics/metrics.js";
import {computeAnchorCheckpoint} from "./initState.js";

/**
* Same value to CheckpointBalancesCache, with the assumption that we don't have to use it for old epochs. In the worse case:
* - when loading state bytes from disk, we need to compute shuffling for all epochs (~1s as of Sep 2023)
* - don't have shuffling to verify attestations, need to do 1 epoch transition to add shuffling to this cache. This never happens
* with default chain option of maxSkipSlots = 32
**/
const MAX_EPOCHS = 4;

/**
* With default chain option of maxSkipSlots = 32, there should be no shuffling promise. If that happens a lot, it could blow up Lodestar,
* with MAX_EPOCHS = 4, only allow 2 promise at a time. Note that regen already bounds number of concurrent requests at 1 already.
*/
const MAX_PROMISES = 2;

enum CacheItemType {
shuffling,
promise,
}

type ShufflingCacheItem = {
type: CacheItemType.shuffling;
shuffling: EpochShuffling;
};

type PromiseCacheItem = {
type: CacheItemType.promise;
promise: Promise<EpochShuffling>;
resolveFn: (shuffling: EpochShuffling) => void;
};

type CacheItem = ShufflingCacheItem | PromiseCacheItem;

export type ShufflingCacheOpts = {
maxShufflingCacheEpochs?: number;
};

/**
* A shuffling cache to help:
* - get committee quickly for attestation verification
* - if a shuffling is not available (which does not happen with default chain option of maxSkipSlots = 32), track a promise to make sure we don't compute the same shuffling twice
* - skip computing shuffling when loading state bytes from disk
*/
export class ShufflingCache {
/** LRU cache implemented as a map, pruned every time we add an item */
private readonly itemsByDecisionRootByEpoch: MapDef<Epoch, Map<RootHex, CacheItem>> = new MapDef(
() => new Map<RootHex, CacheItem>()
);

private readonly maxEpochs: number;

constructor(
private readonly metrics: Metrics | null = null,
opts: ShufflingCacheOpts = {}
) {
if (metrics) {
metrics.shufflingCache.size.addCollect(() =>
metrics.shufflingCache.size.set(
Array.from(this.itemsByDecisionRootByEpoch.values()).reduce((total, innerMap) => total + innerMap.size, 0)
)
);
}

this.maxEpochs = opts.maxShufflingCacheEpochs ?? MAX_EPOCHS;
}

/**
* Extract shuffling from state and add to cache
*/
processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): EpochShuffling {
const decisionBlockHex = getDecisionBlock(state, shufflingEpoch);
let shuffling: EpochShuffling;
switch (shufflingEpoch) {
case state.epochCtx.nextShuffling.epoch:
shuffling = state.epochCtx.nextShuffling;
break;
case state.epochCtx.currentShuffling.epoch:
shuffling = state.epochCtx.currentShuffling;
break;
case state.epochCtx.previousShuffling.epoch:
shuffling = state.epochCtx.previousShuffling;
break;
default:
throw new Error(`Shuffling not found from state ${state.slot} for epoch ${shufflingEpoch}`);
}

let cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionBlockHex);
if (cacheItem !== undefined) {
// update existing promise
if (isPromiseCacheItem(cacheItem)) {
// unblock consumers of this promise
cacheItem.resolveFn(shuffling);
// then update item type to shuffling
cacheItem = {
type: CacheItemType.shuffling,
shuffling,
};
this.add(shufflingEpoch, decisionBlockHex, cacheItem);
// we updated type to CacheItemType.shuffling so the above fields are not used anyway
this.metrics?.shufflingCache.processStateUpdatePromise.inc();
} else {
// ShufflingCacheItem, do nothing
this.metrics?.shufflingCache.processStateNoOp.inc();
}
} else {
// not found, new shuffling
this.add(shufflingEpoch, decisionBlockHex, {type: CacheItemType.shuffling, shuffling});
this.metrics?.shufflingCache.processStateInsertNew.inc();
}

return shuffling;
}

/**
* Insert a promise to make sure we don't regen state for the same shuffling.
* Bound by MAX_SHUFFLING_PROMISE to make sure our node does not blow up.
*/
insertPromise(shufflingEpoch: Epoch, decisionRootHex: RootHex): void {
const promiseCount = Array.from(this.itemsByDecisionRootByEpoch.values())
.flatMap((innerMap) => Array.from(innerMap.values()))
.filter((item) => isPromiseCacheItem(item)).length;
if (promiseCount >= MAX_PROMISES) {
throw new Error(
`Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, decisionRootHex: ${decisionRootHex}`
);
}
let resolveFn: ((shuffling: EpochShuffling) => void) | null = null;
const promise = new Promise<EpochShuffling>((resolve) => {
resolveFn = resolve;
});
if (resolveFn === null) {
throw new Error("Promise Constructor was not executed immediately");
}

const cacheItem: PromiseCacheItem = {
type: CacheItemType.promise,
promise,
resolveFn,
};
this.add(shufflingEpoch, decisionRootHex, cacheItem);
this.metrics?.shufflingCache.insertPromiseCount.inc();
}

/**
* Most of the time, this should return a shuffling immediately.
* If there's a promise, it means we are computing the same shuffling, so we wait for the promise to resolve.
* Return null if we don't have a shuffling for this epoch and dependentRootHex.
*/
async get(shufflingEpoch: Epoch, decisionRootHex: RootHex): Promise<EpochShuffling | null> {
const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionRootHex);
if (cacheItem === undefined) {
return null;
}

if (isShufflingCacheItem(cacheItem)) {
return cacheItem.shuffling;
} else {
// promise
return cacheItem.promise;
}
}

private add(shufflingEpoch: Epoch, decisionBlock: RootHex, cacheItem: CacheItem): void {
this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).set(decisionBlock, cacheItem);
pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
}
}

function isShufflingCacheItem(item: CacheItem): item is ShufflingCacheItem {
return item.type === CacheItemType.shuffling;
}

function isPromiseCacheItem(item: CacheItem): item is PromiseCacheItem {
return item.type === CacheItemType.promise;
}

/**
* Get the shuffling decision block root for the given epoch of given state
* - Special case close to genesis block, return the genesis block root
* - This is similar to forkchoice.getDependentRoot() function, otherwise we cannot get cached shuffing in attestation verification when syncing from genesis.
*/
function getDecisionBlock(state: CachedBeaconStateAllForks, epoch: Epoch): RootHex {
return state.slot > GENESIS_SLOT
? getShufflingDecisionBlock(state, epoch)
: toHexString(ssz.phase0.BeaconBlockHeader.hashTreeRoot(computeAnchorCheckpoint(state.config, state).blockHeader));
}
Loading