Skip to content

Commit

Permalink
Validator
Browse files Browse the repository at this point in the history
  • Loading branch information
ensi321 committed May 6, 2024
1 parent 28279a8 commit 4f8fdcf
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 43 deletions.
4 changes: 2 additions & 2 deletions packages/api/src/beacon/routes/beacon/pool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {phase0, altair, capella, CommitteeIndex, Slot, ssz} from "@lodestar/types";
import {phase0, altair, capella, CommitteeIndex, Slot, ssz, allForks} from "@lodestar/types";
import {ApiClientResponse} from "../../../interfaces.js";
import {HttpStatusCode} from "../../../utils/client/httpStatusCode.js";
import {
Expand Down Expand Up @@ -80,7 +80,7 @@ export type Api = {
* @throws ApiError
*/
submitPoolAttestations(
attestations: phase0.Attestation[]
attestations: allForks.Attestation[]
): Promise<ApiClientResponse<{[HttpStatusCode.OK]: void}, HttpStatusCode.BAD_REQUEST>>;

/**
Expand Down
6 changes: 3 additions & 3 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ export type Api = {
slot: Slot
): Promise<
ApiClientResponse<
{[HttpStatusCode.OK]: {data: phase0.Attestation}},
{[HttpStatusCode.OK]: {data: allForks.Attestation; version: ForkName}},
HttpStatusCode.BAD_REQUEST | HttpStatusCode.NOT_FOUND
>
>;
Expand All @@ -354,7 +354,7 @@ export type Api = {
* @throws ApiError
*/
publishAggregateAndProofs(
signedAggregateAndProofs: phase0.SignedAggregateAndProof[]
signedAggregateAndProofs: allForks.SignedAggregateAndProof[] // TODO Electra: Add version
): Promise<ApiClientResponse<{[HttpStatusCode.OK]: void}, HttpStatusCode.BAD_REQUEST>>;

publishContributionAndProofs(
Expand Down Expand Up @@ -786,7 +786,7 @@ export function getReturnTypes(): ReturnTypes<Api> {

produceAttestationData: ContainerData(ssz.phase0.AttestationData),
produceSyncCommitteeContribution: ContainerData(ssz.altair.SyncCommitteeContribution),
getAggregatedAttestation: ContainerData(ssz.phase0.Attestation),
getAggregatedAttestation: WithVersion((fork) => ssz.allForks[fork].Attestation),
submitBeaconCommitteeSelections: ContainerData(ArrayOf(BeaconCommitteeSelection)),
submitSyncCommitteeSelections: ContainerData(ArrayOf(SyncCommitteeSelection)),
getLiveness: jsonType("snake"),
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {routes, ServerApi} from "@lodestar/api";
import {Epoch, ssz} from "@lodestar/types";
import {SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
Expand Down Expand Up @@ -77,7 +77,7 @@ export function getBeaconPoolApi({
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
chain.emitter.emit(routes.events.EventType.attestation, {data: attestation, version: ForkName.phase0});

const sentPeers = await network.publishBeaconAttestation(attestation, subnet);
metrics?.onPoolSubmitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ export function getValidatorApi({
const attEpoch = computeEpochAtSlot(slot);
const headBlockRootHex = chain.forkChoice.getHead().blockRoot;
const headBlockRoot = fromHexString(headBlockRootHex);
const fork = config.getForkSeq(slot);

const beaconBlockRoot =
slot >= headSlot
Expand Down Expand Up @@ -846,7 +847,7 @@ export function getValidatorApi({
return {
data: {
slot,
index: committeeIndex,
index: fork >= ForkSeq.electra ? 0 : committeeIndex,
beaconBlockRoot,
source: attEpochState.currentJustifiedCheckpoint,
target: {epoch: attEpoch, root: targetRoot},
Expand Down Expand Up @@ -1078,6 +1079,7 @@ export function getValidatorApi({

return {
data: aggregate,
version: config.getForkName(slot),
};
},

Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export async function importBlock(
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;
const recvToValLatency = Date.now() / 1000 - (opts.seenTimestampSec ?? Date.now() / 1000);
const fork = postState.config.getForkSeq(blockSlot);
const fork = this.config.getForkSeq(blockSlot);

// this is just a type assertion since blockinput with blobsPromise type will not end up here
if (blockInput.type === BlockInputType.blobsPromise) {
Expand Down Expand Up @@ -425,7 +425,10 @@ export async function importBlock(
}
if (this.emitter.listenerCount(routes.events.EventType.attestation)) {
for (const attestation of block.message.body.attestations) {
this.emitter.emit(routes.events.EventType.attestation, attestation);
this.emitter.emit(routes.events.EventType.attestation, {
version: this.config.getForkName(blockSlot),
data: attestation,
});
}
}
if (this.emitter.listenerCount(routes.events.EventType.attesterSlashing)) {
Expand Down
57 changes: 46 additions & 11 deletions packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import bls from "@chainsafe/bls";
import {toHexString} from "@chainsafe/ssz";
import {ForkName, ForkSeq, MAX_ATTESTATIONS, MIN_ATTESTATION_INCLUSION_DELAY, SLOTS_PER_EPOCH} from "@lodestar/params";
import {phase0, Epoch, Slot, ssz, ValidatorIndex, RootHex, allForks} from "@lodestar/types";
import {
ForkName,
ForkSeq,
MAX_ATTESTATIONS,
MAX_ATTESTATIONS_ELECTRA,
MIN_ATTESTATION_INCLUSION_DELAY,
SLOTS_PER_EPOCH,
} from "@lodestar/params";
import {phase0, Epoch, Slot, ssz, ValidatorIndex, RootHex, allForks, electra} from "@lodestar/types";
import {
CachedBeaconStateAllForks,
CachedBeaconStatePhase0,
Expand Down Expand Up @@ -39,7 +46,7 @@ type ValidateAttestationDataFn = (attData: phase0.AttestationData) => boolean;
const MAX_RETAINED_ATTESTATIONS_PER_GROUP = 4;

/**
* On mainnet, each slot has 64 committees, and each block has 128 attestations max so in average
* On mainnet, each slot has 64 committees, and each block has 128 (8 in electra) attestations max so in average
* we get 2 attestation per groups.
* Starting from Jan 2024, we have a performance issue getting attestations for a block. Based on the
* fact that lot of groups will have only 1 attestation since it's full of participation increase this number
Expand Down Expand Up @@ -220,16 +227,34 @@ export class AggregatedAttestationPool {
}
}

const sortedAttestationsByScore = attestationsByScore.sort((a, b) => b.score - a.score);
const attestationsForBlock: allForks.Attestation[] = [];
for (const [i, attestationWithScore] of sortedAttestationsByScore.entries()) {
if (i >= MAX_ATTESTATIONS) {
break;
if (ForkSeq[fork] >= ForkSeq.electra) {
// In Electra, we further pack attestations with same attestationData from different committee
const sortedAttestationsByScore = this.aggregateAttestationsByScore(attestationsByScore).sort(
(a, b) => b.score - a.score
);
const attestationsForBlock: electra.Attestation[] = [];
for (const [i, attestationWithScore] of sortedAttestationsByScore.entries()) {
if (i >= MAX_ATTESTATIONS_ELECTRA) {
break;
}
// attestations could be modified in this op pool, so we need to clone for block
attestationsForBlock.push(
ssz.electra.Attestation.clone(attestationWithScore.attestation as electra.Attestation)
);
}
return attestationsForBlock;
} else {
const sortedAttestationsByScore = attestationsByScore.sort((a, b) => b.score - a.score);
const attestationsForBlock: phase0.Attestation[] = [];
for (const [i, attestationWithScore] of sortedAttestationsByScore.entries()) {
if (i >= MAX_ATTESTATIONS) {
break;
}
// attestations could be modified in this op pool, so we need to clone for block
attestationsForBlock.push(ssz.phase0.Attestation.clone(attestationWithScore.attestation));
}
// attestations could be modified in this op pool, so we need to clone for block
attestationsForBlock.push(ssz.allForks[fork].Attestation.clone(attestationWithScore.attestation));
return attestationsForBlock;
}
return attestationsForBlock;
}

/**
Expand All @@ -256,6 +281,16 @@ export class AggregatedAttestationPool {
}
return attestations;
}

/**
* Electra and after: Block proposer consolidates attestations with the same
* attestation data from different committee into a single attestation
* https://github.com/ethereum/consensus-specs/blob/aba6345776aa876dad368cab27fbbb23fae20455/specs/_features/eip7549/validator.md?plain=1#L39
* TODO Electra: implement this or consider other strategy
*/
private aggregateAttestationsByScore(attestationsByScore: AttestationWithScore[]): AttestationWithScore[] {
return attestationsByScore;
}
}

interface AttestationWithIndex {
Expand Down
54 changes: 48 additions & 6 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ const SLOTS_RETAINED = 3;
*/
const MAX_ATTESTATIONS_PER_SLOT = 16_384;

type AggregateFast = {
type AggregateFastPhase0 = {
data: allForks.Attestation["data"];
aggregationBits: BitArray;
signature: Signature;
};

type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};

type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;

/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

Expand Down Expand Up @@ -186,6 +190,26 @@ function aggregateAttestationInto(aggregate: AggregateFast, attestation: allFork
throw Error("Invalid attestation not exactly one bit set");
}

if ("committeeBits" in attestation && !("committeeBits" in aggregate)) {
throw Error("Attempt to aggregate electra attestation into phase0 attestation");
}

if (!("committeeBits" in attestation) && "committeeBits" in aggregate) {
throw Error("Attempt to aggregate phase0 attestation into electra attestation");
}

if ("committeeBits" in attestation) {
// We assume attestation.committeeBits should already be validated in api and gossip handler and should be non-null
const attestationCommitteeIndex = attestation.committeeBits.getSingleTrueBit();
const aggregateCommitteeIndex = (aggregate as AggregateFastElectra).committeeBits.getSingleTrueBit();

if (attestationCommitteeIndex !== aggregateCommitteeIndex) {
throw Error(
`Committee index mismatched: attestation ${attestationCommitteeIndex} aggregate ${aggregateCommitteeIndex}`
);
}
}

if (aggregate.aggregationBits.get(bitIndex) === true) {
return InsertOutcome.AlreadyKnown;
}
Expand All @@ -202,6 +226,15 @@ function aggregateAttestationInto(aggregate: AggregateFast, attestation: allFork
* Format `contribution` into an efficient `aggregate` to add more contributions in with aggregateContributionInto()
*/
function attestationToAggregate(attestation: allForks.Attestation): AggregateFast {
if ("committeeBits" in attestation) {
return {
data: attestation.data,
// clone because it will be mutated
aggregationBits: attestation.aggregationBits.clone(),
committeeBits: attestation.committeeBits,
signature: signatureFromBytesNoCheck(attestation.signature),
};
}
return {
data: attestation.data,
// clone because it will be mutated
Expand All @@ -214,9 +247,18 @@ function attestationToAggregate(attestation: allForks.Attestation): AggregateFas
* Unwrap AggregateFast to phase0.Attestation
*/
function fastToAttestation(aggFast: AggregateFast): allForks.Attestation {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
if ("committeeBits" in aggFast) {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
committeeBits: aggFast.committeeBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
} else {
return {
data: aggFast.data,
aggregationBits: aggFast.aggregationBits,
signature: aggFast.signature.toBytes(PointFormat.compressed),
};
}
}
15 changes: 11 additions & 4 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,11 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
validationResult = await validateGossipAggregateAndProof(fork, chain, signedAggregateAndProof, serializedData);
} catch (e) {
if (e instanceof AttestationError && e.action === GossipAction.REJECT) {
chain.persistInvalidSszValue(ssz.allForks[fork].SignedAggregateAndProof, signedAggregateAndProof, "gossip_reject");
chain.persistInvalidSszValue(
ssz.allForks[fork].SignedAggregateAndProof,
signedAggregateAndProof,
"gossip_reject"
);
}
throw e;
}
Expand Down Expand Up @@ -451,7 +455,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}
}

chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate);
chain.emitter.emit(routes.events.EventType.attestation, {
version: fork,
data: signedAggregateAndProof.message.aggregate,
});
},
[GossipType.beacon_attestation]: async ({
gossipData,
Expand Down Expand Up @@ -503,7 +510,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
chain.emitter.emit(routes.events.EventType.attestation, {version: fork, data: attestation});
},

[GossipType.attester_slashing]: async ({
Expand Down Expand Up @@ -711,7 +718,7 @@ function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOp
}
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
chain.emitter.emit(routes.events.EventType.attestation, {version: fork, data: attestation});
}

if (batchableBls) {
Expand Down
1 change: 1 addition & 0 deletions packages/types/src/electra/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ export type LightClientFinalityUpdate = ValueOf<typeof ssz.LightClientFinalityUp
export type LightClientOptimisticUpdate = ValueOf<typeof ssz.LightClientOptimisticUpdate>;
export type LightClientStore = ValueOf<typeof ssz.LightClientStore>;

export type AggregateAndProof = ValueOf<typeof ssz.AggregateAndProof>;
export type SignedAggregateAndProof = ValueOf<typeof ssz.SignedAggregateAndProof>;
6 changes: 4 additions & 2 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {toHexString} from "@chainsafe/ssz";
import {BLSSignature, phase0, Slot, ssz} from "@lodestar/types";
import {allForks, BLSSignature, phase0, Slot, ssz} from "@lodestar/types";
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition";
import {sleep} from "@lodestar/utils";
import {Api, ApiError, routes} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {IClock, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
import {Metrics} from "../metrics.js";
Expand Down Expand Up @@ -41,6 +42,7 @@ export class AttestationService {
private readonly emitter: ValidatorEventEmitter,
chainHeadTracker: ChainHeaderTracker,
private readonly metrics: Metrics | null,
private readonly config: ChainForkConfig,
private readonly opts?: AttestationServiceOpts
) {
this.dutiesService = new AttestationDutiesService(logger, api, clock, validatorStore, chainHeadTracker, metrics, {
Expand Down Expand Up @@ -263,7 +265,7 @@ export class AttestationService {
const aggregate = res.response;
this.metrics?.numParticipantsInAggregate.observe(aggregate.data.aggregationBits.getTrueBitIndexes().length);

const signedAggregateAndProofs: phase0.SignedAggregateAndProof[] = [];
const signedAggregateAndProofs: allForks.SignedAggregateAndProof[] = [];

await Promise.all(
duties.map(async ({duty, selectionProof}) => {
Expand Down
Loading

0 comments on commit 4f8fdcf

Please sign in to comment.