Ignore sync committee contributions that are a subset of ones we've already seen #5440

Merged · 8 commits · May 8, 2022
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -24,7 +24,7 @@ For information on changes in released versions of Teku, see the [releases page]
- Added `is_optimistic` field to `/eth/v1/node/syncing` response.
- Using execution engine endpoint as Eth1 endpoint when latter is not provided.
- Check `Eth1Address` checksum ([EIP-55](https://eips.ethereum.org/EIPS/eip-55)) if address is mixed-case.
- Ignore aggregate attestation gossip that does not include any new validators.
- Ignore aggregate attestation and sync contribution gossip that does not include any new validators.

### Bug Fixes
- Added stricter limits on attestation pool size.
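The new changelog line above summarises the behaviour this PR adds on the gossip path: a sync committee contribution whose aggregation bits are already covered by a contribution seen for the same slot, block root and subcommittee adds no new validators and is ignored, matching the existing treatment of aggregate attestations. A minimal, self-contained sketch of that subset test using plain `java.util.BitSet` (illustrative only; the production code uses Teku's `SszBitSet.isSuperSetOf`, shown further down in this diff):

```java
import java.util.BitSet;
import java.util.Collection;

final class SubsetCheckSketch {
  /**
   * True if {@code candidate} contributes no validator that is not already
   * present in at least one previously seen aggregation bitset.
   */
  static boolean addsNothingNew(final Collection<BitSet> seen, final BitSet candidate) {
    return seen.stream().anyMatch(existing -> isSuperSetOf(existing, candidate));
  }

  private static boolean isSuperSetOf(final BitSet existing, final BitSet candidate) {
    final BitSet remainder = (BitSet) candidate.clone();
    remainder.andNot(existing); // bits set in candidate but not in existing
    return remainder.isEmpty();
  }
}
```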
@@ -32,6 +32,7 @@ public class SyncCommitteeGossipAcceptanceTest extends AcceptanceTestBase {
private TekuNode primaryNode;
private TekuNode secondaryNode;
private TekuValidatorNode validatorClient;
private TekuNode watcherNode;

@BeforeEach
public void setup() {
@@ -43,7 +44,6 @@ public void setup() {
createTekuNode(
config ->
configureNode(config, genesisTime)
.withLogging("debug")
.withInteropValidators(0, 0)
.withPeers(primaryNode));
validatorClient =
@@ -53,27 +53,30 @@ public void setup() {
.withNetwork(network)
.withInteropValidators(NODE_VALIDATORS, NODE_VALIDATORS)
.withBeaconNode(secondaryNode));

// Use a third node to watch for published aggregates.
watcherNode =
createTekuNode(
config ->
configureNode(config, genesisTime)
.withPeers(primaryNode, secondaryNode)
.withInteropValidators(0, 0));
}

@Test
public void shouldContainSyncCommitteeAggregates() throws Exception {
primaryNode.start();
primaryNode.startEventListener(List.of(EventType.contribution_and_proof));

secondaryNode.start();
secondaryNode.startEventListener(List.of(EventType.contribution_and_proof));
validatorClient.start();
watcherNode.start();
watcherNode.startEventListener(List.of(EventType.contribution_and_proof));

primaryNode.waitForEpoch(1);

// primary node has validators 1-7, expect gossip from aggregators 8-15 coming through
primaryNode.waitForContributionAndProofEvent(
proof -> proof.message.aggregatorIndex.isGreaterThanOrEqualTo(8));

// secondary node has remote validators 8-15, expect gossip from aggregators 1-7
secondaryNode.waitForContributionAndProofEvent(
proof -> proof.message.aggregatorIndex.isLessThan(8));
// Wait until we get a contribution over gossip. The watcher node doesn't run any validators.
watcherNode.waitForContributionAndProofEvent();

// And make sure that the contributions get combined properly into a full aggregate in the block
secondaryNode.waitForFullSyncCommitteeAggregate();
validatorClient.stop();
secondaryNode.stop();
@@ -150,6 +150,10 @@ public void startEventListener(final List<EventType> eventTypes) {
maybeEventStreamListener = Optional.of(new EventStreamListener(getEventUrl(eventTypes)));
}

public void waitForContributionAndProofEvent() {
waitForContributionAndProofEvent(proof -> true);
}

public void waitForContributionAndProofEvent(
final Predicate<SignedContributionAndProof> condition) {
waitFor(
@@ -343,13 +347,11 @@ public void waitForFullSyncCommitteeAggregate() {
actualSyncBitCount == syncCommitteeSize
? 1.0
: actualSyncBitCount / (double) syncCommitteeSize;
if (percentageOfBitsSet < 1.0) {
LOG.debug(
String.format(
"Sync committee bits are only %s%% full, expecting %s%%: %s",
percentageOfBitsSet * 100, 100, syncCommitteeBits));
}
assertThat(percentageOfBitsSet >= 1.0).isTrue();
assertThat(percentageOfBitsSet >= 1.0)
.overridingErrorMessage(
"Sync committee bits are only %s%% full, expecting %s%%: %s",
percentageOfBitsSet * 100, 100, syncCommitteeBits)
.isTrue();
},
5,
MINUTES);
@@ -742,7 +744,7 @@ public Config withRealNetwork() {

public Config withPeers(final TekuNode... nodes) {
final String peers =
Arrays.stream(nodes).map(TekuNode::getMultiAddr).collect(Collectors.joining(", "));
Arrays.stream(nodes).map(TekuNode::getMultiAddr).collect(Collectors.joining(","));
LOG.debug("Set peers: {}", peers);
configMap.put("p2p-static-peers", peers);
return this;
@@ -438,6 +438,13 @@ public SignedContributionAndProofTestBuilder createValidSignedContributionAndPro

public SignedContributionAndProofTestBuilder createValidSignedContributionAndProofBuilder(
final UInt64 slot, final Bytes32 beaconBlockRoot) {
return createValidSignedContributionAndProofBuilder(slot, beaconBlockRoot, Optional.empty());
}

public SignedContributionAndProofTestBuilder createValidSignedContributionAndProofBuilder(
final UInt64 slot,
final Bytes32 beaconBlockRoot,
final Optional<Integer> requiredSubcommittee) {
final SyncCommitteeUtil syncCommitteeUtil = spec.getSyncCommitteeUtilRequired(slot);
final SignedBlockAndState latestBlockAndState = getLatestBlockAndState();
final UInt64 epoch = syncCommitteeUtil.getEpochForDutiesAtSlot(slot);
@@ -450,6 +457,9 @@ public SignedContributionAndProofTestBuilder createValidSignedContributionAndPro
final Signer signer = getSigner(validatorIndex.intValue());
final SyncSubcommitteeAssignments assignments = entry.getValue();
for (int subcommitteeIndex : assignments.getAssignedSubcommittees()) {
if (requiredSubcommittee.isPresent() && requiredSubcommittee.get() != subcommitteeIndex) {
continue;
}
final SyncAggregatorSelectionData syncAggregatorSelectionData =
syncCommitteeUtil.createSyncAggregatorSelectionData(
slot, UInt64.valueOf(subcommitteeIndex));
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntFunction;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLS;
@@ -167,6 +168,21 @@ public SignedContributionAndProofTestBuilder addParticipant(
return this;
}

public SignedContributionAndProofTestBuilder addAllParticipants(
final Function<UInt64, Signer> getSigner) {
final Map<UInt64, SyncSubcommitteeAssignments> syncSubcommittees =
syncCommitteeUtil.getSyncSubcommittees(
state, syncCommitteeUtil.getEpochForDutiesAtSlot(slot));
removeAllParticipants();
syncSubcommittees.forEach(
(validatorIndex, assignments) -> {
if (assignments.getAssignedSubcommittees().contains(subcommitteeIndex)) {
addParticipant(validatorIndex, getSigner.apply(validatorIndex));
}
});
return this;
}

public SignedContributionAndProofTestBuilder resetParticipantsToOnlyAggregator() {
syncSignatures.clear();
subcommitteeParticipationIndices.clear();
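The `addAllParticipants` helper above, combined with the new three-argument `createValidSignedContributionAndProofBuilder` overload from the previous file, lets a test pin a contribution to one subcommittee and then fill in every assigned participant. A hedged usage sketch — `harness` stands for the test fixture that owns the factory and `getSigner(int)`, and the terminal `build()` call is an assumption, not something shown in this diff:

```java
// Sketch only: names outside this diff (harness, build()) are assumptions.
final SignedContributionAndProofTestBuilder builder =
    harness
        .createValidSignedContributionAndProofBuilder(slot, blockRoot, Optional.of(1))
        // Sign with every validator assigned to subcommittee 1 in the current sync committee.
        .addAllParticipants(validatorIndex -> harness.getSigner(validatorIndex.intValue()));

final SignedContributionAndProof fullContribution = builder.build(); // build() assumed

// A second contribution whose participants are a subset of fullContribution's
// should now be IGNOREd by SignedContributionAndProofValidator rather than re-gossiped.
```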
@@ -35,8 +35,10 @@ public class Constants {
// Target 2 different attestation data (aggregators normally agree) for two slots
public static final int VALID_ATTESTATION_DATA_SET_SIZE = 2 * 64 * 2;
public static final int VALID_VALIDATOR_SET_SIZE = 10000;
public static final int VALID_CONTRIBUTION_AND_PROOF_SET_SIZE = 10000;
public static final int VALID_SYNC_COMMITTEE_MESSAGE_SET_SIZE = 10000;
// Only need to maintain a cache for the current slot, so just needs to be as large as the
// sync committee size.
public static final int VALID_CONTRIBUTION_AND_PROOF_SET_SIZE = 512;
public static final int VALID_SYNC_COMMITTEE_MESSAGE_SET_SIZE = 512;
Comment on lines +40 to +41 (Contributor):
if it still breaks, maybe increase this to be large enough for 2 slots.. i have vague memories...

Reply (Author):
It should be all good now. Having that too low would make it more likely to pass as we'd process more messages than we need to. But for sync committee messages I'm pretty confident there's no need to keep two slots since we ignore anything not from the current slot anyway.

Reply (Author):
Yep, all good now.


public static final Duration ETH1_INDIVIDUAL_BLOCK_RETRY_TIMEOUT = Duration.ofMillis(500);
public static final Duration ETH1_DEPOSIT_REQUEST_RETRY_TIMEOUT = Duration.ofSeconds(2);
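The sizing discussion above comes down to simple bounds: contributions only need to be deduplicated for the current slot, and the number of possible contributors is capped by the sync committee size rather than the whole validator set. A back-of-the-envelope sketch, assuming mainnet preset values (SYNC_COMMITTEE_SIZE = 512, four subnets, 16 target aggregators per subcommittee):

```java
// Rough sketch of the cache bounds; the constant values are mainnet preset assumptions.
final class SyncCommitteeCacheSizing {
  static final int SYNC_COMMITTEE_SIZE = 512;
  static final int SYNC_COMMITTEE_SUBNET_COUNT = 4;
  static final int TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE = 16;

  public static void main(final String[] args) {
    // Every committee member sends at most one sync committee message per slot,
    // so 512 entries cover a full slot of messages.
    final int messagesPerSlotBound = SYNC_COMMITTEE_SIZE;

    // Expected contributions per slot are far fewer: ~16 aggregators on each of 4 subnets.
    final int expectedContributionsPerSlot =
        TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE * SYNC_COMMITTEE_SUBNET_COUNT; // 64

    System.out.println("message cache bound per slot  = " + messagesPerSlotBound);
    System.out.println("expected contributions / slot ~ " + expectedContributionsPerSlot);
  }
}
```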
@@ -15,6 +15,7 @@

import static tech.pegasys.teku.spec.config.Constants.VALID_CONTRIBUTION_AND_PROOF_SET_SIZE;
import static tech.pegasys.teku.spec.constants.NetworkConstants.SYNC_COMMITTEE_SUBNET_COUNT;
import static tech.pegasys.teku.spec.constants.ValidatorConstants.TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE;
import static tech.pegasys.teku.statetransition.validation.InternalValidationResult.ACCEPT;
import static tech.pegasys.teku.statetransition.validation.InternalValidationResult.IGNORE;

@@ -27,6 +28,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.bls.BLSPublicKey;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.collections.LimitedSet;
@@ -44,15 +46,19 @@
import tech.pegasys.teku.spec.logic.common.util.AsyncBLSSignatureVerifier;
import tech.pegasys.teku.spec.logic.common.util.AsyncBatchBLSSignatureVerifier;
import tech.pegasys.teku.spec.logic.common.util.SyncCommitteeUtil;
import tech.pegasys.teku.statetransition.util.SeenAggregatesCache;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.statetransition.validation.ValidationResultCode;
import tech.pegasys.teku.storage.client.RecentChainData;

public class SignedContributionAndProofValidator {
private static final Logger LOG = LogManager.getLogger();
private final Spec spec;
private final Set<UniquenessKey> seenIndices =
private final Set<SourceUniquenessKey> seenIndices =
LimitedSet.create(VALID_CONTRIBUTION_AND_PROOF_SET_SIZE);
private final SeenAggregatesCache<TargetUniquenessKey> seenAggregatesCache =
new SeenAggregatesCache<>(
VALID_CONTRIBUTION_AND_PROOF_SET_SIZE, TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE);
private final SyncCommitteeStateUtils syncCommitteeStateUtils;
private final AsyncBLSSignatureVerifier signatureVerifier;
private final SyncCommitteeCurrentSlotUtil slotUtil;
@@ -66,7 +72,7 @@ public SignedContributionAndProofValidator(
this.spec = spec;
this.syncCommitteeStateUtils = syncCommitteeStateUtils;
this.signatureVerifier = signatureVerifier;
slotUtil = new SyncCommitteeCurrentSlotUtil(recentChainData, spec, timeProvider);
this.slotUtil = new SyncCommitteeCurrentSlotUtil(recentChainData, spec, timeProvider);
}

public SafeFuture<InternalValidationResult> validate(final SignedContributionAndProof proof) {
@@ -77,8 +83,17 @@ public SafeFuture<InternalValidationResult> validate(final SignedContributionAnd
// aggregator with index contribution_and_proof.aggregator_index for the slot contribution.slot.
// (this requires maintaining a cache of size `SYNC_COMMITTEE_SIZE` for this topic that can be
// flushed after each slot).
final UniquenessKey uniquenessKey = getUniquenessKey(contributionAndProof, contribution);
if (seenIndices.contains(uniquenessKey)) {
final SourceUniquenessKey sourceUniquenessKey =
getUniquenessKey(contributionAndProof, contribution);
if (seenIndices.contains(sourceUniquenessKey)) {
return SafeFuture.completedFuture(IGNORE);
}
final TargetUniquenessKey targetUniquenessKey =
new TargetUniquenessKey(
contribution.getSlot(),
contribution.getBeaconBlockRoot(),
contribution.getSubcommitteeIndex());
if (seenAggregatesCache.isAlreadySeen(targetUniquenessKey, contribution.getAggregationBits())) {
return SafeFuture.completedFuture(IGNORE);
}

@@ -124,7 +139,8 @@ public SafeFuture<InternalValidationResult> validate(final SignedContributionAnd
contributionAndProof,
contribution,
syncCommitteeUtil,
uniquenessKey,
sourceUniquenessKey,
targetUniquenessKey,
maybeState.get());
});
}
@@ -147,7 +163,8 @@ private SafeFuture<InternalValidationResult> validateWithState(
final ContributionAndProof contributionAndProof,
final SyncCommitteeContribution contribution,
final SyncCommitteeUtil syncCommitteeUtil,
final UniquenessKey uniquenessKey,
final SourceUniquenessKey sourceUniquenessKey,
final TargetUniquenessKey targetUniquenessKey,
final BeaconStateAltair state) {
final BeaconStateAccessors beaconStateAccessors =
spec.atSlot(contribution.getSlot()).beaconStateAccessors();
@@ -251,11 +268,15 @@ private SafeFuture<InternalValidationResult> validateWithState(
contribution.getSignature());
}

if (!seenIndices.add(uniquenessKey)) {
if (!seenIndices.add(sourceUniquenessKey)) {
// Got added by another thread while we were validating it
return IGNORE;
}

if (!seenAggregatesCache.add(
targetUniquenessKey, contribution.getAggregationBits())) {
return IGNORE;
}
return ACCEPT;
});
}
@@ -284,20 +305,20 @@ private boolean isInSyncSubcommittee(
.contains(contribution.getSubcommitteeIndex().intValue());
}

private UniquenessKey getUniquenessKey(
private SourceUniquenessKey getUniquenessKey(
final ContributionAndProof contributionAndProof, SyncCommitteeContribution contribution) {
return new UniquenessKey(
return new SourceUniquenessKey(
contributionAndProof.getAggregatorIndex(),
contribution.getSlot(),
contribution.getSubcommitteeIndex());
}

private static class UniquenessKey {
private static class SourceUniquenessKey {
private final UInt64 aggregatorIndex;
private final UInt64 slot;
private final UInt64 subcommitteeIndex;

private UniquenessKey(
private SourceUniquenessKey(
final UInt64 aggregatorIndex, final UInt64 slot, final UInt64 subcommitteeIndex) {
this.aggregatorIndex = aggregatorIndex;
this.slot = slot;
@@ -312,7 +333,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final UniquenessKey that = (UniquenessKey) o;
final SourceUniquenessKey that = (SourceUniquenessKey) o;
return Objects.equals(aggregatorIndex, that.aggregatorIndex)
&& Objects.equals(slot, that.slot)
&& Objects.equals(subcommitteeIndex, that.subcommitteeIndex);
@@ -323,4 +344,36 @@ public int hashCode() {
return Objects.hash(aggregatorIndex, slot, subcommitteeIndex);
}
}

private static class TargetUniquenessKey {
private final UInt64 slot;
private final Bytes32 blockRoot;
private final UInt64 subcommiteeIndex;

private TargetUniquenessKey(
final UInt64 slot, final Bytes32 blockRoot, final UInt64 subcommiteeIndex) {
this.slot = slot;
this.blockRoot = blockRoot;
this.subcommiteeIndex = subcommiteeIndex;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TargetUniquenessKey that = (TargetUniquenessKey) o;
return Objects.equals(slot, that.slot)
&& Objects.equals(blockRoot, that.blockRoot)
&& Objects.equals(subcommiteeIndex, that.subcommiteeIndex);
}

@Override
public int hashCode() {
return Objects.hash(slot, blockRoot, subcommiteeIndex);
}
}
}
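Pulling the validator changes together: there are now two independent IGNORE checks, one keyed on the aggregator (the pre-existing rule) and one keyed on the contribution target with a superset test over the aggregation bits (the new rule). A condensed sketch of the flow — `sourceKeyOf`, `targetKeyOf` and `bitsOf` are shorthand for the code above, not real methods:

```java
// Condensed sketch of the dedup flow in SignedContributionAndProofValidator.validate(...).
SafeFuture<InternalValidationResult> validateSketch(final SignedContributionAndProof proof) {
  // 1. Pre-existing rule: only the first contribution from a given aggregator for a
  //    given (slot, subcommitteeIndex) is considered.
  if (seenIndices.contains(sourceKeyOf(proof))) {
    return SafeFuture.completedFuture(IGNORE);
  }
  // 2. New rule: ignore contributions whose aggregation bits are a subset of bits
  //    already seen for the same (slot, beaconBlockRoot, subcommitteeIndex).
  if (seenAggregatesCache.isAlreadySeen(targetKeyOf(proof), bitsOf(proof))) {
    return SafeFuture.completedFuture(IGNORE);
  }
  // ... selection proof, committee membership and signature checks run here ...
  // After successful validation, record both keys; a concurrent add by another
  // thread downgrades the result to IGNORE.
  if (!seenIndices.add(sourceKeyOf(proof))) {
    return SafeFuture.completedFuture(IGNORE);
  }
  if (!seenAggregatesCache.add(targetKeyOf(proof), bitsOf(proof))) {
    return SafeFuture.completedFuture(IGNORE);
  }
  return SafeFuture.completedFuture(ACCEPT);
}
```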
@@ -16,23 +16,22 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.collections.LimitedSet;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitlist;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitSet;

public class SeenAggregatesCache {
public class SeenAggregatesCache<KeyT> {

private final Map<Bytes32, Set<SszBitlist>> seenAggregationBitsByDataRoot;
private final Map<KeyT, Set<SszBitSet>> seenAggregationBitsByDataRoot;
private final int aggregateSetSize;

public SeenAggregatesCache(final int rootCacheSize, final int aggregateSetSize) {
this.seenAggregationBitsByDataRoot = LimitedMap.create(rootCacheSize);
this.aggregateSetSize = aggregateSetSize;
}

public boolean add(final Bytes32 root, final SszBitlist aggregationBits) {
final Set<SszBitlist> seenBitlists =
public boolean add(final KeyT root, final SszBitSet aggregationBits) {
final Set<SszBitSet> seenBitlists =
seenAggregationBitsByDataRoot.computeIfAbsent(
root,
// Aim to hold all aggregation bits but have a limit for safety.
@@ -44,14 +43,14 @@ public boolean add(final Bytes32 root, final SszBitlist aggregationBits) {
return seenBitlists.add(aggregationBits);
}

public boolean isAlreadySeen(final Bytes32 root, final SszBitlist aggregationBits) {
final Set<SszBitlist> seenAggregates =
public boolean isAlreadySeen(final KeyT root, final SszBitSet aggregationBits) {
final Set<SszBitSet> seenAggregates =
seenAggregationBitsByDataRoot.getOrDefault(root, Collections.emptySet());
return isAlreadySeen(seenAggregates, aggregationBits);
}

private boolean isAlreadySeen(
final Set<SszBitlist> seenAggregates, final SszBitlist aggregationBits) {
final Set<SszBitSet> seenAggregates, final SszBitSet aggregationBits) {
return seenAggregates.stream().anyMatch(seen -> seen.isSuperSetOf(aggregationBits));
}
}
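For readers without the Teku codebase to hand, the behaviour of the now-generic cache above can be reproduced with JDK types only. This sketch expands the subset test sketched after the changelog entry, standing in `LinkedHashMap` eviction for Teku's `LimitedMap`/`LimitedSet` and `java.util.BitSet` for `SszBitSet`; it is an illustration of the data structure, not the production implementation:

```java
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for SeenAggregatesCache<KeyT>: a bounded map from target key
// to a bounded set of aggregation bitsets, with a superset test for "already seen".
final class SeenAggregatesCacheSketch<KeyT> {
  private final Map<KeyT, Set<BitSet>> seenByKey;
  private final int aggregateSetSize;

  SeenAggregatesCacheSketch(final int keyCacheSize, final int aggregateSetSize) {
    // Evict the oldest key once keyCacheSize is exceeded (LimitedMap stand-in).
    this.seenByKey =
        new LinkedHashMap<KeyT, Set<BitSet>>(16, 0.75f, false) {
          @Override
          protected boolean removeEldestEntry(final Map.Entry<KeyT, Set<BitSet>> eldest) {
            return size() > keyCacheSize;
          }
        };
    this.aggregateSetSize = aggregateSetSize;
  }

  boolean add(final KeyT key, final BitSet aggregationBits) {
    final Set<BitSet> seen = seenByKey.computeIfAbsent(key, __ -> new LinkedHashSet<>());
    if (isAlreadySeen(seen, aggregationBits)) {
      return false; // subset of something already recorded: nothing new to gossip
    }
    if (seen.size() >= aggregateSetSize) {
      return false; // safety limit, mirroring the bounded LimitedSet
    }
    return seen.add((BitSet) aggregationBits.clone());
  }

  boolean isAlreadySeen(final KeyT key, final BitSet aggregationBits) {
    return isAlreadySeen(seenByKey.getOrDefault(key, Collections.emptySet()), aggregationBits);
  }

  private static boolean isAlreadySeen(final Set<BitSet> seen, final BitSet candidate) {
    return seen.stream().anyMatch(existing -> isSuperSetOf(existing, candidate));
  }

  private static boolean isSuperSetOf(final BitSet existing, final BitSet candidate) {
    final BitSet remainder = (BitSet) candidate.clone();
    remainder.andNot(existing);
    return remainder.isEmpty();
  }
}
```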