Skip to content

Commit

Permalink
subnets subscription revamp (#7341)
Browse files Browse the repository at this point in the history
* attestations subnets revamp
  • Loading branch information
mehdi-aouadi committed Aug 3, 2023
1 parent 567271f commit 1703fa0
Show file tree
Hide file tree
Showing 51 changed files with 561 additions and 505 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ For information on changes in released versions of Teku, see the [releases page]

- Upgrading the minimum Java version to Java 17, which means users will need to upgrade their Java installation to at least `Java 17`. The docker versions relying on `jdk16` will no longer be published, so docker users explicitly referencing the `jdk16` build need to update their package to reference `jdk17`, as tags `develop-jdk16`, `develop-jdk16-arm64`, `latest-jdk16` will no longer be updated.
- Users who make heavy use of API calls to fetch non finalized states data other than head may wish to adjust the states-cache if they see excessive `regeneration of state` messages. This can be accomplished via the `--Xstore-state-cache-size`, which previously defaulted to 160.
- The Development option `--Xp2p-minimum-subnet-subscriptions` has been removed and will no longer be recognised as a command line option.

### Additions and Improvements

- Introduce `--exchange-capabilities-monitoring-enabled` parameter. If enabled, EL will be queried periodically for the Engine API methods it supports. If incompatibility is detected, a warning is raised in the logs. The default is `true`.
- Add support for [Lukso network](https://lukso.network/) `--network=lukso`
- The development option `--Xfork-choice-update-head-on-block-import-enabled` was changed to default to `false` to ensure fork-choice is run when new blocks arrive.
- The default state-cache size has been changed to 8 (previously 160), and there is now an epoch-states-cache, which defaults to a maximum of 6 elements.
- Update attestation subnet subscriptions strategy according to [the spec changes](https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#attestation-subnet-subscription). All nodes (including non-validating ones) will subscribe to 2 subnets regardless of the number of validators

### Bug Fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.subnets.StableSubnetSubscriber;
import tech.pegasys.teku.spec.Spec;

public class ActiveValidatorTracker implements SlotEventsChannel {
Expand All @@ -34,11 +33,7 @@ public class ActiveValidatorTracker implements SlotEventsChannel {
private final NavigableMap<UInt64, Set<Integer>> validatorsPerEpoch =
new ConcurrentSkipListMap<>();

private final StableSubnetSubscriber stableSubnetSubscriber;

public ActiveValidatorTracker(
final StableSubnetSubscriber stableSubnetSubscriber, final Spec spec) {
this.stableSubnetSubscriber = stableSubnetSubscriber;
public ActiveValidatorTracker(final Spec spec) {
this.spec = spec;
}

Expand All @@ -54,7 +49,6 @@ public void onSlot(final UInt64 slot) {
final UInt64 epoch = spec.computeEpochAtSlot(slot);
final int validatorCount = getNumberOfValidatorsForEpoch(epoch);
LOG.debug("{} active validators counted for epoch {}", validatorCount, epoch);
stableSubnetSubscriber.onSlot(slot, validatorCount);

// PerformanceTracker uses validator counts to determine expected attestation count.
// Thus we wait ATTESTATION_INCLUSION_RANGE epochs, after which the performance is determined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,17 @@

package tech.pegasys.teku.validator.coordinator;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static tech.pegasys.teku.validator.coordinator.performance.DefaultPerformanceTracker.ATTESTATION_INCLUSION_RANGE;

import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.gossip.subnets.StableSubnetSubscriber;
import tech.pegasys.teku.networking.eth2.gossip.subnets.ValidatorBasedStableSubnetSubscriber;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;

class ActiveValidatorTrackerTest {
private final Spec spec = TestSpecFactory.createMinimalPhase0();
private final StableSubnetSubscriber stableSubnetSubscriber =
mock(ValidatorBasedStableSubnetSubscriber.class);

private final ActiveValidatorTracker tracker =
new ActiveValidatorTracker(stableSubnetSubscriber, spec);
private final ActiveValidatorTracker tracker = new ActiveValidatorTracker(spec);

@Test
void shouldUpdateValidatorCountAtStartOfEpoch() {
Expand All @@ -44,9 +35,6 @@ void shouldUpdateValidatorCountAtStartOfEpoch() {

final UInt64 epochStartSlot = spec.computeStartSlotAtEpoch(epoch);
tracker.onSlot(epochStartSlot);

final InOrder inOrder = inOrder(stableSubnetSubscriber);
inOrder.verify(stableSubnetSubscriber).onSlot(epochStartSlot, 3);
}

@Test
Expand All @@ -59,9 +47,6 @@ void shouldNotCountDuplicateValidators() {

final UInt64 epochStartSlot = spec.computeStartSlotAtEpoch(epoch);
tracker.onSlot(epochStartSlot);

final InOrder inOrder = inOrder(stableSubnetSubscriber);
inOrder.verify(stableSubnetSubscriber).onSlot(epochStartSlot, 1);
}

@Test
Expand All @@ -79,10 +64,6 @@ void shouldPruneValidatorCountsAtTheEndOfAttestationInclusionRangeEpochs() {
// For the purpose of testing, we get the slots out of order, so all the requests get dropped
tracker.onSlot(afterInclusionRangeStartSlot);
tracker.onSlot(epochStartSlot);

// And both slot updates wind up setting 0 validators
verify(stableSubnetSubscriber).onSlot(afterInclusionRangeStartSlot, 0);
verify(stableSubnetSubscriber).onSlot(epochStartSlot, 0);
}

@Test
Expand All @@ -100,8 +81,5 @@ void shouldNotPruneBeforeTheEndOfAttestationInclusionRangeEpochs() {
// For the purpose of testing, we get the slots out of order, to see if the requests get dropped
tracker.onSlot(rightBeforeInclusionRangeStartSlot);
tracker.onSlot(epochStartSlot);

// And both slot updates wind up setting 3 validators
verify(stableSubnetSubscriber).onSlot(epochStartSlot, 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public Map<String, String> getConfigMap() {

configAttributes.put("BLS_WITHDRAWAL_PREFIX", getBlsWithdrawalPrefix().toHexString());
configAttributes.put("TARGET_AGGREGATORS_PER_COMMITTEE", getTargetAggregatorsPerCommittee());
configAttributes.put("RANDOM_SUBNETS_PER_VALIDATOR", getRandomSubnetsPerValidator());
configAttributes.put("SUBNETS_PER_NODE", getSubnetsPerNode());
configAttributes.put(
"EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION", getEpochsPerRandomSubnetSubscription());
Expand Down Expand Up @@ -106,10 +105,6 @@ private String getMaxRequestBlocks() {
return Integer.toString(specConfig.getNetworkingConfig().getMaxRequestBlocks());
}

private String getRandomSubnetsPerValidator() {
return Integer.toString(ValidatorConstants.RANDOM_SUBNETS_PER_VALIDATOR);
}

private String getTtfbTimeout() {
return Integer.toString(specConfig.getNetworkingConfig().getTtfbTimeout());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ SAFE_SLOTS_TO_UPDATE_JUSTIFIED: 2
ETH1_FOLLOW_DISTANCE: 16
# 2**4 (= 16)
TARGET_AGGREGATORS_PER_COMMITTEE: 16
# 2**0 (= 1)
RANDOM_SUBNETS_PER_VALIDATOR: 1
# 2**8 (= 256)
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION: 256
# 14 (estimate from Eth1 mainnet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class SpecConfigReader {
// Phase0 constants which may exist in legacy config files, but should now be ignored
"BLS_WITHDRAWAL_PREFIX",
"TARGET_AGGREGATORS_PER_COMMITTEE",
"RANDOM_SUBNETS_PER_VALIDATOR",
"EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION",
"DOMAIN_BEACON_PROPOSER",
"DOMAIN_BEACON_ATTESTER",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ public class NetworkConstants {
public static final int SYNC_COMMITTEE_SUBNET_COUNT = 4;
public static final int INTERVALS_PER_SLOT = 3;
public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128;
public static final int NODE_ID_BITS = 256;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
public class ValidatorConstants {
// Phase0
public static final int TARGET_AGGREGATORS_PER_COMMITTEE = 16;
public static final int RANDOM_SUBNETS_PER_VALIDATOR = 1;
// Altair
public static final int TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE = 16;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package tech.pegasys.teku.spec.datastructures.validator;

import com.google.common.base.MoreObjects;
import java.util.Comparator;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class SubnetSubscription {
public class SubnetSubscription implements Comparable<SubnetSubscription> {

private final int subnetId;
private final UInt64 unsubscriptionSlot;
Expand All @@ -35,6 +37,13 @@ public UInt64 getUnsubscriptionSlot() {
return unsubscriptionSlot;
}

@Override
public int compareTo(@NotNull final SubnetSubscription o) {
return Comparator.comparing(SubnetSubscription::getUnsubscriptionSlot)
.thenComparing(SubnetSubscription::getSubnetId)
.compare(this, o);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,21 @@
import it.unimi.dsi.fastutil.ints.IntList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.infrastructure.bytes.Bytes4;
import tech.pegasys.teku.infrastructure.crypto.Hash;
import tech.pegasys.teku.infrastructure.crypto.Sha256;
import tech.pegasys.teku.infrastructure.ssz.Merkleizable;
import tech.pegasys.teku.infrastructure.ssz.collections.SszByteVector;
import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.kzg.KZGCommitment;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.constants.NetworkConstants;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.state.ForkData;
Expand Down Expand Up @@ -186,6 +191,61 @@ private IntList computeCommitteeShuffle(
.subList(fromIndex, toIndex);
}

public List<UInt64> computeSubscribedSubnets(final UInt256 nodeId, final UInt64 epoch) {
return IntStream.range(0, specConfig.getNetworkingConfig().getSubnetsPerNode())
.mapToObj(index -> computeSubscribedSubnet(nodeId, epoch, index))
.collect(Collectors.toList());
}

private UInt64 computeSubscribedSubnet(
final UInt256 nodeId, final UInt64 epoch, final int index) {

final int nodeIdPrefix =
nodeId
.shiftRight(
NetworkConstants.NODE_ID_BITS
- specConfig.getNetworkingConfig().getAttestationSubnetPrefixBits())
.intValue();

final UInt64 nodeOffset =
UInt64.valueOf(
nodeId.mod(specConfig.getNetworkingConfig().getEpochsPerSubnetSubscription()).toLong());

final Bytes32 permutationSeed =
Hash.sha256(
uint64ToBytes(
epoch
.plus(nodeOffset)
.dividedBy(specConfig.getNetworkingConfig().getEpochsPerSubnetSubscription())));

final int permutedPrefix =
computeShuffledIndex(
nodeIdPrefix,
1 << specConfig.getNetworkingConfig().getAttestationSubnetPrefixBits(),
permutationSeed);

return UInt64.valueOf(
(permutedPrefix + index) % specConfig.getNetworkingConfig().getAttestationSubnetCount());
}

public UInt64 calculateNodeSubnetUnsubscriptionSlot(
final UInt256 nodeId, final UInt64 currentSlot) {
final int epochsPerSubnetSubscription =
specConfig.getNetworkingConfig().getEpochsPerSubnetSubscription();
final UInt64 nodeOffset = UInt64.valueOf(nodeId.mod(epochsPerSubnetSubscription).toLong());
final UInt64 currentEpoch = computeEpochAtSlot(currentSlot);
final UInt64 currentEpochRemainder = currentEpoch.mod(epochsPerSubnetSubscription);
UInt64 nextPeriodEpoch =
currentEpoch
.plus(epochsPerSubnetSubscription)
.minus(currentEpochRemainder)
.minus(nodeOffset);
if (nextPeriodEpoch.isLessThanOrEqualTo(currentEpoch)) {
nextPeriodEpoch = nextPeriodEpoch.plus(epochsPerSubnetSubscription);
}
return computeStartSlotAtEpoch(nextPeriodEpoch);
}

IntList shuffleList(IntList input, Bytes32 seed) {
final int[] indices = input.toIntArray();
shuffleList(indices, seed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ SAFE_SLOTS_TO_UPDATE_JUSTIFIED: 8
ETH1_FOLLOW_DISTANCE: 1024
# 2**4 (= 16)
TARGET_AGGREGATORS_PER_COMMITTEE: 16
# 2**0 (= 1)
RANDOM_SUBNETS_PER_VALIDATOR: 1
# 2**8 (= 256)
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION: 256
# 6 (estimate from xDai mainnet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import static org.mockito.Mockito.when;

import it.unimi.dsi.fastutil.ints.IntList;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -168,6 +170,37 @@ public void isSlotAtNthEpochBoundary_invalidNParameter_negative() {
.hasMessageContaining("Parameter n must be greater than 0");
}

@ParameterizedTest
@MethodSource("provideSubnetsForNodeIds")
public void testDiscoveryNodeBasedSubnetIds(
final UInt256 nodeId, final UInt64 epoch, List<UInt64> subnetIds) {
final List<UInt64> nodeSubnetIds = miscHelpers.computeSubscribedSubnets(nodeId, epoch);
assertThat(nodeSubnetIds).hasSize(subnetIds.size());
assertThat(nodeSubnetIds).isEqualTo(subnetIds);
}

@ParameterizedTest
@MethodSource("provideNodeIdsAndSlots")
public void unsubscriptionEpochMustMatchSubnetsCalculationResultChange(
final UInt256 nodeId, final UInt64 slotAtEpoch) {
for (int epoch = 0; epoch < 1000; epoch++) {
final List<UInt64> currentSubnets =
miscHelpers.computeSubscribedSubnets(nodeId, UInt64.valueOf(epoch));
final List<UInt64> nextSubnets =
miscHelpers.computeSubscribedSubnets(nodeId, UInt64.valueOf(epoch + 1));
final UInt64 currentSlot =
miscHelpers.computeStartSlotAtEpoch(UInt64.valueOf(epoch)).plus(slotAtEpoch);
final UInt64 unsubscriptionSlot =
miscHelpers.calculateNodeSubnetUnsubscriptionSlot(nodeId, currentSlot);
final UInt64 unsubscriptionEpoch = miscHelpers.computeEpochAtSlot(unsubscriptionSlot);
if (!currentSubnets.equals(nextSubnets)) {
assertThat(unsubscriptionEpoch).isEqualTo(UInt64.valueOf(epoch + 1));
} else {
assertThat(unsubscriptionEpoch.isGreaterThan(epoch)).isTrue();
}
}
}

@ParameterizedTest
@MethodSource("getComputesSlotAtTimeArguments")
public void computesSlotAtTime(final long currentTime, final UInt64 expectedSlot) {
Expand Down Expand Up @@ -221,6 +254,35 @@ public static Stream<Arguments> getNValues() {
Arguments.of(1), Arguments.of(2), Arguments.of(3), Arguments.of(4), Arguments.of(5));
}

public static Stream<Arguments> provideSubnetsForNodeIds() {
return Stream.of(
Arguments.of(
UInt256.valueOf(434726285098L),
UInt64.valueOf(6717051035888874875L),
List.of(UInt64.valueOf(28), UInt64.valueOf(29))),
Arguments.of(
UInt256.valueOf(288055627580L),
UInt64.valueOf("13392352527348795112"),
List.of(UInt64.valueOf(8), UInt64.valueOf(9))),
Arguments.of(
UInt256.valueOf(
new BigInteger(
"57467522110468688239177851250859789869070302005900722885377252304169193209346")),
UInt64.valueOf(6226203858325459337L),
List.of(UInt64.valueOf(44), UInt64.valueOf(45))));
}

public static Stream<Arguments> provideNodeIdsAndSlots() {
return Stream.of(
Arguments.of(UInt256.valueOf(434726285098L), UInt64.valueOf(0)),
Arguments.of(UInt256.valueOf(288055627580L), UInt64.valueOf(5)),
Arguments.of(
UInt256.valueOf(
new BigInteger(
"57467522110468688239177851250859789869070302005900722885377252304169193209346")),
UInt64.valueOf(7)));
}

public static Stream<Arguments> getCommitteeComputationArguments() {
return Stream.of(Arguments.of(2_100_000, 1024), Arguments.of(1_049_088, 2047));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
"PROPORTIONAL_SLASHING_MULTIPLIER_ALTAIR": "2",
"PROPOSER_REWARD_QUOTIENT": "8",
"PROPOSER_SCORE_BOOST": "70",
"RANDOM_SUBNETS_PER_VALIDATOR": "1",
"SAFE_SLOTS_TO_UPDATE_JUSTIFIED": "8",
"SECONDS_PER_ETH1_BLOCK": "14",
"SECONDS_PER_SLOT": "12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
"PROPORTIONAL_SLASHING_MULTIPLIER_BELLATRIX": "3",
"PROPOSER_REWARD_QUOTIENT": "8",
"PROPOSER_SCORE_BOOST": "70",
"RANDOM_SUBNETS_PER_VALIDATOR": "1",
"SAFE_SLOTS_TO_UPDATE_JUSTIFIED": "8",
"SECONDS_PER_ETH1_BLOCK": "14",
"SECONDS_PER_SLOT": "12",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ SAFE_SLOTS_TO_UPDATE_JUSTIFIED: 8
ETH1_FOLLOW_DISTANCE: 2048
# 2**4 (= 16)
TARGET_AGGREGATORS_PER_COMMITTEE: 16
# 2**0 (= 1)
RANDOM_SUBNETS_PER_VALIDATOR: 1
# 2**8 (= 256)
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION: 256
# 14 (estimate from Eth1 mainnet)
Expand Down
Loading

0 comments on commit 1703fa0

Please sign in to comment.