Skip to content

Commit

Permalink
fix: MAX_ATTESTATIONS_PER_GROUP_ELECTRA and address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored and ensi321 committed May 6, 2024
1 parent 1e83f10 commit 0df33bf
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ type CommitteeIndex = number;

// for pre-electra
type AttestationWithScore = {attestation: allForks.Attestation; score: number};
// for electra
/**
* for electra, this is to consolidate aggregated attestations of the same attestation data into a single attestation to be included in block
* note that this is not validator consolidation
*/
type AttestationsConsolidation = {
byCommittee: Map<CommitteeIndex, AttestationNonParticipant>;
attData: phase0.AttestationData;
Expand Down Expand Up @@ -71,12 +74,15 @@ const MAX_RETAINED_ATTESTATIONS_PER_GROUP = 4;
* Starting from Jan 2024, we have a performance issue getting attestations for a block. Based on the
* fact that lot of groups will have only 1 full participation attestation, increase this number
* a bit higher than average. This also help decrease number of slots to search for attestations.
*
*/
const MAX_ATTESTATIONS_PER_GROUP = 3;

/**
* For electra, each block has up to 8 aggregated attestations, assuming there are 3 for the "best"
* attestation data, there are still 5 for other attestation data so this constant is still good.
* We should separate to 2 constant based on conditions of different networks
*/
const MAX_ATTESTATIONS_PER_GROUP = 3;
const MAX_ATTESTATIONS_PER_GROUP_ELECTRA = 3;

/**
* Maintain a pool of aggregated attestations. Attestations can be retrieved for inclusion in a block
Expand All @@ -85,8 +91,12 @@ const MAX_ATTESTATIONS_PER_GROUP = 3;
* Note that we want to remove attestations with attesters that were included in the chain.
*/
export class AggregatedAttestationPool {
// per electra, different committees could have the same AttData hex
private readonly attestationGroupByIndexByDataHashBySlot = new MapDef<
/**
* post electra, different committees could have the same AttData and we have to consolidate attestations of the same
* data to be included in block, so we should group by data before index
* // TODO: make sure it does not affect performance for pre electra forks
*/
private readonly attestationGroupByIndexByDataHexBySlot = new MapDef<
Slot,
Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>
>(() => new Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>());
Expand All @@ -96,8 +106,8 @@ export class AggregatedAttestationPool {
getAttestationCount(): {attestationCount: number; attestationDataCount: number} {
let attestationCount = 0;
let attestationDataCount = 0;
for (const attestationGroupByIndexByData of this.attestationGroupByIndexByDataHashBySlot.values()) {
for (const attestationGroupByIndex of attestationGroupByIndexByData.values()) {
for (const attestationGroupByIndexByDataHex of this.attestationGroupByIndexByDataHexBySlot.values()) {
for (const attestationGroupByIndex of attestationGroupByIndexByDataHex.values()) {
attestationDataCount += attestationGroupByIndex.size;
for (const attestationGroup of attestationGroupByIndex.values()) {
attestationCount += attestationGroup.getAttestationCount();
Expand All @@ -121,16 +131,20 @@ export class AggregatedAttestationPool {
return InsertOutcome.Old;
}

const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHashBySlot.getOrDefault(slot);
const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHexBySlot.getOrDefault(slot);
let attestationGroupByIndex = attestationGroupByIndexByDataHash.get(dataRootHex);
if (!attestationGroupByIndex) {
attestationGroupByIndex = new Map<CommitteeIndex, MatchingDataAttestationGroup>();
attestationGroupByIndexByDataHash.set(dataRootHex, attestationGroupByIndex);
}
const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
(attestation.committeeBits.getSingleTrueBit() as number)
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
if (committeeIndex === null) {
// this should not happen because attestation should be validated before reaching this
throw Error(`Invalid attestation slot=${slot} committeeIndex=${committeeIndex}`);
}
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
if (!attestationGroup) {
attestationGroup = new MatchingDataAttestationGroup(committee, attestation.data);
Expand All @@ -146,7 +160,7 @@ export class AggregatedAttestationPool {
/** Remove attestations which are too old to be included in a block. */
prune(clockSlot: Slot): void {
// Only retain SLOTS_PER_EPOCH slots
pruneBySlot(this.attestationGroupByIndexByDataHashBySlot, clockSlot, SLOTS_PER_EPOCH);
pruneBySlot(this.attestationGroupByIndexByDataHexBySlot, clockSlot, SLOTS_PER_EPOCH);
this.lowestPermissibleSlot = Math.max(clockSlot - SLOTS_PER_EPOCH, 0);
}

Expand All @@ -156,9 +170,9 @@ export class AggregatedAttestationPool {
state: CachedBeaconStateAllForks
): allForks.Attestation[] {
const forkSeq = ForkSeq[fork];
return forkSeq < ForkSeq.electra
? this.getAttestationsForBlockPreElectra(fork, forkChoice, state)
: this.getAttestationsForBlockElectra(fork, forkChoice, state);
return forkSeq >= ForkSeq.electra
? this.getAttestationsForBlockElectra(fork, forkChoice, state)
: this.getAttestationsForBlockPreElectra(fork, forkChoice, state);
}

/**
Expand All @@ -178,12 +192,12 @@ export class AggregatedAttestationPool {

const attestationsByScore: AttestationWithScore[] = [];

const slots = Array.from(this.attestationGroupByIndexByDataHashBySlot.keys()).sort((a, b) => b - a);
const slots = Array.from(this.attestationGroupByIndexByDataHexBySlot.keys()).sort((a, b) => b - a);
let minScore = Number.MAX_SAFE_INTEGER;
let slotCount = 0;
slot: for (const slot of slots) {
slotCount++;
const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHashBySlot.get(slot);
const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHexBySlot.get(slot);
// should not happen
if (!attestationGroupByIndexByDataHash) {
throw Error(`No aggregated attestation pool for slot=${slot}`);
Expand Down Expand Up @@ -285,13 +299,13 @@ export class AggregatedAttestationPool {
const notSeenValidatorsFn = getNotSeenValidatorsFn(state);
const validateAttestationDataFn = getValidateAttestationDataFn(forkChoice, state);

const slots = Array.from(this.attestationGroupByIndexByDataHashBySlot.keys()).sort((a, b) => b - a);
const slots = Array.from(this.attestationGroupByIndexByDataHexBySlot.keys()).sort((a, b) => b - a);
const consolidations: AttestationsConsolidation[] = [];
let minScore = Number.MAX_SAFE_INTEGER;
let slotCount = 0;
slot: for (const slot of slots) {
slotCount++;
const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHashBySlot.get(slot);
const attestationGroupByIndexByDataHash = this.attestationGroupByIndexByDataHexBySlot.get(slot);
// should not happen
if (!attestationGroupByIndexByDataHash) {
throw Error(`No aggregated attestation pool for slot=${slot}`);
Expand All @@ -313,7 +327,7 @@ export class AggregatedAttestationPool {
// att01 --- - --- att21 --- 1 (att 01 __ 21)
// - --- - --- att22 --- 2 (att __ __ 22)
for (const attestationGroupByIndex of attestationGroupByIndexByDataHash.values()) {
// sameAttDataCons could be up to MAX_ATTESTATIONS_PER_GROUP
// sameAttDataCons could be up to MAX_ATTESTATIONS_PER_GROUP_ELECTRA
const sameAttDataCons: AttestationsConsolidation[] = [];
for (const [committeeIndex, attestationGroup] of attestationGroupByIndex.entries()) {
const notSeenAttestingIndices = notSeenValidatorsFn(epoch, slot, committeeIndex);
Expand Down Expand Up @@ -389,11 +403,11 @@ export class AggregatedAttestationPool {
getAll(bySlot?: Slot): allForks.Attestation[] {
let attestationGroupsArr: Map<CommitteeIndex, MatchingDataAttestationGroup>[];
if (bySlot === undefined) {
attestationGroupsArr = Array.from(this.attestationGroupByIndexByDataHashBySlot.values()).flatMap((byIndex) =>
attestationGroupsArr = Array.from(this.attestationGroupByIndexByDataHexBySlot.values()).flatMap((byIndex) =>
Array.from(byIndex.values())
);
} else {
const attestationGroupsByIndex = this.attestationGroupByIndexByDataHashBySlot.get(bySlot);
const attestationGroupsByIndex = this.attestationGroupByIndexByDataHexBySlot.get(bySlot);
if (!attestationGroupsByIndex) throw Error(`No attestations for slot ${bySlot}`);
attestationGroupsArr = Array.from(attestationGroupsByIndex.values());
}
Expand Down Expand Up @@ -514,12 +528,11 @@ export class MatchingDataAttestationGroup {
}
}

if (attestations.length <= MAX_ATTESTATIONS_PER_GROUP) {
const maxAttestation = forkSeq >= ForkSeq.electra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
if (attestations.length <= maxAttestation) {
return attestations;
} else {
return attestations
.sort((a, b) => b.notSeenAttesterCount - a.notSeenAttesterCount)
.slice(0, MAX_ATTESTATIONS_PER_GROUP);
return attestations.sort((a, b) => b.notSeenAttesterCount - a.notSeenAttesterCount).slice(0, maxAttestation);
}
}

Expand Down Expand Up @@ -594,9 +607,7 @@ export function getNotSeenValidatorsFn(state: CachedBeaconStateAllForks): GetNot
if (participants === null) {
return null;
}
const shuffling = state.epochCtx.getShufflingAtEpoch(epoch);
const slotCommittees = shuffling.committees[slot % SLOTS_PER_EPOCH];
const committee = slotCommittees[committeeIndex];
const committee = state.epochCtx.getBeaconCommittee(slot, committeeIndex);

const notSeenAttestingIndices = new Set<number>();
for (const [i, validatorIndex] of committee.entries()) {
Expand Down Expand Up @@ -635,10 +646,7 @@ export function getNotSeenValidatorsFn(state: CachedBeaconStateAllForks): GetNot
return notSeenAttestingIndices.size === 0 ? null : notSeenAttestingIndices;
}

const shuffling = state.epochCtx.getShufflingAtEpoch(epoch);
const slotCommittees = shuffling.committees[slot % SLOTS_PER_EPOCH];
const committee = slotCommittees[committeeIndex];

const committee = state.epochCtx.getBeaconCommittee(slot, committeeIndex);
notSeenAttestingIndices = new Set<number>();
for (const [i, validatorIndex] of committee.entries()) {
// no need to check flagIsTimelySource as if validator is not seen, it's participation status is 0
Expand Down

0 comments on commit 0df33bf

Please sign in to comment.