Skip to content

Commit

Permalink
Don't use exceptions for flow control during sync (#6425)
Browse files Browse the repository at this point in the history
* Don't use exceptions for flow control during sync

* Fix header caching

* Fix time reporting

* Better progress time
  • Loading branch information
benaadams committed Dec 27, 2023
1 parent 7e6c340 commit 0ad72e9
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ private ChainLevelInfo UpdateOrCreateLevel(long number, Hash256 hash, BlockInfo
/// <returns></returns>
private bool ShouldCache(long number)
{
return number == 0L || Head is null || number <= Head.Number + 1;
return number == 0L || Head is null || number >= Head.Number - HeaderStore.CacheSize;
}

public ChainLevelInfo? FindLevel(long number)
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace Nethermind.Blockchain.Headers;

public class HeaderStore : IHeaderStore
{
// SyncProgressResolver MaxLookupBack is 128, add 16 wiggle room
private const int CacheSize = 128 + 16;
// SyncProgressResolver MaxLookupBack is 256, add 16 wiggle room
public const int CacheSize = 256 + 16;

private readonly IDb _headerDb;
private readonly IDb _blockNumberDb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ internal void DisplayProgressReport(int pendingRequestsCount, BranchProgress bra
}

if (logger.IsInfo) logger.Info(
$"State Sync {TimeSpan.FromSeconds(SecondsInSync):dd\\.hh\\:mm\\:ss} | {dataSizeInfo} | branches: {branchProgress.Progress:P2} | kB/s: {savedKBytesPerSecond,5:F0} | accounts {SavedAccounts} | nodes {SavedNodesCount} | diagnostics: {pendingRequestsCount}.{AverageTimeInHandler:f2}ms");
$"State Sync {TimeSpan.FromSeconds(SecondsInSync):dd\\.hh\\:mm\\:ss} | {dataSizeInfo} | branches: {branchProgress.Progress:P2} | kB/s: {savedKBytesPerSecond,5:F0} | accounts {SavedAccounts} | nodes {SavedNodesCount} | pending: {pendingRequestsCount,3} | ave: {AverageTimeInHandler:f2}ms");
if (logger.IsDebug && DateTime.UtcNow - LastReportTime.full > TimeSpan.FromSeconds(10))
{
long allChecks = CheckWasInDependencies + CheckWasCached + StateWasThere + StateWasNotThere;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ void AddAgainAllItems()

_data.DisplayProgressReport(_pendingRequests.Count, _branchProgress, _logger);

handleWatch.Stop();
long total = handleWatch.ElapsedMilliseconds + _networkWatch.ElapsedMilliseconds;
if (total != 0)
{
Expand All @@ -326,9 +327,7 @@ void AddAgainAllItems()

Interlocked.Add(ref _handleWatch, handleWatch.ElapsedMilliseconds);
_data.LastDbReads = _data.DbChecks;
_data.AverageTimeInHandler =
(_data.AverageTimeInHandler * (_data.ProcessedRequestsCount - 1) +
_handleWatch) / _data.ProcessedRequestsCount;
_data.AverageTimeInHandler = _handleWatch / (decimal)_data.ProcessedRequestsCount;

Interlocked.Add(ref _data.HandledNodesCount, nonEmptyResponses);
return result;
Expand Down
6 changes: 1 addition & 5 deletions src/Nethermind/Nethermind.Trie/PatriciaTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,11 +1077,7 @@ public void Accept(ITreeVisitor visitor, Hash256 rootHash, VisitingOptions? visi
if (!rootHash.Equals(Keccak.EmptyTreeHash))
{
rootRef = RootHash == rootHash ? RootRef : TrieStore.FindCachedOrUnknown(rootHash);
try
{
rootRef!.ResolveNode(TrieStore);
}
catch (TrieException)
if (!rootRef!.TryResolveNode(TrieStore))
{
visitor.VisitMissingNode(rootHash, trieVisitContext);
return;
Expand Down
7 changes: 7 additions & 0 deletions src/Nethermind/Nethermind.Trie/Pruning/ITrieNodeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,12 @@ public interface ITrieNodeResolver
/// <param name="hash"></param>
/// <returns></returns>
byte[]? LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None);

/// <summary>
/// Loads RLP of the node.
/// </summary>
/// <param name="hash"></param>
/// <returns></returns>
byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ public class NullTrieNodeResolver : ITrieNodeResolver

public TrieNode FindCachedOrUnknown(Hash256 hash) => new(NodeType.Unknown, hash);
public byte[]? LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => null;
public byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => null;
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Trie/Pruning/NullTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class NullTrieStore : IReadOnlyTrieStore

public TrieNode FindCachedOrUnknown(Hash256 hash) => new(NodeType.Unknown, hash);

public byte[] TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => null;

public byte[] LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => Array.Empty<byte>();

public bool IsPersisted(in ValueHash256 keccak) => true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public ReadOnlyTrieStore(TrieStore trieStore, IKeyValueStore? readOnlyStore)
public TrieNode FindCachedOrUnknown(Hash256 hash) =>
_trieStore.FindCachedOrUnknown(hash, true);

public byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags) => _trieStore.TryLoadRlp(hash, _readOnlyStore, flags);
public byte[] LoadRlp(Hash256 hash, ReadFlags flags) => _trieStore.LoadRlp(hash, _readOnlyStore, flags);

public bool IsPersisted(in ValueHash256 keccak) => _trieStore.IsPersisted(keccak);
Expand Down
16 changes: 13 additions & 3 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,22 +322,32 @@ public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? roo

public event EventHandler<ReorgBoundaryReached>? ReorgBoundaryReached;

public byte[] LoadRlp(Hash256 keccak, IKeyValueStore? keyValueStore, ReadFlags readFlags = ReadFlags.None)
public byte[]? TryLoadRlp(Hash256 keccak, IKeyValueStore? keyValueStore, ReadFlags readFlags = ReadFlags.None)
{
keyValueStore ??= _keyValueStore;
byte[]? rlp = keyValueStore.Get(keccak.Bytes, readFlags);

if (rlp is not null)
{
Metrics.LoadedFromDbNodesCount++;
}

return rlp;
}

public byte[] LoadRlp(Hash256 keccak, IKeyValueStore? keyValueStore, ReadFlags readFlags = ReadFlags.None)
{
byte[]? rlp = TryLoadRlp(keccak, keyValueStore, readFlags);
if (rlp is null)
{
throw new TrieNodeException($"Node {keccak} is missing from the DB", keccak);
}

Metrics.LoadedFromDbNodesCount++;

return rlp;
}

public virtual byte[] LoadRlp(Hash256 keccak, ReadFlags readFlags = ReadFlags.None) => LoadRlp(keccak, null, readFlags);
public virtual byte[]? TryLoadRlp(Hash256 keccak, ReadFlags readFlags = ReadFlags.None) => TryLoadRlp(keccak, null, readFlags);

public bool IsPersisted(in ValueHash256 keccak)
{
Expand Down
114 changes: 84 additions & 30 deletions src/Nethermind/Nethermind.Trie/TrieNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,52 +304,106 @@ public void ResolveNode(ITrieNodeResolver tree, ReadFlags readFlags = ReadFlags.
throw new InvalidAsynchronousStateException($"{nameof(_rlpStream)} is null when {nameof(NodeType)} is {NodeType}");
}

Metrics.TreeNodeRlpDecodings++;
_rlpStream.ReadSequenceLength();
if (!DecodeRlp(bufferPool, out int numberOfItems))
{
throw new TrieNodeException($"Unexpected number of items = {numberOfItems} when decoding a node from RLP ({FullRlp.AsSpan().ToHexString()})", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero);
}
}
catch (RlpException rlpException)
{
throw new TrieNodeException($"Error when decoding node {Keccak}", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero, rlpException);
}
}

/// <summary>
/// Highly optimized
/// </summary>
public bool TryResolveNode(ITrieNodeResolver tree, ReadFlags readFlags = ReadFlags.None, ICappedArrayPool? bufferPool = null)
{
try
{
if (NodeType == NodeType.Unknown)
{
if (FullRlp.IsNull)
{
if (Keccak is null)
{
return false;
}

// micro optimization to prevent searches beyond 3 items for branches (search up to three)
int numberOfItems = _rlpStream.PeekNumberOfItemsRemaining(null, 3);
FullRlp = tree.TryLoadRlp(Keccak, readFlags);
IsPersisted = true;

if (numberOfItems > 2)
if (FullRlp.IsNull)
{
return false;
}
}
}
else
{
NodeType = NodeType.Branch;
return true;
}
else if (numberOfItems == 2)

_rlpStream = FullRlp.AsRlpStream();
if (_rlpStream is null)
{
(byte[] key, bool isLeaf) = HexPrefix.FromBytes(_rlpStream.DecodeByteArraySpan());
throw new InvalidAsynchronousStateException($"{nameof(_rlpStream)} is null when {nameof(NodeType)} is {NodeType}");
}

// a hack to set internally and still verify attempts from the outside
// after the code is ready we should just add proper access control for methods from the outside and inside
bool isDirtyActual = IsDirty;
IsDirty = true;
return DecodeRlp(bufferPool, out _);
}
catch (RlpException)
{
return false;
}
}

if (isLeaf)
{
NodeType = NodeType.Leaf;
Key = key;
private bool DecodeRlp(ICappedArrayPool bufferPool, out int itemsCount)
{
Metrics.TreeNodeRlpDecodings++;
_rlpStream.ReadSequenceLength();

ReadOnlySpan<byte> valueSpan = _rlpStream.DecodeByteArraySpan();
CappedArray<byte> buffer = bufferPool.SafeRentBuffer(valueSpan.Length);
valueSpan.CopyTo(buffer.AsSpan());
Value = buffer;
}
else
{
NodeType = NodeType.Extension;
Key = key;
}
// micro optimization to prevent searches beyond 3 items for branches (search up to three)
int numberOfItems = itemsCount = _rlpStream.PeekNumberOfItemsRemaining(null, 3);

if (numberOfItems > 2)
{
NodeType = NodeType.Branch;
}
else if (numberOfItems == 2)
{
(byte[] key, bool isLeaf) = HexPrefix.FromBytes(_rlpStream.DecodeByteArraySpan());

// a hack to set internally and still verify attempts from the outside
// after the code is ready we should just add proper access control for methods from the outside and inside
bool isDirtyActual = IsDirty;
IsDirty = true;

if (isLeaf)
{
NodeType = NodeType.Leaf;
Key = key;

IsDirty = isDirtyActual;
ReadOnlySpan<byte> valueSpan = _rlpStream.DecodeByteArraySpan();
CappedArray<byte> buffer = bufferPool.SafeRentBuffer(valueSpan.Length);
valueSpan.CopyTo(buffer.AsSpan());
Value = buffer;
}
else
{
throw new TrieNodeException($"Unexpected number of items = {numberOfItems} when decoding a node from RLP ({FullRlp.AsSpan().ToHexString()})", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero);
NodeType = NodeType.Extension;
Key = key;
}

IsDirty = isDirtyActual;
}
catch (RlpException rlpException)
else
{
throw new TrieNodeException($"Error when decoding node {Keccak}", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero, rlpException);
return false;
}

return true;
}

public void ResolveKey(ITrieNodeResolver tree, bool isRoot, ICappedArrayPool? bufferPool = null)
Expand Down
10 changes: 10 additions & 0 deletions src/Nethermind/Nethermind.Trie/TrieNodeResolverWithReadFlags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public TrieNode FindCachedOrUnknown(Hash256 hash)
return _baseResolver.FindCachedOrUnknown(hash);
}

public byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None)
{
if (flags != ReadFlags.None)
{
return _baseResolver.TryLoadRlp(hash, flags | _defaultFlags);
}

return _baseResolver.TryLoadRlp(hash, _defaultFlags);
}

public byte[]? LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None)
{
if (flags != ReadFlags.None)
Expand Down

0 comments on commit 0ad72e9

Please sign in to comment.