Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

post-merge sync and peering fix #4116

Merged
merged 4 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.consensus.merge.MergeContext;
import org.hyperledger.besu.consensus.merge.MergeProtocolSchedule;
import org.hyperledger.besu.consensus.merge.PostMergeContext;
import org.hyperledger.besu.consensus.merge.TransitionBestPeerComparator;
import org.hyperledger.besu.consensus.merge.blockcreation.MergeCoordinator;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.ProtocolContext;
Expand Down Expand Up @@ -91,13 +92,22 @@ protected EthProtocolManager createEthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {

var mergeContext = protocolContext.getConsensusContext(MergeContext.class);

var mergeBestPeerComparator =
new TransitionBestPeerComparator(
configOptionsSupplier
.get()
.getTerminalTotalDifficulty()
.map(Difficulty::of)
.orElseThrow());
ethPeers.setBestChainComparator(mergeBestPeerComparator);
mergeContext.observeNewIsPostMergeState(mergeBestPeerComparator);

if (mergePeerFilter.isPresent()) {
protocolContext
.getConsensusContext(MergeContext.class)
.observeNewIsPostMergeState(mergePeerFilter.get());
protocolContext
.getConsensusContext(MergeContext.class)
.addNewForkchoiceMessageListener(mergePeerFilter.get());

mergeContext.observeNewIsPostMergeState(mergePeerFilter.get());
mergeContext.addNewForkchoiceMessageListener(mergePeerFilter.get());
}

EthProtocolManager ethProtocolManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthMessages;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
Expand Down Expand Up @@ -119,6 +125,31 @@ protected MiningCoordinator createMiningCoordinator(
return composedCoordinator;
}

@Override
protected EthProtocolManager createEthProtocolManager(
final ProtocolContext protocolContext,
final boolean fastSyncEnabled,
final TransactionPool transactionPool,
final EthProtocolConfiguration ethereumWireProtocolConfiguration,
final EthPeers ethPeers,
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter) {
return mergeBesuControllerBuilder.createEthProtocolManager(
protocolContext,
fastSyncEnabled,
transactionPool,
ethereumWireProtocolConfiguration,
ethPeers,
ethContext,
ethMessages,
scheduler,
peerValidators,
mergePeerFilter);
}

@Override
protected ProtocolSchedule createProtocolSchedule() {
return new TransitionProtocolSchedule(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge;

import static org.hyperledger.besu.ethereum.eth.manager.EthPeers.CHAIN_HEIGHT;

import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.math.BigInteger;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

public class TransitionBestPeerComparator implements Comparator<EthPeer>, MergeStateHandler {

private static final AtomicReference<Difficulty> terminalTotalDifficulty =
new AtomicReference<>();

static final BiFunction<EthPeer, Difficulty, BigInteger> distanceFromTTD =
(a, ttd) ->
a.chainState()
.getEstimatedTotalDifficulty()
.getAsBigInteger()
.subtract(ttd.getAsBigInteger())
.abs()
.negate();

public static final Comparator<EthPeer> EXACT_DIFFICULTY =
(a, b) -> {
var ttd = terminalTotalDifficulty.get();
var aDelta = distanceFromTTD.apply(a, ttd);
var bDelta = distanceFromTTD.apply(b, ttd);
return aDelta.compareTo(bDelta);
};

public static final Comparator<EthPeer> BEST_MERGE_CHAIN =
EXACT_DIFFICULTY.thenComparing(CHAIN_HEIGHT);

public TransitionBestPeerComparator(final Difficulty configuredTerminalTotalDifficulty) {
terminalTotalDifficulty.set(configuredTerminalTotalDifficulty);
}

@Override
public void mergeStateChanged(
final boolean isPoS, final Optional<Difficulty> difficultyStoppedAt) {
if (isPoS && difficultyStoppedAt.isPresent()) {
terminalTotalDifficulty.set(difficultyStoppedAt.get());
}
}

@Override
public int compare(final EthPeer o1, final EthPeer o2) {
return BEST_MERGE_CHAIN.compare(o1, o2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.consensus.merge;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;

import java.util.Optional;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class TransitionBestPeerComparatorTest {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
EthPeer a;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
EthPeer b;

@Test
public void assertDistanceFromTTDPrecedence() {
var comparator = new TransitionBestPeerComparator(Difficulty.of(5000));
when(a.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5002));
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(4995));
// a has less distance from TTD:
assertThat(comparator.compare(a, b)).isEqualTo(1);
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5001));
// b has less distance from TTD:
assertThat(comparator.compare(a, b)).isEqualTo(-1);
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5002));
// a and b are equi-distant
assertThat(comparator.compare(a, b)).isEqualTo(0);
}

@Test
public void assertHandlesNewTTD() {
var comparator = new TransitionBestPeerComparator(Difficulty.of(5000));
when(a.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(5002));
when(b.chainState().getEstimatedTotalDifficulty()).thenReturn(Difficulty.of(4999));
assertThat(comparator.compare(a, b)).isEqualTo(-1);

// update TTD with actual value
comparator.mergeStateChanged(true, Optional.of(Difficulty.of(5002)));
assertThat(comparator.compare(a, b)).isEqualTo(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EthPeers {
private static final Logger LOG = LoggerFactory.getLogger(EthPeers.class);
public static final Comparator<EthPeer> TOTAL_DIFFICULTY =
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedTotalDifficulty()));

public static final Comparator<EthPeer> CHAIN_HEIGHT =
Comparator.comparing(((final EthPeer p) -> p.chainState().getEstimatedHeight()));

public static final Comparator<EthPeer> BEST_CHAIN = TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT);
public static final Comparator<EthPeer> HEAVIEST_CHAIN =
TOTAL_DIFFICULTY.thenComparing(CHAIN_HEIGHT);

public static final Comparator<EthPeer> LEAST_TO_MOST_BUSY =
Comparator.comparing(EthPeer::outstandingRequests)
Expand All @@ -59,6 +64,8 @@ public class EthPeers {
private final Subscribers<DisconnectCallback> disconnectCallbacks = Subscribers.create();
private final Collection<PendingPeerRequest> pendingRequests = new CopyOnWriteArrayList<>();

private Comparator<EthPeer> bestPeerComparator;

public EthPeers(
final String protocolName,
final Clock clock,
Expand All @@ -80,6 +87,7 @@ public EthPeers(
this.permissioningProviders = permissioningProviders;
this.maxPeers = maxPeers;
this.maxMessageSize = maxMessageSize;
this.bestPeerComparator = HEAVIEST_CHAIN;
metricsSystem.createIntegerGauge(
BesuMetricCategory.PEERS,
"pending_peer_requests_current",
Expand Down Expand Up @@ -186,11 +194,11 @@ public Stream<EthPeer> streamAvailablePeers() {
}

public Stream<EthPeer> streamBestPeers() {
return streamAvailablePeers().sorted(BEST_CHAIN.reversed());
return streamAvailablePeers().sorted(getBestChainComparator().reversed());
}

public Optional<EthPeer> bestPeer() {
return streamAvailablePeers().max(BEST_CHAIN);
return streamAvailablePeers().max(getBestChainComparator());
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
Expand All @@ -199,7 +207,16 @@ public Optional<EthPeer> bestPeerWithHeightEstimate() {
}

public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria).max(BEST_CHAIN);
return streamAvailablePeers().filter(matchesCriteria).max(getBestChainComparator());
}

public void setBestChainComparator(final Comparator<EthPeer> comparator) {
LOG.info("Updating the default best peer comparator");
bestPeerComparator = comparator;
}

public Comparator<EthPeer> getBestChainComparator() {
return bestPeerComparator;
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,13 @@ private CompletableFuture<Void> handleSyncResult(final FastSyncState result) {
result.getPivotBlockNumber().getAsLong());
pivotBlockSelector.close();
syncState.markInitialSyncPhaseAsDone();
return terminationCondition.shouldContinueDownload()
? startFullSync()
: CompletableFuture.completedFuture(null);

if (terminationCondition.shouldContinueDownload()) {
return startFullSync();
} else {
syncState.setReachedTerminalDifficulty(true);
return CompletableFuture.completedFuture(null);
}
}

private CompletableFuture<Void> startFullSync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public BlockValidator getBlockValidatorForBlock(final Block block) {
}

public boolean isReady() {
LOG.debug(
"checking if BWS is ready: ttd reached {}, initial sync done {}",
syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE),
syncState.isInitialSyncPhaseDone());
return syncState.hasReachedTerminalDifficulty().orElse(Boolean.FALSE)
&& syncState.isInitialSyncPhaseDone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ private long countPeersThatCanDeterminePivotBlock() {
}

private boolean canPeerDeterminePivotBlock(final EthPeer peer) {
LOG.debug(
"peer {} hasEstimatedHeight {} isFullyValidated? {}",
peer.getShortNodeId(),
peer.chainState().hasEstimatedHeight(),
peer.isFullyValidated());
return peer.chainState().hasEstimatedHeight() && peer.isFullyValidated();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
final EthPeer bestPeer = maybeBestPeer.get();
if (bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
LOG.info(
"No sync target with sufficient chain height, waiting for peers: {}",
ethContext.getEthPeers().peerCount());
"Best peer {} has chain height {} below pivotBlock height {}",
maybeBestPeer.map(EthPeer::getShortNodeId).orElse("none"),
maybeBestPeer.map(p -> p.chainState().getEstimatedHeight()).orElse(-1L),
pivotBlockHeader.getNumber());
return completedFuture(Optional.empty());
} else {
return confirmPivotBlockHeader(bestPeer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean shouldSwitchSyncTarget(final EthPeer currentSyncTarget) {
return maybeBestPeer
.map(
bestPeer -> {
if (EthPeers.BEST_CHAIN.compare(bestPeer, currentSyncTarget) <= 0) {
if (ethPeers.getBestChainComparator().compare(bestPeer, currentSyncTarget) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void comparesPeersWithHeightAndTd() {
assertThat(EthPeers.CHAIN_HEIGHT.compare(peerA, peerB)).isGreaterThan(0);
assertThat(EthPeers.TOTAL_DIFFICULTY.compare(peerA, peerB)).isLessThan(0);

assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerB)).isLessThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerB)).isLessThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerA)).isGreaterThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerB);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
Expand All @@ -97,10 +97,10 @@ public void comparesPeersWithTdAndNoHeight() {
assertThat(EthPeers.CHAIN_HEIGHT.compare(peerA, peerB)).isEqualTo(0);
assertThat(EthPeers.TOTAL_DIFFICULTY.compare(peerA, peerB)).isGreaterThan(0);

assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerB)).isGreaterThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerA)).isLessThan(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.BEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerB)).isGreaterThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerA)).isLessThan(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerA, peerA)).isEqualTo(0);
assertThat(EthPeers.HEAVIEST_CHAIN.compare(peerB, peerB)).isEqualTo(0);

assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeer()).contains(peerA);
assertThat(ethProtocolManager.ethContext().getEthPeers().bestPeerWithHeightEstimate())
Expand Down
Loading