Skip to content

Commit

Permalink
Merge 9f0b670 into da696c0
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed May 2, 2023
2 parents da696c0 + 9f0b670 commit 0e4d80f
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 196 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ export function getBeaconBlockApi({
await promiseAllMaybeAsync([
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlockMaybeBlobs(blockForImport),
() => network.gossip.publishBeaconBlockMaybeBlobs(blockForImport) as Promise<unknown>,

() =>
chain.processBlock(blockForImport, opts).catch((e) => {
Expand Down
9 changes: 7 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ export function getBeaconPoolApi({
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
const sentPeers = await network.gossip.publishBeaconAttestation(attestation, subnet);
metrics?.submitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
const result = await network.gossip.publishBeaconAttestation(attestation, subnet);
metrics?.submitUnaggregatedAttestation(
seenTimestampSec,
indexedAttestation,
subnet,
result.recipients.length
);
} catch (e) {
errors.push(e as Error);
logger.error(
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,8 @@ export function getValidatorApi({
indexedAttestation.attestingIndices.length,
committeeIndices
);
const sentPeers = await network.gossip.publishBeaconAggregateAndProof(signedAggregateAndProof);
metrics?.submitAggregatedAttestation(seenTimestampSec, indexedAttestation, sentPeers);
const result = await network.gossip.publishBeaconAggregateAndProof(signedAggregateAndProof);
metrics?.submitAggregatedAttestation(seenTimestampSec, indexedAttestation, result.recipients.length);
} catch (e) {
if (e instanceof AttestationError && e.type.code === AttestationErrorCode.AGGREGATOR_ALREADY_KNOWN) {
logger.debug("Ignoring known signedAggregateAndProof");
Expand Down
144 changes: 6 additions & 138 deletions packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import {PeerId} from "@libp2p/interface-peer-id";
import {TopicValidatorResult} from "@libp2p/interface-pubsub";
import {GossipSub, GossipsubEvents} from "@chainsafe/libp2p-gossipsub";
import {PublishOpts, SignaturePolicy, TopicStr} from "@chainsafe/libp2p-gossipsub/types";
import {SignaturePolicy, TopicStr} from "@chainsafe/libp2p-gossipsub/types";
import {PeerScore, PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score";
import {MetricsRegister, TopicLabel, TopicStrToLabel} from "@chainsafe/libp2p-gossipsub/metrics";
import {BeaconConfig} from "@lodestar/config";
import {ATTESTATION_SUBNET_COUNT, ForkName, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {allForks, altair, phase0, capella, deneb} from "@lodestar/types";
import {Logger, Map2d, Map2dArr} from "@lodestar/utils";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {upgradeLightClientFinalityUpdate, upgradeLightClientOptimisticUpdate} from "@lodestar/light-client";

import {NetworkCoreMetrics} from "../core/metrics.js";
import {Eth2Context} from "../../chain/index.js";
Expand All @@ -18,8 +15,8 @@ import {ClientKind} from "../peers/client.js";
import {GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX} from "../../constants/network.js";
import {Libp2p} from "../interface.js";
import {NetworkEvent, NetworkEventBus} from "../events.js";
import {GossipBeaconNode, GossipTopic, GossipTopicMap, GossipType, GossipTypeMap} from "./interface.js";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js";
import {GossipTopic, GossipType} from "./interface.js";
import {GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js";
import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js";

import {
Expand Down Expand Up @@ -68,7 +65,7 @@ export type Eth2GossipsubOpts = {
*
* See https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
*/
export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
export class Eth2Gossipsub extends GossipSub {
readonly scoreParams: Partial<PeerScoreParams>;
private readonly config: BeaconConfig;
private readonly logger: Logger;
Expand Down Expand Up @@ -146,28 +143,11 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
}
}

/**
* Publish a `GossipObject` on a `GossipTopic`
*/
async publishObject<K extends GossipType>(
topic: GossipTopicMap[K],
object: GossipTypeMap[K],
opts?: PublishOpts | undefined
): Promise<number> {
const topicStr = this.getGossipTopicString(topic);
const sszType = getGossipSSZType(topic);
const messageData = (sszType.serialize as (object: GossipTypeMap[GossipType]) => Uint8Array)(object);
const result = await this.publish(topicStr, messageData, opts);
const sentPeers = result.recipients.length;
this.logger.verbose("Publish to topic", {topic: topicStr, sentPeers});
return sentPeers;
}

/**
* Subscribe to a `GossipTopic`
*/
subscribeTopic(topic: GossipTopic): void {
const topicStr = this.getGossipTopicString(topic);
const topicStr = stringifyGossipTopic(this.config, topic);
// Register known topicStr
this.gossipTopicCache.setTopic(topicStr, topic);

Expand All @@ -179,123 +159,11 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
* Unsubscribe to a `GossipTopic`
*/
unsubscribeTopic(topic: GossipTopic): void {
const topicStr = this.getGossipTopicString(topic);
const topicStr = stringifyGossipTopic(this.config, topic);
this.logger.verbose("Unsubscribe to gossipsub topic", {topic: topicStr});
this.unsubscribe(topicStr);
}

async publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<void> {
const fork = this.config.getForkName(signedBlock.message.slot);
await this.publishObject<GossipType.beacon_block>({type: GossipType.beacon_block, fork}, signedBlock, {
ignoreDuplicatePublishError: true,
});
}

async publishSignedBeaconBlockAndBlobsSidecar(item: deneb.SignedBeaconBlockAndBlobsSidecar): Promise<void> {
const fork = this.config.getForkName(item.beaconBlock.message.slot);
await this.publishObject<GossipType.beacon_block_and_blobs_sidecar>(
{type: GossipType.beacon_block_and_blobs_sidecar, fork},
item,
{ignoreDuplicatePublishError: true}
);
}

async publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<number> {
const fork = this.config.getForkName(aggregateAndProof.message.aggregate.data.slot);
return this.publishObject<GossipType.beacon_aggregate_and_proof>(
{type: GossipType.beacon_aggregate_and_proof, fork},
aggregateAndProof,
{ignoreDuplicatePublishError: true}
);
}

async publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<number> {
const fork = this.config.getForkName(attestation.data.slot);
return this.publishObject<GossipType.beacon_attestation>(
{type: GossipType.beacon_attestation, fork, subnet},
attestation,
{ignoreDuplicatePublishError: true}
);
}

async publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<void> {
const fork = this.config.getForkName(computeStartSlotAtEpoch(voluntaryExit.message.epoch));
await this.publishObject<GossipType.voluntary_exit>({type: GossipType.voluntary_exit, fork}, voluntaryExit, {
ignoreDuplicatePublishError: true,
});
}

async publishBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void> {
const fork = ForkName.capella;
await this.publishObject<GossipType.bls_to_execution_change>(
{type: GossipType.bls_to_execution_change, fork},
blsToExecutionChange,
{ignoreDuplicatePublishError: true}
);
}

async publishProposerSlashing(proposerSlashing: phase0.ProposerSlashing): Promise<void> {
const fork = this.config.getForkName(Number(proposerSlashing.signedHeader1.message.slot as bigint));
await this.publishObject<GossipType.proposer_slashing>(
{type: GossipType.proposer_slashing, fork},
proposerSlashing
);
}

async publishAttesterSlashing(attesterSlashing: phase0.AttesterSlashing): Promise<void> {
const fork = this.config.getForkName(Number(attesterSlashing.attestation1.data.slot as bigint));
await this.publishObject<GossipType.attester_slashing>(
{type: GossipType.attester_slashing, fork},
attesterSlashing
);
}

async publishSyncCommitteeSignature(signature: altair.SyncCommitteeMessage, subnet: number): Promise<void> {
const fork = this.config.getForkName(signature.slot);
await this.publishObject<GossipType.sync_committee>({type: GossipType.sync_committee, fork, subnet}, signature, {
ignoreDuplicatePublishError: true,
});
}

async publishContributionAndProof(contributionAndProof: altair.SignedContributionAndProof): Promise<void> {
const fork = this.config.getForkName(contributionAndProof.message.contribution.slot);
await this.publishObject<GossipType.sync_committee_contribution_and_proof>(
{type: GossipType.sync_committee_contribution_and_proof, fork},
contributionAndProof,
{ignoreDuplicatePublishError: true}
);
}

async publishLightClientFinalityUpdate(lightClientFinalityUpdate: allForks.LightClientFinalityUpdate): Promise<void> {
const fork = this.config.getForkName(lightClientFinalityUpdate.signatureSlot);
const attestedFork = this.config.getForkName(lightClientFinalityUpdate.attestedHeader.beacon.slot);
if (attestedFork !== fork) {
lightClientFinalityUpdate = upgradeLightClientFinalityUpdate(this.config, fork, lightClientFinalityUpdate);
}
await this.publishObject<GossipType.light_client_finality_update>(
{type: GossipType.light_client_finality_update, fork},
lightClientFinalityUpdate
);
}

async publishLightClientOptimisticUpdate(
lightClientOptimisitcUpdate: allForks.LightClientOptimisticUpdate
): Promise<void> {
const fork = this.config.getForkName(lightClientOptimisitcUpdate.signatureSlot);
const attestedFork = this.config.getForkName(lightClientOptimisitcUpdate.attestedHeader.beacon.slot);
if (attestedFork !== fork) {
lightClientOptimisitcUpdate = upgradeLightClientOptimisticUpdate(this.config, fork, lightClientOptimisitcUpdate);
}
await this.publishObject<GossipType.light_client_optimistic_update>(
{type: GossipType.light_client_optimistic_update, fork},
lightClientOptimisitcUpdate
);
}

private getGossipTopicString(topic: GossipTopic): string {
return stringifyGossipTopic(this.config, topic);
}

private onScrapeLodestarMetrics(metrics: NetworkCoreMetrics): void {
const mesh = this["mesh"] as Map<string, Set<string>>;
const topics = this["topics"] as Map<string, Set<string>>;
Expand Down
34 changes: 20 additions & 14 deletions packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {EventEmitter} from "events";
import {Libp2p} from "libp2p";
import {Message, TopicValidatorResult} from "@libp2p/interface-pubsub";
import {Message, PublishResult, TopicValidatorResult} from "@libp2p/interface-pubsub";
import StrictEventEmitter from "strict-event-emitter-types";
import {PeerIdStr} from "@chainsafe/libp2p-gossipsub/types";
import {ForkName} from "@lodestar/params";
Expand All @@ -10,6 +10,7 @@ import {Logger} from "@lodestar/utils";
import {IBeaconChain} from "../../chain/index.js";
import {NetworkEvent} from "../events.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {BlockInput} from "../../chain/blocks/types.js";

export enum GossipType {
beacon_block = "beacon_block",
Expand Down Expand Up @@ -126,19 +127,24 @@ export type GossipModules = {
chain: IBeaconChain;
};

export type GossipBeaconNode = {
publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<void>;
publishSignedBeaconBlockAndBlobsSidecar(item: deneb.SignedBeaconBlockAndBlobsSidecar): Promise<void>;
publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<number>;
publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<number>;
publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<void>;
publishBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void>;
publishProposerSlashing(proposerSlashing: phase0.ProposerSlashing): Promise<void>;
publishAttesterSlashing(attesterSlashing: phase0.AttesterSlashing): Promise<void>;
publishSyncCommitteeSignature(signature: altair.SyncCommitteeMessage, subnet: number): Promise<void>;
publishContributionAndProof(contributionAndProof: altair.SignedContributionAndProof): Promise<void>;
publishLightClientFinalityUpdate(lightClientFinalityUpdate: allForks.LightClientFinalityUpdate): Promise<void>;
publishLightClientOptimisticUpdate(lightClientOptimisitcUpdate: allForks.LightClientOptimisticUpdate): Promise<void>;
export type PublisherBeaconNode = {
publishBeaconBlockMaybeBlobs(signedBlock: BlockInput): Promise<PublishResult>;
publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<PublishResult>;
publishSignedBeaconBlockAndBlobsSidecar(item: deneb.SignedBeaconBlockAndBlobsSidecar): Promise<PublishResult>;
publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<PublishResult>;
publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<PublishResult>;
publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<PublishResult>;
publishBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<PublishResult>;
publishProposerSlashing(proposerSlashing: phase0.ProposerSlashing): Promise<PublishResult>;
publishAttesterSlashing(attesterSlashing: phase0.AttesterSlashing): Promise<PublishResult>;
publishSyncCommitteeSignature(signature: altair.SyncCommitteeMessage, subnet: number): Promise<PublishResult>;
publishContributionAndProof(contributionAndProof: altair.SignedContributionAndProof): Promise<PublishResult>;
publishLightClientFinalityUpdate(
lightClientFinalityUpdate: allForks.LightClientFinalityUpdate
): Promise<PublishResult>;
publishLightClientOptimisticUpdate(
lightClientOptimisticUpdate: allForks.LightClientOptimisticUpdate
): Promise<PublishResult>;
};

/**
Expand Down

0 comments on commit 0e4d80f

Please sign in to comment.