Skip to content

Commit

Permalink
Reduce state access for TxPool (#5433)
Browse files Browse the repository at this point in the history
* Reduce state access for TxPool

* DeployedCodeFilter doesn't need to look up account

* Only resnapshot on change

* Change count tests to Asserts
  • Loading branch information
benaadams committed Mar 16, 2023
1 parent b004087 commit 3c8c453
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 40 deletions.
10 changes: 7 additions & 3 deletions src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Runtime.CompilerServices;
Expand All @@ -26,9 +27,9 @@ public abstract partial class SortedPool<TKey, TValue, TGroupKey>
private readonly IComparer<TValue> _groupComparer;

// group buckets, keep the items grouped by group key and sorted in group
private readonly IDictionary<TGroupKey, EnhancedSortedSet<TValue>> _buckets;
protected readonly Dictionary<TGroupKey, EnhancedSortedSet<TValue>> _buckets;

private readonly IDictionary<TKey, TValue> _cacheMap;
private readonly Dictionary<TKey, TValue> _cacheMap;
private bool _isFull = false;

// comparer for worst elements in buckets
Expand Down Expand Up @@ -411,7 +412,8 @@ public void UpdatePool(Func<TGroupKey, IReadOnlySortedSet<TValue>, IEnumerable<(
{
foreach ((TGroupKey groupKey, EnhancedSortedSet<TValue> bucket) in _buckets)
{
changingElements(groupKey, bucket);
Debug.Assert(bucket.Count > 0);

UpdateGroup(groupKey, bucket, changingElements);
}
}
Expand All @@ -422,6 +424,8 @@ public void UpdateGroup(TGroupKey groupKey, Func<TGroupKey, IReadOnlySortedSet<T
if (groupKey is null) throw new ArgumentNullException(nameof(groupKey));
if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet<TValue>? bucket))
{
Debug.Assert(bucket.Count > 0);

UpdateGroup(groupKey, bucket, changingElements);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Resettables;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.TxPool.Comparison;

Expand Down Expand Up @@ -62,5 +65,64 @@ protected override void UpdateGroup(Address groupKey, EnhancedSortedSet<Transact
TryRemove(_transactionsToRemove[i].Hash!);
}
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void UpdatePool(IAccountStateProvider accounts, Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
{
foreach ((Address address, EnhancedSortedSet<Transaction> bucket) in _buckets)
{
Debug.Assert(bucket.Count > 0);

Account? account = accounts.GetAccount(address);
UpdateGroup(address, account, bucket, changingElements);
}
}

private void UpdateGroup(Address groupKey, Account groupValue, EnhancedSortedSet<Transaction> bucket, Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
{
_transactionsToRemove.Clear();
Transaction? lastElement = bucket.Max;

foreach ((Transaction tx, UInt256? changedGasBottleneck) in changingElements(groupKey, groupValue, bucket))
{
if (changedGasBottleneck is null)
{
_transactionsToRemove.Add(tx);
}
else if (Equals(lastElement, tx))
{
bool reAdd = _worstSortedValues.Remove(tx);
tx.GasBottleneck = changedGasBottleneck;
if (reAdd)
{
_worstSortedValues.Add(tx, tx.Hash!);
}

UpdateWorstValue();
}
else
{
tx.GasBottleneck = changedGasBottleneck;
}
}

ReadOnlySpan<Transaction> txs = CollectionsMarshal.AsSpan(_transactionsToRemove);
for (int i = 0; i < txs.Length; i++)
{
TryRemove(txs[i].Hash!);
}
}

[MethodImpl(MethodImplOptions.Synchronized)]
public void UpdateGroup(Address groupKey, Account groupValue, Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements)
{
if (groupKey is null) throw new ArgumentNullException(nameof(groupKey));
if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet<Transaction>? bucket))
{
Debug.Assert(bucket.Count > 0);

UpdateGroup(groupKey, groupValue, bucket, changingElements);
}
}
}
}
10 changes: 5 additions & 5 deletions src/Nethermind/Nethermind.TxPool/Filters/DeployedCodeFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ namespace Nethermind.TxPool.Filters
internal sealed class DeployedCodeFilter : IIncomingTxFilter
{
private readonly IChainHeadSpecProvider _specProvider;
private readonly IAccountStateProvider _stateProvider;

public DeployedCodeFilter(IChainHeadSpecProvider specProvider, IAccountStateProvider stateProvider)
public DeployedCodeFilter(IChainHeadSpecProvider specProvider)
{
_specProvider = specProvider;
_stateProvider = stateProvider;
}
public AcceptTxResult Accept(Transaction tx, TxFilteringState state, TxHandlingOptions txHandlingOptions) =>
_stateProvider.IsInvalidContractSender(_specProvider.GetCurrentHeadSpec(), tx.SenderAddress!)
public AcceptTxResult Accept(Transaction tx, TxFilteringState state, TxHandlingOptions txHandlingOptions)
{
return _specProvider.GetCurrentHeadSpec().IsEip3607Enabled && state.SenderAccount.HasCode
? AcceptTxResult.SenderIsContract
: AcceptTxResult.Accepted;
}
}
}
81 changes: 50 additions & 31 deletions src/Nethermind/Nethermind.TxPool/TxPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Caching;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Specs;
using Nethermind.Core.Timers;
Expand Down Expand Up @@ -51,12 +51,15 @@ public class TxPool : ITxPool, IDisposable

private readonly Channel<BlockReplacementEventArgs> _headBlocksChannel = Channel.CreateUnbounded<BlockReplacementEventArgs>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true });

private readonly Func<Address, Account, EnhancedSortedSet<Transaction>, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> _updateBucket;

/// <summary>
/// Indexes transactions
/// </summary>
private ulong _txIndex;

private readonly ITimer? _timer;
private Transaction[]? _transactionSnapshot;

/// <summary>
/// This class stores all known pending transactions that can be used for block production
Expand Down Expand Up @@ -118,10 +121,13 @@ public TxPool(
postHashFilters.Add(incomingTxFilter);
}

postHashFilters.Add(new DeployedCodeFilter(_specProvider, _accounts));
postHashFilters.Add(new DeployedCodeFilter(_specProvider));

_postHashFilters = postHashFilters.ToArray();

// Capture closures once rather than per invocation
_updateBucket = UpdateBucket;

int? reportMinutes = txPoolConfig.ReportMinutes;
if (_logger.IsInfo && reportMinutes.HasValue)
{
Expand Down Expand Up @@ -151,6 +157,8 @@ private void OnHeadChange(object? sender, BlockReplacementEventArgs e)
// TODO: I think this is dangerous if many blocks are processed one after another
try
{
// Clear snapshot
_transactionSnapshot = null;
_hashCache.ClearCurrentBlockCache();
_headBlocksChannel.Writer.TryWrite(e);
}
Expand Down Expand Up @@ -208,33 +216,34 @@ private void ReAddReorganisedTransactions(Block? previousBlock)
}
}

private void RemoveProcessedTransactions(IReadOnlyList<Transaction> blockTransactions)
private void RemoveProcessedTransactions(Transaction[] blockTransactions)
{
long transactionsInBlock = blockTransactions.Count;
long discoveredForPendingTxs = 0;
long discoveredForHashCache = 0;
long eip1559Txs = 0;

for (int i = 0; i < transactionsInBlock; i++)
for (int i = 0; i < blockTransactions.Length; i++)
{
Keccak txHash = blockTransactions[i].Hash ?? throw new ArgumentException("Hash was unexpectedly null!");
Transaction transaction = blockTransactions[i];
Keccak txHash = transaction.Hash ?? throw new ArgumentException("Hash was unexpectedly null!");

if (!IsKnown(txHash!))
if (!IsKnown(txHash))
{
discoveredForHashCache++;
}

if (!RemoveIncludedTransaction(blockTransactions[i]))
if (!RemoveIncludedTransaction(transaction))
{
discoveredForPendingTxs++;
}

if (blockTransactions[i].IsEip1559)
if (transaction.IsEip1559)
{
eip1559Txs++;
}
}

long transactionsInBlock = blockTransactions.Length;
if (transactionsInBlock != 0)
{
Metrics.DarkPoolRatioLevel1 = (float)discoveredForHashCache / transactionsInBlock;
Expand All @@ -255,7 +264,7 @@ public void AddPeer(ITxPoolPeer peer)
PeerInfo peerInfo = new(peer);
if (_broadcaster.AddPeer(peerInfo))
{
_broadcaster.BroadcastOnce(peerInfo, _transactions.GetSnapshot());
_broadcaster.BroadcastOnce(peerInfo, _transactionSnapshot ??= _transactions.GetSnapshot());

if (_logger.IsTrace) _logger.Trace($"Added a peer to TX pool: {peer}");
}
Expand Down Expand Up @@ -294,10 +303,18 @@ public AcceptTxResult SubmitTx(Transaction tx, TxHandlingOptions handlingOptions
if (!accepted)
{
Metrics.PendingTransactionsDiscarded++;
return accepted;
}
else
{
accepted = AddCore(tx, state, startBroadcast);
if (accepted)
{
// Clear snapshot
_transactionSnapshot = null;
}
}

return AddCore(tx, startBroadcast);
return accepted;
}

private AcceptTxResult FilterTransactions(Transaction tx, TxHandlingOptions handlingOptions, TxFilteringState state)
Expand Down Expand Up @@ -325,7 +342,7 @@ private AcceptTxResult FilterTransactions(Transaction tx, TxHandlingOptions hand
return AcceptTxResult.Accepted;
}

private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
private AcceptTxResult AddCore(Transaction tx, TxFilteringState state, bool isPersistentBroadcast)
{
lock (_locker)
{
Expand All @@ -335,7 +352,7 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
bool inserted = _transactions.TryInsert(tx.Hash!, tx, out Transaction? removed);
if (inserted)
{
_transactions.UpdateGroup(tx.SenderAddress!, UpdateBucketWithAddedTransaction);
_transactions.UpdateGroup(tx.SenderAddress!, state.SenderAccount, UpdateBucketWithAddedTransaction);
Metrics.PendingTransactionsAdded++;
if (tx.IsEip1559) { Metrics.Pending1559TransactionsAdded++; }

Expand Down Expand Up @@ -364,12 +381,11 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
return AcceptTxResult.Accepted;
}

private IEnumerable<(Transaction Tx, Action<Transaction>? Change)> UpdateBucketWithAddedTransaction(
Address address, IReadOnlyCollection<Transaction> transactions)
private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucketWithAddedTransaction(
Address address, Account account, EnhancedSortedSet<Transaction> transactions)
{
if (transactions.Count != 0)
{
Account account = _accounts.GetAccount(address);
UInt256 balance = account.Balance;
long currentNonce = (long)(account.Nonce);

Expand All @@ -380,8 +396,8 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
}
}

private IEnumerable<(Transaction Tx, Action<Transaction>? Change)> UpdateGasBottleneck(
IReadOnlyCollection<Transaction> transactions, long currentNonce, UInt256 balance)
private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateGasBottleneck(
EnhancedSortedSet<Transaction> transactions, long currentNonce, UInt256 balance)
{
UInt256? previousTxBottleneck = null;
int i = 0;
Expand Down Expand Up @@ -413,7 +429,7 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)

if (tx.GasBottleneck != gasBottleneck)
{
yield return (tx, SetGasBottleneckChange(gasBottleneck));
yield return (tx, gasBottleneck);
}

previousTxBottleneck = gasBottleneck;
Expand All @@ -422,30 +438,33 @@ private AcceptTxResult AddCore(Transaction tx, bool isPersistentBroadcast)
}
}

private static Action<Transaction> SetGasBottleneckChange(UInt256 gasBottleneck)
{
return t => t.GasBottleneck = gasBottleneck;
}

private void UpdateBuckets()
{
lock (_locker)
{
// ensure the capacity of the pool
if (_transactions.Count > _txPoolConfig.Size)
if (_logger.IsWarn) _logger.Warn($"TxPool exceeds the config size {_transactions.Count}/{_txPoolConfig.Size}");
_transactions.UpdatePool(UpdateBucket);
_transactions.UpdatePool(_accounts, _updateBucket);
}
}

private IEnumerable<(Transaction Tx, Action<Transaction>? Change)> UpdateBucket(Address address, IReadOnlyCollection<Transaction> transactions)
private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucket(Address address, Account account, EnhancedSortedSet<Transaction> transactions)
{
if (transactions.Count != 0)
{
Account? account = _accounts.GetAccount(address);
UInt256 balance = account.Balance;
long currentNonce = (long)(account.Nonce);
Transaction? tx = transactions.FirstOrDefault(t => t.Nonce == currentNonce);
Transaction? tx = null;
foreach (Transaction txn in transactions)
{
if (txn.Nonce == currentNonce)
{
tx = txn;
break;
}
}

bool shouldBeDumped = false;

if (tx is null)
Expand Down Expand Up @@ -597,12 +616,12 @@ private static void AddNodeInfoEntryForTxPool()

private void TimerOnElapsed(object? sender, EventArgs e)
{
WriteTxnPoolReport(_logger);
WriteTxPoolReport(_logger);

_timer!.Enabled = true;
}

private static void WriteTxnPoolReport(ILogger logger)
private static void WriteTxPoolReport(ILogger logger)
{
if (!logger.IsInfo)
{
Expand Down

0 comments on commit 3c8c453

Please sign in to comment.