Skip to content

Commit

Permalink
Improve eth/66 support (hyperledger#3616)
Browse files Browse the repository at this point in the history
Currently Besu has a limited support for sending NewPooledTransactionHashes messages, and other aspect related to reduce transactions synchronization traffic, described in the Ethereum Wire Protocol version 66.

Specifically:

    Besu only uses NewPooledTransactionHashes for new local transactions, while it could be extended to any transaction added to the transaction pool
    Besu does not limit the sending of the full transaction messages to a small fraction of the connected peers, and sends the new transaction hashes to all the remaining peers

This PR, extends eth/66 support and does some code refactoring, to remove some reduntant code and rename some classes to identify they are related to the NewPooledTransactionHashes message.

The main changes are:

    Do not have a separate tracker for transaction hashes, since for them we can reuse PeerTransactionTracker, that tracks full transactions exchange history and sending queue with a peer. So PeerPendingTransactionTracker has been removed. --tx-pool-hashes-max-size is now deprecated and has no more effect and it will be removed in a future release.
    When a new peer connects, if it support eth/6[56] then we send all the transaction hashes we have in the pool, otherwise we send the full transactions.
    When new transactions are added to the pool, we send full transactions to peers without eth/6[56] support, or to a small fractions of all peers, and then we send only transaction hashes to the remaining peer that support eth/6[56]. Both transactions and transaction hashes are only sent if not already exchanged with that specific peer.


Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 authored and garyschulte committed May 2, 2022
1 parent 84e9286 commit 357995e
Show file tree
Hide file tree
Showing 46 changed files with 665 additions and 732 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Expand Up @@ -3,10 +3,14 @@
## 22.1.3

### Breaking Changes
- Remove the experimental flag for bonsai tries CLI options '--data-storage-format' and '--bonsai-maximum-back-layers-to-load' [#3578](https://github.com/hyperledger/besu/pull/3578)
- Remove the experimental flag for bonsai tries CLI options `--data-storage-format` and `--bonsai-maximum-back-layers-to-load` [#3578](https://github.com/hyperledger/besu/pull/3578)

### Deprecations
- `--tx-pool-hashes-max-size` is now deprecated and has no more effect and it will be removed in a future release.

### Additions and Improvements
- Tune transaction synchronization parameter to adapt to mainnet traffic [#3610](https://github.com/hyperledger/besu/pull/3610)
- Improve eth/66 support [#3616](https://github.com/hyperledger/besu/pull/3616)

## 22.1.2

Expand Down
12 changes: 8 additions & 4 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Expand Up @@ -21,6 +21,7 @@
import static org.hyperledger.besu.cli.DefaultCommandValues.getDefaultBesuDataPath;
import static org.hyperledger.besu.cli.config.NetworkName.MAINNET;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG;
import static org.hyperledger.besu.config.experimental.MergeConfigOptions.isMergeEnabled;
import static org.hyperledger.besu.controller.BesuController.DATABASE_PATH;
Expand Down Expand Up @@ -1117,10 +1118,10 @@ static class JsonRPCWebsocketOptionGroup {
names = {"--tx-pool-hashes-max-size"},
paramLabel = MANDATORY_INTEGER_FORMAT_HELP,
description =
"Maximum number of pending transaction hashes that will be kept in the transaction pool (default: ${DEFAULT-VALUE})",
"Deprecated, has not effect. Maximum number of pending transaction hashes that will be kept in the transaction pool",
arity = "1")
private final Integer pooledTransactionHashesSize =
TransactionPoolConfiguration.MAX_PENDING_TRANSACTIONS_HASHES;
@SuppressWarnings("unused")
private final Integer pooledTransactionHashesSize = null; // NOSONAR

@Option(
names = {"--tx-pool-retention-hours"},
Expand Down Expand Up @@ -1826,6 +1827,10 @@ private void issueOptionWarnings() {
"--privacy-onchain-groups-enabled",
"--privacy-flexible-groups-enabled");
}

if (pooledTransactionHashesSize != null) { // NOSONAR
logger.warn(DEPRECATED_AND_USELESS_WARNING_MSG, "--tx-pool-hashes-max-size");
}
}

private void configure() throws Exception {
Expand Down Expand Up @@ -2651,7 +2656,6 @@ private TransactionPoolConfiguration buildTransactionPoolConfiguration() {
return unstableTransactionPoolOptions
.toDomainObject()
.txPoolMaxSize(txPoolMaxSize)
.pooledTransactionHashesSize(pooledTransactionHashesSize)
.pendingTxRetentionPeriod(pendingTxRetentionPeriod)
.priceBump(Percentage.fromInt(priceBump))
.txFeeCap(txFeeCap)
Expand Down
Expand Up @@ -29,6 +29,8 @@ public class CommandLineUtils {
public static final String MULTI_DEPENDENCY_WARNING_MSG =
"{} ignored because none of {} was defined.";
public static final String DEPRECATION_WARNING_MSG = "{} has been deprecated, use {} instead.";
public static final String DEPRECATED_AND_USELESS_WARNING_MSG =
"{} has been deprecated and is now useless, remove it.";

/**
* Check if options are passed that require an option to be true to have any effect and log a
Expand Down
Expand Up @@ -26,6 +26,7 @@
import static org.hyperledger.besu.cli.config.NetworkName.RINKEBY;
import static org.hyperledger.besu.cli.config.NetworkName.ROPSTEN;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPENDENCY_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATED_AND_USELESS_WARNING_MSG;
import static org.hyperledger.besu.cli.util.CommandLineUtils.DEPRECATION_WARNING_MSG;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.ENGINE;
import static org.hyperledger.besu.ethereum.api.jsonrpc.RpcApis.ETH;
Expand Down Expand Up @@ -3884,6 +3885,13 @@ public void onchainPrivacyGroupEnabledOptionIsDeprecated() {
"--privacy-flexible-groups-enabled");
}

@Test
public void txPoolHashesMaxSizeOptionIsDeprecated() {
parseCommand("--tx-pool-hashes-max-size", "1024");

verify(mockLogger).warn(DEPRECATED_AND_USELESS_WARNING_MSG, "--tx-pool-hashes-max-size");
}

@Test
public void flexiblePrivacyGroupEnabledFlagValueIsSet() {
parseCommand(
Expand Down
Expand Up @@ -133,7 +133,6 @@ public void proposerAddressCanBeExtractFromAConstructedBlock() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
Expand Down Expand Up @@ -169,7 +168,6 @@ public void insertsValidVoteIntoConstructedBlock() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
Expand Down Expand Up @@ -207,7 +205,6 @@ public void insertsNoVoteWhenAtEpoch() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
Expand Down
Expand Up @@ -96,7 +96,6 @@ public void extraDataCreatedOnEpochBlocksContainsValidators() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
CliqueMinerExecutorTest::mockBlockHeader,
Expand Down Expand Up @@ -141,7 +140,6 @@ public void extraDataForNonEpochBlocksDoesNotContainValidaors() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
CliqueMinerExecutorTest::mockBlockHeader,
Expand Down Expand Up @@ -186,7 +184,6 @@ public void shouldUseLatestVanityData() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
CliqueMinerExecutorTest::mockBlockHeader,
Expand Down
Expand Up @@ -334,7 +334,6 @@ private static ControllerAndState createControllerAndFinalState(
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
1,
clock,
metricsSystem,
blockChain::getChainHeadHeader,
Expand Down
Expand Up @@ -118,7 +118,6 @@ public BlockHeaderValidator.Builder createBlockHeaderRuleset(
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
Expand Down
Expand Up @@ -107,7 +107,6 @@ public void headerProducedPassesValidationRules() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
Expand Down
Expand Up @@ -439,7 +439,6 @@ private static ControllerAndState createControllerAndFinalState(
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
1,
clock,
metricsSystem,
blockChain::getChainHeadHeader,
Expand Down
Expand Up @@ -49,10 +49,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.PeerPendingTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionBroadcaster;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.GasPricePendingTransactionsSorter;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
Expand All @@ -75,8 +73,7 @@
@RunWith(MockitoJUnitRunner.class)
public class EthGetFilterChangesIntegrationTest {

@Mock private TransactionBatchAddedListener batchAddedListener;
@Mock private TransactionBatchAddedListener pendingBatchAddedListener;
@Mock private TransactionBroadcaster batchAddedListener;
private MutableBlockchain blockchain;
private final String ETH_METHOD = "eth_getFilterChanges";
private final String JSON_RPC_VERSION = "2.0";
Expand All @@ -86,7 +83,6 @@ public class EthGetFilterChangesIntegrationTest {
private GasPricePendingTransactionsSorter transactions;

private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_HASHES = 5;
private static final KeyPair keyPair = SignatureAlgorithmFactory.getInstance().generateKeyPair();
private final Transaction transaction = createTransaction(1);
private FilterManager filterManager;
Expand All @@ -101,16 +97,12 @@ public void setUp() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_HASHES,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
TransactionPoolConfiguration.DEFAULT_PRICE_BUMP);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
PeerPendingTransactionTracker peerPendingTransactionTracker =
mock(PeerPendingTransactionTracker.class);
EthContext ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
Expand All @@ -121,11 +113,8 @@ public void setUp() {
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);
Expand Down
Expand Up @@ -49,10 +49,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.PeerPendingTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionBroadcaster;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool.TransactionBatchAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.sorter.BaseFeePendingTransactionsSorter;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
Expand All @@ -75,8 +73,7 @@
@RunWith(MockitoJUnitRunner.class)
public class EthGetFilterChangesIntegrationTest {

@Mock private TransactionBatchAddedListener batchAddedListener;
@Mock private TransactionBatchAddedListener pendingBatchAddedListener;
@Mock private TransactionBroadcaster batchAddedListener;
private MutableBlockchain blockchain;
private final String ETH_METHOD = "eth_getFilterChanges";
private final String JSON_RPC_VERSION = "2.0";
Expand All @@ -86,7 +83,6 @@ public class EthGetFilterChangesIntegrationTest {
private BaseFeePendingTransactionsSorter transactions;

private static final int MAX_TRANSACTIONS = 5;
private static final int MAX_HASHES = 5;
private static final KeyPair keyPair = SignatureAlgorithmFactory.getInstance().generateKeyPair();
private final Transaction transaction = createTransaction(1);
private FilterManager filterManager;
Expand All @@ -101,16 +97,12 @@ public void setUp() {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
MAX_TRANSACTIONS,
MAX_HASHES,
TestClock.fixed(),
metricsSystem,
blockchain::getChainHeadHeader,
TransactionPoolConfiguration.DEFAULT_PRICE_BUMP);
final ProtocolContext protocolContext = executionContext.getProtocolContext();

PeerTransactionTracker peerTransactionTracker = mock(PeerTransactionTracker.class);
PeerPendingTransactionTracker peerPendingTransactionTracker =
mock(PeerPendingTransactionTracker.class);
EthContext ethContext = mock(EthContext.class);
EthPeers ethPeers = mock(EthPeers.class);
when(ethContext.getEthPeers()).thenReturn(ethPeers);
Expand All @@ -121,11 +113,8 @@ public void setUp() {
executionContext.getProtocolSchedule(),
protocolContext,
batchAddedListener,
pendingBatchAddedListener,
syncState,
ethContext,
peerTransactionTracker,
peerPendingTransactionTracker,
new MiningParameters.Builder().minTransactionGasPrice(Wei.ZERO).build(),
metricsSystem,
TransactionPoolConfiguration.DEFAULT);
Expand Down
Expand Up @@ -84,7 +84,6 @@ public class BlockTransactionSelectorTest {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
BlockTransactionSelectorTest::mockBlockHeader,
Expand Down Expand Up @@ -337,7 +336,6 @@ public void useSingleGasSpaceForAllTransactions() {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
5,
5,
TestClock.fixed(),
metricsSystem,
() -> {
Expand Down
Expand Up @@ -96,7 +96,6 @@ public void createMainnetBlock1() throws IOException {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
Expand Down Expand Up @@ -158,7 +157,6 @@ public void createMainnetBlock1_fixedDifficulty1() {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
Expand Down Expand Up @@ -215,7 +213,6 @@ public void rewardBeneficiary_zeroReward_skipZeroRewardsFalse() {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
Expand Down Expand Up @@ -288,7 +285,6 @@ public void rewardBeneficiary_zeroReward_skipZeroRewardsTrue() {
new BaseFeePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
executionContextTestFixture.getProtocolContext().getBlockchain()::getChainHeadHeader,
Expand Down
Expand Up @@ -45,7 +45,6 @@ public void startingMiningWithoutCoinbaseThrowsException() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
PoWMinerExecutorTest::mockBlockHeader,
Expand Down Expand Up @@ -75,7 +74,6 @@ public void settingCoinbaseToNullThrowsException() {
new GasPricePendingTransactionsSorter(
TransactionPoolConfiguration.DEFAULT_TX_RETENTION_HOURS,
1,
5,
TestClock.fixed(),
metricsSystem,
PoWMinerExecutorTest::mockBlockHeader,
Expand Down
Expand Up @@ -38,9 +38,11 @@

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;
Expand Down Expand Up @@ -680,6 +682,17 @@ public boolean isGoQuorumPrivateTransaction(final boolean goQuorumCompatibilityM
return GoQuorumPrivateTransactionDetector.isGoQuorumPrivateTransactionV(v.get());
}

/**
* Return the list of transaction hashes extracted from the collection of Transaction passed as
* argument
*
* @param transactions a collection of transactions
* @return the list of transaction hashes
*/
public static List<Hash> toHashList(final Collection<Transaction> transactions) {
return transactions.stream().map(Transaction::getHash).collect(Collectors.toUnmodifiableList());
}

private static Bytes32 computeSenderRecoveryHash(
final TransactionType transactionType,
final long nonce,
Expand Down
Expand Up @@ -515,6 +515,11 @@ public Bytes nodeId() {
return connection.getPeerInfo().getNodeId();
}

public boolean hasSupportForMessage(final int messageCode) {
return getAgreedCapabilities().stream()
.anyMatch(cap -> EthProtocol.get().isValidMessageCode(cap.getVersion(), messageCode));
}

@Override
public String toString() {
return String.format("Peer %s...", nodeId().toString().substring(0, 20));
Expand Down

0 comments on commit 357995e

Please sign in to comment.