Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce state access for TxPool #5433

Merged
merged 4 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public abstract partial class SortedPool<TKey, TValue, TGroupKey>
private readonly IComparer<TValue> _groupComparer;

// group buckets, keep the items grouped by group key and sorted in group
private readonly IDictionary<TGroupKey, EnhancedSortedSet<TValue>> _buckets;
protected readonly Dictionary<TGroupKey, EnhancedSortedSet<TValue>> _buckets;

private readonly IDictionary<TKey, TValue> _cacheMap;
private readonly Dictionary<TKey, TValue> _cacheMap;
private bool _isFull = false;

// comparer for worst elements in buckets
Expand Down Expand Up @@ -411,8 +411,10 @@ public void UpdatePool(Func<TGroupKey, IReadOnlySortedSet<TValue>, IEnumerable<(
{
foreach ((TGroupKey groupKey, EnhancedSortedSet<TValue> bucket) in _buckets)
{
changingElements(groupKey, bucket);
benaadams marked this conversation as resolved.
Show resolved Hide resolved
UpdateGroup(groupKey, bucket, changingElements);
if (bucket.Count > 0)
benaadams marked this conversation as resolved.
Show resolved Hide resolved
{
UpdateGroup(groupKey, bucket, changingElements);
}
}
}

Expand All @@ -422,7 +424,10 @@ public void UpdateGroup(TGroupKey groupKey, Func<TGroupKey, IReadOnlySortedSet<T
if (groupKey is null) throw new ArgumentNullException(nameof(groupKey));
if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet<TValue>? bucket))
{
UpdateGroup(groupKey, bucket, changingElements);
if (bucket.Count > 0)
{
UpdateGroup(groupKey, bucket, changingElements);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Resettables;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.TxPool.Comparison;

Expand Down Expand Up @@ -62,5 +64,66 @@ protected override void UpdateGroup(Address groupKey, EnhancedSortedSet<Transact
TryRemove(_transactionsToRemove[i].Hash!);
}
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void UpdatePool(IAccountStateProvider accounts, Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
{
foreach ((Address address, EnhancedSortedSet<Transaction> bucket) in _buckets)
{
if (bucket.Count > 0)
benaadams marked this conversation as resolved.
Show resolved Hide resolved
{
Account? account = accounts.GetAccount(address);
UpdateGroup(address, account, bucket, changingElements);
}
}
}

private void UpdateGroup(Address groupKey, Account groupValue, EnhancedSortedSet<Transaction> bucket, Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
{
_transactionsToRemove.Clear();
Transaction? lastElement = bucket.Max;

foreach ((Transaction tx, UInt256? changedGasBottleneck) in changingElements(groupKey, groupValue, bucket))
{
if (changedGasBottleneck is null)
{
_transactionsToRemove.Add(tx);
}
else if (Equals(lastElement, tx))
{
bool reAdd = _worstSortedValues.Remove(tx);
tx.GasBottleneck = changedGasBottleneck;
if (reAdd)
{
_worstSortedValues.Add(tx, tx.Hash!);
}

UpdateWorstValue();
}
else
{
tx.GasBottleneck = changedGasBottleneck;
}
}

ReadOnlySpan<Transaction> txs = CollectionsMarshal.AsSpan(_transactionsToRemove);
for (int i = 0; i < txs.Length; i++)
{
TryRemove(txs[i].Hash!);
}
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void UpdateGroup(Address groupKey, Account groupValue, Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
{
if (groupKey is null) throw new ArgumentNullException(nameof(groupKey));
if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet<Transaction>? bucket))
{
if (bucket.Count > 0)
{
UpdateGroup(groupKey, groupValue, bucket, changingElements);
}
}
}
}
}
10 changes: 5 additions & 5 deletions src/Nethermind/Nethermind.TxPool/Filters/DeployedCodeFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ namespace Nethermind.TxPool.Filters
internal sealed class DeployedCodeFilter : IIncomingTxFilter
{
private readonly IChainHeadSpecProvider _specProvider;
private readonly IAccountStateProvider _stateProvider;

public DeployedCodeFilter(IChainHeadSpecProvider specProvider, IAccountStateProvider stateProvider)
public DeployedCodeFilter(IChainHeadSpecProvider specProvider)
{
_specProvider = specProvider;
_stateProvider = stateProvider;
}
public AcceptTxResult Accept(Transaction tx, TxFilteringState state, TxHandlingOptions txHandlingOptions) =>
_stateProvider.IsInvalidContractSender(_specProvider.GetCurrentHeadSpec(), tx.SenderAddress!)
public AcceptTxResult Accept(Transaction tx, TxFilteringState state, TxHandlingOptions txHandlingOptions)
{
return _specProvider.GetCurrentHeadSpec().IsEip3607Enabled && state.SenderAccount.HasCode
? AcceptTxResult.SenderIsContract
: AcceptTxResult.Accepted;
}
}
}
81 changes: 50 additions & 31 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Caching;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Specs;
using Nethermind.Core.Timers;
Expand Down Expand Up @@ -51,12 +51,15 @@ public class TxPool : ITxPool, IDisposable

private readonly Channel<BlockReplacementEventArgs> _headBlocksChannel = Channel.CreateUnbounded<BlockReplacementEventArgs>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true });

private readonly Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> _updateBucket;

/// <summary>
/// Indexes transactions
/// </summary>
private ulong _txIndex;

private readonly ITimer? _timer;
private Transaction[]? _transactionSnapshot;

/// <summary>
/// This class stores all known pending transactions that can be used for block production
Expand Down Expand Up @@ -118,10 +121,13 @@ public TxPool(
postHashFilters.Add(incomingTxFilter);
}

postHashFilters.Add(new DeployedCodeFilter(_specProvider, _accounts));
postHashFilters.Add(new DeployedCodeFilter(_specProvider));

_postHashFilters = postHashFilters.ToArray();

// Capture closures once rather than per invocation
_updateBucket = UpdateBucket;

int? reportMinutes = txPoolConfig.ReportMinutes;
if (_logger.IsInfo && reportMinutes.HasValue)
{
Expand Down Expand Up @@ -151,6 +157,8 @@ private void OnHeadChange(object? sender, BlockReplacementEventArgs e)
// TODO: I think this is dangerous if many blocks are processed one after another
try
{
// Clear snapshot
_transactionSnapshot = null;
_hashCache.ClearCurrentBlockCache();
_headBlocksChannel.Writer.TryWrite(e);
}
Expand Down Expand Up @@ -208,33 +216,34 @@ private void ReAddReorganisedTransactions(Block? previousBlock)
}
}

private void RemoveProcessedTransactions(IReadOnlyList<Transaction> blockTransactions)
private void RemoveProcessedTransactions(Transaction[] blockTransactions)
{
long transactionsInBlock = blockTransactions.Count;
long discoveredForPendingTxs = 0;
long discoveredForHashCache = 0;
long eip1559Txs = 0;

for (int i = 0; i < transactionsInBlock; i++)
for (int i = 0; i < blockTransactions.Length; i++)
{
Keccak txHash = blockTransactions[i].Hash ?? throw new ArgumentException("Hash was unexpectedly null!");
Transaction transaction = blockTransactions[i];
Keccak txHash = transaction.Hash ?? throw new ArgumentException("Hash was unexpectedly null!");

if (!IsKnown(txHash!))
if (!IsKnown(txHash))
{
discoveredForHashCache++;
}

if (!RemoveIncludedTransaction(blockTransactions[i]))
if (!RemoveIncludedTransaction(transaction))
{
discoveredForPendingTxs++;
}

if (blockTransactions[i].IsEip1559)
if (transaction.IsEip1559)
{
eip1559Txs++;
}
}

long transactionsInBlock = blockTransactions.Length;
if (transactionsInBlock != 0)
{
Metrics.DarkPoolRatioLevel1 = (float)discoveredForHashCache / transactionsInBlock;
Expand All @@ -255,7 +264,7 @@ public void AddPeer(ITxPoolPeer peer)
PeerInfo peerInfo = new(peer);
if (_broadcaster.AddPeer(peerInfo))
{
_broadcaster.BroadcastOnce(peerInfo, _transactions.GetSnapshot());
_broadcaster.BroadcastOnce(peerInfo, _transactionSnapshot ??= _transactions.GetSnapshot());

if (_logger.IsTrace) _logger.Trace($"Added a peer to TX pool: {peer}");
}
Expand Down Expand Up @@ -294,10 +303,18 @@ public AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions
if (!accepted)
{
Metrics.PendingTransactionsDiscarded++;
return accepted;
}
else
{
accepted = AddCore(tx, state, startBroadcast);
if (accepted)
{
// Clear snapshot
_transactionSnapshot = null;
}
}

return AddCore(tx, startBroadcast);
return accepted;
}

private AcceptTxResult FilterTransactions(Transaction tx, TxHandlingOptions handlingOptions, TxFilteringState state)
Expand Down Expand Up @@ -325,7 +342,7 @@ private AcceptTxResult FilterTransactions(Transaction tx, TxHandlingOptions hand
return AcceptTxResult.Accepted;
}

private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
private AcceptTxResult AddCore(Transaction tx, TxFilteringState state, bool isPersistentBroadcast)
{
lock (_locker)
{
Expand All @@ -335,7 +352,7 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
bool inserted = _transactions.TryInsert(tx.Hash!, tx, out Transaction? removed);
if (inserted)
{
_transactions.UpdateGroup(tx.SenderAddress!, UpdateBucketWithAddedTransaction);
_transactions.UpdateGroup(tx.SenderAddress!, state.SenderAccount, UpdateBucketWithAddedTransaction);
Metrics.PendingTransactionsAdded++;
if (tx.IsEip1559) { Metrics.Pending1559TransactionsAdded++; }

Expand Down Expand Up @@ -364,12 +381,11 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
return AcceptTxResult.Accepted;
}

private IEnumerable<(Transaction Tx, Action<Transaction>? Change)> UpdateBucketWithAddedTransaction(
Address address, IReadOnlyCollection<Transaction> transactions)
private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucketWithAddedTransaction(
Address address, Account account, EnhancedSortedSet<Transaction> transactions)
{
if (transactions.Count != 0)
{
Account account = _accounts.GetAccount(address);
UInt256 balance = account.Balance;
long currentNonce = (long)(account.Nonce);

Expand All @@ -380,8 +396,8 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
}
}

private IEnumerable<(Transaction Tx, Action<Transaction>? Change)> UpdateGasBottleneck(
IReadOnlyCollection<Transaction> transactions, long currentNonce, UInt256 balance)
private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateGasBottleneck(
EnhancedSortedSet<Transaction> transactions, long currentNonce, UInt256 balance)
{
UInt256? previousTxBottleneck = null;
int i = 0;
Expand Down Expand Up @@ -413,7 +429,7 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)

if (tx.GasBottleneck != gasBottleneck)
{
yield return (tx, SetGasBottleneckChange(gasBottleneck));
yield return (tx, gasBottleneck);
}

previousTxBottleneck = gasBottleneck;
Expand All @@ -422,30 +438,33 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
}
}

private static Action<Transaction> SetGasBottleneckChange(UInt256 gasBottleneck)
{
return t => t.GasBottleneck = gasBottleneck;
}

private void UpdateBuckets()
{
lock (_locker)
{
// ensure the capacity of the pool
if (_transactions.Count > _txPoolConfig.Size)
if (_logger.IsWarn) _logger.Warn($"TxPool exceeds the config size {_transactions.Count}/{_txPoolConfig.Size}");
_transactions.UpdatePool(UpdateBucket);
_transactions.UpdatePool(_accounts, _updateBucket);
}
}

private IEnumerable<(Transaction Tx, Action<Transaction>? Change)> UpdateBucket(Address address, IReadOnlyCollection<Transaction> transactions)
private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucket(Address address, Account account, EnhancedSortedSet<Transaction> transactions)
{
if (transactions.Count != 0)
{
Account? account = _accounts.GetAccount(address);
UInt256 balance = account.Balance;
long currentNonce = (long)(account.Nonce);
Transaction? tx = transactions.FirstOrDefault(t => t.Nonce == currentNonce);
Transaction? tx = null;
foreach (Transaction txn in transactions)
{
if (txn.Nonce == currentNonce)
{
tx = txn;
break;
}
}

bool shouldBeDumped = false;

if (tx is null)
Expand Down Expand Up @@ -597,12 +616,12 @@ private static void AddNodeInfoEntryForTxPool()

private void TimerOnElapsed(object? sender, EventArgs e)
{
WriteTxnPoolReport(_logger);
WriteTxPoolReport(_logger);

_timer!.Enabled = true;
}

private static void WriteTxnPoolReport(ILogger logger)
private static void WriteTxPoolReport(ILogger logger)
{
if (!logger.IsInfo)
{
Expand Down