Skip to content

Commit

Permalink
post-merge sync and peering fix (#4116)
Browse files Browse the repository at this point in the history
* add a merge-specific definiton of bestPeer and the supporting plumbing

Signed-off-by: garyschulte <garyschulte@gmail.com>

* set reached TTD when finishing a fast sync if appropriate

Signed-off-by: garyschulte <garyschulte@gmail.com>

* spdx header

Signed-off-by: garyschulte <garyschulte@gmail.com>

* fix BetterSyncTargetEvaluatorTest tests

Signed-off-by: garyschulte <garyschulte@gmail.com>
  • Loading branch information
garyschulte authored Jul 18, 2022
1 parent f885c46 commit e48b73b
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.consensus.merge.MergeProtocolSchedule;
import org.hyperledger.besu.consensus.merge.PostMergeContext;
import org.hyperledger.besu.consensus.merge.TransitionBestPeerComparator;
import org.hyperledger.besu.consensus.merge.blockcreation.MergeCoordinator;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
Expand Down Expand Up @@ -91,13 +92,22 @@ protected EthProtocolManager createEthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {

var mergeContext = protocolContext.getConsensusContext(MergeContext.class);

var mergeBestPeerComparator =
new TransitionBestPeerComparator(
configOptionsSupplier
.get()
.getTerminalTotalDifficulty()
.map(Difficulty::of)
.orElseThrow());
ethPeers.setBestChainComparator(mergeBestPeerComparator);
mergeContext.observeNewIsPostMergeState(mergeBestPeerComparator);

if (mergePeerFilter.isPresent()) {
protocolContext
.getConsensusContext(MergeContext.class)
.observeNewIsPostMergeState(mergePeerFilter.get());
protocolContext
.getConsensusContext(MergeContext.class)
.addNewForkchoiceMessageListener(mergePeerFilter.get());

mergeContext.observeNewIsPostMergeState(mergePeerFilter.get());
mergeContext.addNewForkchoiceMessageListener(mergePeerFilter.get());
}

EthProtocolManager ethProtocolManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
Expand Down Expand Up @@ -119,6 +125,31 @@ protected MiningCoordinator createMiningCoordinator(
return composedCoordinator;
}

@Override
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext protocolContext,
final boolean fastSyncEnabled,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext,
fastSyncEnabled,
transactionPool,
ethereumWireProtocolConfiguration,
ethPeers,
ethContext,
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
}

@Override
protected ProtocolSchedule createProtocolSchedule() {
return new TransitionProtocolSchedule(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge;

import static org.hyperledger.besu.ethereum.eth.manager.EthPeers.CHAIN_HEIGHT;

import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.math.BigInteger;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

public class TransitionBestPeerComparator implements Comparator<EthPeer>, MergeStateHandler {

private static final AtomicReference<Difficulty> terminalTotalDifficulty =
new AtomicReference<>();

static final BiFunction<EthPeer, Difficulty, BigInteger> distanceFromTTD =
(a, ttd) ->
a.chainState()
.getEstimatedTotalDifficulty()
.getAsBigInteger()
.subtract(ttd.getAsBigInteger())
.abs()
.negate();

public static final Comparator<EthPeer> EXACT_DIFFICULTY =
(a, b) -> {
var ttd = terminalTotalDifficulty.get();
var aDelta = distanceFromTTD.apply(a, ttd);
var bDelta = distanceFromTTD.apply(b, ttd);
return aDelta.compareTo(bDelta);
};

public static final Comparator<EthPeer> BEST_MERGE_CHAIN =
EXACT_DIFFICULTY.thenComparing(CHAIN_HEIGHT);

public TransitionBestPeerComparator(final Difficulty configuredTerminalTotalDifficulty) {
terminalTotalDifficulty.set(configuredTerminalTotalDifficulty);
}

@Override
public void mergeStateChanged(
final boolean isPoS, final Optional<Difficulty> difficultyStoppedAt) {
if (isPoS && difficultyStoppedAt.isPresent()) {
terminalTotalDifficulty.set(difficultyStoppedAt.get());
}
}

@Override
public int compare(final EthPeer o1, final EthPeer o2) {
return BEST_MERGE_CHAIN.compare(o1, o2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.util.Optional;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class TransitionBestPeerComparatorTest {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
EthPeer a;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
EthPeer b;

@Test
public void assertDistanceFromTTDPrecedence() {
var comparator = new TransitionBestPeerComparator(Difficulty.of(5000));
when(a.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5002));
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(4995));
// a has less distance from TTD:
assertThat(comparator.compare(a, b)).isEqualTo(1);
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5001));
// b has less distance from TTD:
assertThat(comparator.compare(a, b)).isEqualTo(-1);
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5002));
// a and b are equi-distant
assertThat(comparator.compare(a, b)).isEqualTo(0);
}

@Test
public void assertHandlesNewTTD() {
var comparator = new TransitionBestPeerComparator(Difficulty.of(5000));
when(a.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5002));
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(4999));
assertThat(comparator.compare(a, b)).isEqualTo(-1);

// update TTD with actual value
comparator.mergeStateChanged(true, Optional.of(Difficulty.of(5002)));
assertThat(comparator.compare(a, b)).isEqualTo(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EthPeers {
private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class);
public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()));

public static final Comparator<EthPeer> CHAIN_HEIGHT =
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedHeight()));

public static final Comparator<EthPeer> BEST_CHAIN = TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT);
public static final Comparator<EthPeer> HEAVIEST_CHAIN =
TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT);

public static final Comparator<EthPeer> LEAST_TO_MOST_BUSY =
Comparator.comparing(EthPeer::outstandingRequests)
Expand All @@ -59,6 +64,8 @@ public class EthPeers {
private final Subscribers<DisconnectCallback> disconnectCallbacks = Subscribers.create();
private final Collection<PendingPeerRequest> pendingRequests = new CopyOnWriteArrayList<>();

private Comparator<EthPeer> bestPeerComparator;

public EthPeers(
final String protocolName,
final Clock clock,
Expand All @@ -80,6 +87,7 @@ public EthPeers(
this.permissioningProviders = permissioningProviders;
this.maxPeers = maxPeers;
this.maxMessageSize = maxMessageSize;
this.bestPeerComparator = HEAVIEST_CHAIN;
metricsSystem.createIntegerGauge(
BesuMetricCategory.PEERS,
"pending_peer_requests_current",
Expand Down Expand Up @@ -186,11 +194,11 @@ public Stream<EthPeer> streamAvailablePeers() {
}

public Stream<EthPeer> streamBestPeers() {
return streamAvailablePeers().sorted(BEST_CHAIN.reversed());
return streamAvailablePeers().sorted(getBestChainComparator().reversed());
}

public Optional<EthPeer> bestPeer() {
return streamAvailablePeers().max(BEST_CHAIN);
return streamAvailablePeers().max(getBestChainComparator());
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
Expand All @@ -199,7 +207,16 @@ public Optional<EthPeer> bestPeerWithHeightEstimate() {
}

public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria).max(BEST_CHAIN);
return streamAvailablePeers().filter(matchesCriteria).max(getBestChainComparator());
}

public void setBestChainComparator(final Comparator<EthPeer> comparator) {
LOG.info("Updating the default best peer comparator");
bestPeerComparator = comparator;
}

public Comparator<EthPeer> getBestChainComparator() {
return bestPeerComparator;
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,13 @@ private CompletableFuture<Void> handleSyncResult(final FastSyncState result) {
result.getPivotBlockNumber().getAsLong());
pivotBlockSelector.close();
syncState.markInitialSyncPhaseAsDone();
return terminationCondition.shouldContinueDownload()
? startFullSync()
: CompletableFuture.completedFuture(null);

if (terminationCondition.shouldContinueDownload()) {
return startFullSync();
} else {
syncState.setReachedTerminalDifficulty(true);
return CompletableFuture.completedFuture(null);
}
}

private CompletableFuture<Void> startFullSync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public BlockValidator getBlockValidatorForBlock(final Block block) {
}

public boolean isReady() {
LOG.debug(
"checking if BWS is ready: ttd reached {}, initial sync done {}",
syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE),
syncState.isInitialSyncPhaseDone());
return syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)
&& syncState.isInitialSyncPhaseDone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ private long countPeersThatCanDeterminePivotBlock() {
}

private boolean canPeerDeterminePivotBlock(final EthPeer peer) {
LOG.debug(
"peer {} hasEstimatedHeight {} isFullyValidated? {}",
peer.getShortNodeId(),
peer.chainState().hasEstimatedHeight(),
peer.isFullyValidated());
return peer.chainState().hasEstimatedHeight() && peer.isFullyValidated();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
final EthPeer bestPeer = maybeBestPeer.get();
if (bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
LOG.info(
"No sync target with sufficient chain height, waiting for peers: {}",
ethContext.getEthPeers().peerCount());
"Best peer {} has chain height {} below pivotBlock height {}",
maybeBestPeer.map(EthPeer::getShortNodeId).orElse("none"),
maybeBestPeer.map(p -> p.chainState().getEstimatedHeight()).orElse(-1L),
pivotBlockHeader.getNumber());
return completedFuture(Optional.empty());
} else {
return confirmPivotBlockHeader(bestPeer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean shouldSwitchSyncTarget(final EthPeer currentSyncTarget) {
return maybeBestPeer
.map(
bestPeer -> {
if (EthPeers.BEST_CHAIN.compare(bestPeer, currentSyncTarget) <= 0) {
if (ethPeers.getBestChainComparator().compare(bestPeer, currentSyncTarget) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void comparesPeersWithHeightAndTd() {
assertThat(EthPeers.CHAIN_HEIGHT.compare(peerA, peerB)).isGreaterThan(0);
assertThat(EthPeers.TOTAL_DIFFICULTY.compare(peerA, peerB)).isLessThan(0);

assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerB)).isLessThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerB)).isLessThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerB);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
Expand All @@ -97,10 +97,10 @@ public void comparesPeersWithTdAndNoHeight() {
assertThat(EthPeers.CHAIN_HEIGHT.compare(peerA, peerB)).isEqualTo(0);
assertThat(EthPeers.TOTAL_DIFFICULTY.compare(peerA, peerB)).isGreaterThan(0);

assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerB)).isGreaterThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isLessThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerB)).isGreaterThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerA)).isLessThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerA);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
Expand Down
Loading

0 comments on commit e48b73b

Please sign in to comment.