Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude Tx Hashes when serving Blocks & apply network back pressure #6636

Merged
merged 12 commits into from
Jan 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class UserOperationsMessageSerializer : IZeroInnerMessageSerializer<UserO
public void Serialize(IByteBuffer byteBuffer, UserOperationsMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
LukaszRozmej marked this conversation as resolved.
Show resolved Hide resolved
NettyRlpStream nettyRlpStream = new(byteBuffer);

nettyRlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ public void Test_can_insert_get_and_remove_blocks(bool cached)
Block block = Build.A.Block.WithNumber(1).TestObject;
store.Insert(block);

Block? retrieved = store.Get(block.Number, block.Hash!, cached);
Block? retrieved = store.Get(block.Number, block.Hash!, RlpBehaviors.None, cached);
retrieved.Should().BeEquivalentTo(block);

store.Delete(block.Number, block.Hash!);

store.Get(block.Number, block.Hash!, cached).Should().BeNull();
store.Get(block.Number, block.Hash!, RlpBehaviors.None, cached).Should().BeNull();
}

[Test]
Expand All @@ -57,7 +57,7 @@ public void Test_can_get_block_that_was_stored_with_hash(bool cached)
Block block = Build.A.Block.WithNumber(1).TestObject;
db[block.Hash!.Bytes] = (new BlockDecoder()).Encode(block).Bytes;

Block? retrieved = store.Get(block.Number, block.Hash!, cached);
Block? retrieved = store.Get(block.Number, block.Hash!, RlpBehaviors.None, cached);
retrieved.Should().BeEquivalentTo(block);
}

Expand All @@ -83,12 +83,12 @@ public void Test_when_cached_does_not_touch_db_on_next_get()
Block block = Build.A.Block.WithNumber(1).TestObject;
store.Insert(block);

Block? retrieved = store.Get(block.Number, block.Hash!, true);
Block? retrieved = store.Get(block.Number, block.Hash!, RlpBehaviors.None, true);
retrieved.Should().BeEquivalentTo(block);

db.Clear();

retrieved = store.Get(block.Number, block.Hash!, true);
retrieved = store.Get(block.Number, block.Hash!, RlpBehaviors.None, true);
retrieved.Should().BeEquivalentTo(block);
}

Expand Down
6 changes: 5 additions & 1 deletion src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,11 @@ private bool ShouldCache(long number)
blockNumber ??= _headerStore.GetBlockNumber(blockHash);
if (blockNumber is not null)
{
block = _blockStore.Get(blockNumber.Value, blockHash, shouldCache: false);
block = _blockStore.Get(
blockNumber.Value,
blockHash,
(options & BlockTreeLookupOptions.ExcludeTxHashes) != 0 ? RlpBehaviors.ExcludeHashes : RlpBehaviors.None,
shouldCache: false);
}

if (block is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public enum BlockTreeLookupOptions
RequireCanonical = 2,
DoNotCreateLevelIfMissing = 4,
AllowInvalid = 8,
All = 15
ExcludeTxHashes = 16,
All = 31
}

[Flags]
Expand Down
6 changes: 3 additions & 3 deletions src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ public void Delete(long blockNumber, Hash256 blockHash)
_blockDb.Remove(blockHash.Bytes);
}

public Block? Get(long blockNumber, Hash256 blockHash, bool shouldCache = false)
public Block? Get(long blockNumber, Hash256 blockHash, RlpBehaviors rlpBehaviors = RlpBehaviors.None, bool shouldCache = false)
{
Block? b = _blockDb.Get(blockNumber, blockHash, _blockDecoder, _blockCache, shouldCache);
Block? b = _blockDb.Get(blockNumber, blockHash, _blockDecoder, _blockCache, rlpBehaviors, shouldCache);
if (b is not null) return b;
return _blockDb.Get(blockHash, _blockDecoder, _blockCache, shouldCache);
return _blockDb.Get(blockHash, _blockDecoder, _blockCache, rlpBehaviors, shouldCache);
}

public ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IBlockStore
{
void Insert(Block block, WriteFlags writeFlags = WriteFlags.None);
void Delete(long blockNumber, Hash256 blockHash);
Block? Get(long blockNumber, Hash256 blockHash, bool shouldCache = true);
Block? Get(long blockNumber, Hash256 blockHash, RlpBehaviors rlpBehaviors = RlpBehaviors.None, bool shouldCache = true);
IEnumerable<Block> GetAll();
ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash);
void Cache(Block block);
Expand Down
28 changes: 28 additions & 0 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System;
using System.Collections.Generic;
using System.Numerics;
using System.Runtime.InteropServices;
using System.Threading;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Receipts;
using Nethermind.Consensus.BeaconBlockRoot;
Expand Down Expand Up @@ -86,6 +88,7 @@ public Block[] Process(Hash256 newBranchStateRoot, List<Block> suggestedBlocks,
{
if (suggestedBlocks.Count == 0) return Array.Empty<Block>();

TxHashCalculator.CalculateInBackground(suggestedBlocks);
BlocksProcessing?.Invoke(this, new BlocksProcessingEventArgs(suggestedBlocks));

/* We need to save the snapshot state root before reorganization in case the new branch has invalid blocks.
Expand Down Expand Up @@ -371,4 +374,29 @@ private void ApplyDaoTransition(Block block)
}
}
}

private class TxHashCalculator(List<Block> suggestedBlocks) : IThreadPoolWorkItem
{
public static void CalculateInBackground(List<Block> suggestedBlocks)
{
// Memory has been reserved on the transactions to delay calculate the hashes
// We calculate the hashes in the background to release that memory
ThreadPool.UnsafeQueueUserWorkItem(new TxHashCalculator(suggestedBlocks), preferLocal: false);
}

void IThreadPoolWorkItem.Execute()
{
// Hashes will be required for PersistentReceiptStorage in UpdateMainChain ForkchoiceUpdatedHandler
// Which occurs after the block has been processed; however the block is stored in cache and picked up
// from there so we can calculate the hashes now for that later use.
foreach (Block block in CollectionsMarshal.AsSpan(suggestedBlocks))
{
foreach (Transaction tx in block.Transactions)
{
// Calculate the hashes to release the memory from the transactionSequence
tx.CalculateHashInternal();
LukaszRozmej marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class AddCapabilityMessageSerializer : IZeroMessageSerializer<AddCapabili
public void Serialize(IByteBuffer byteBuffer, AddCapabilityMessage msg)
{
int totalLength = GetLength(msg, out int contentLength);
byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);

NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class DisconnectMessageSerializer : IZeroMessageSerializer<DisconnectMess
public void Serialize(IByteBuffer byteBuffer, DisconnectMessage msg)
{
int length = GetLength(msg, out int contentLength);
byteBuffer.EnsureWritable(length);
byteBuffer.EnsureWritable(length, force: true);
NettyRlpStream rlpStream = new(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class HelloMessageSerializer : IZeroMessageSerializer<HelloMessage>
public void Serialize(IByteBuffer byteBuffer, HelloMessage msg)
{
(int totalLength, int innerLength) length = GetLength(msg);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(length.totalLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(length.totalLength), force: true);
NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(length.totalLength);
stream.Encode(msg.P2PVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected static Hash256[] DeserializeHashes(RlpStream rlpStream)
public void Serialize(IByteBuffer byteBuffer, T message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class BlockBodiesMessageSerializer : IZeroInnerMessageSerializer<BlockBod
public void Serialize(IByteBuffer byteBuffer, BlockBodiesMessage message)
{
int totalLength = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);
NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(contentLength);
foreach (BlockBody? body in message.Bodies.Bodies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class BlockHeadersMessageSerializer : IZeroInnerMessageSerializer<BlockHe
public void Serialize(IByteBuffer byteBuffer, BlockHeadersMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class GetBlockBodiesMessageSerializer : IZeroInnerMessageSerializer<GetBl
public void Serialize(IByteBuffer byteBuffer, GetBlockBodiesMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
NettyRlpStream nettyRlpStream = new(byteBuffer);

nettyRlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static GetBlockHeadersMessage Deserialize(RlpStream rlpStream)
public void Serialize(IByteBuffer byteBuffer, GetBlockHeadersMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class NewBlockHashesMessageSerializer : IZeroInnerMessageSerializer<NewBl
public void Serialize(IByteBuffer byteBuffer, NewBlockHashesMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
NettyRlpStream nettyRlpStream = new(byteBuffer);

nettyRlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class NewBlockMessageSerializer : IZeroInnerMessageSerializer<NewBlockMes
public void Serialize(IByteBuffer byteBuffer, NewBlockMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class TransactionsMessageSerializer : IZeroInnerMessageSerializer<Transac
public void Serialize(IByteBuffer byteBuffer, TransactionsMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
NettyRlpStream nettyRlpStream = new(byteBuffer);

nettyRlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class NodeDataMessageSerializer : IZeroInnerMessageSerializer<NodeDataMes
public void Serialize(IByteBuffer byteBuffer, NodeDataMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void Serialize(IByteBuffer byteBuffer, ReceiptsMessage message)
{
int totalLength = GetLength(message, out int contentLength);

byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);
NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(contentLength);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected Eth66MessageSerializer(IZeroInnerMessageSerializer<TEthMessage> ethMes
public void Serialize(IByteBuffer byteBuffer, TEth66Message message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);
rlpStream.StartSequence(contentLength);
rlpStream.Encode(message.RequestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void Serialize(IByteBuffer byteBuffer, NewPooledTransactionHashesMessage6

int totalSize = Rlp.LengthOf(message.Types) + Rlp.LengthOfSequence(sizesLength) + Rlp.LengthOfSequence(hashesLength);

byteBuffer.EnsureWritable(totalSize, true);
byteBuffer.EnsureWritable(totalSize);

RlpStream rlpStream = new NettyRlpStream(byteBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void Serialize(IByteBuffer byteBuffer, BlockHeadersMessage message)
int totalLength = Rlp.LengthOfSequence(contentLength);

RlpStream rlpStream = new NettyRlpStream(byteBuffer);
byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);

rlpStream.StartSequence(contentLength);
rlpStream.Encode(message.RequestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public void Serialize(IByteBuffer byteBuffer, GetBlockHeadersMessage message)
int totalLength = Rlp.LengthOfSequence(contentLength);

RlpStream rlpStream = new NettyRlpStream(byteBuffer);
byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);

rlpStream.StartSequence(contentLength);
rlpStream.Encode(message.RequestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class NodeDataMessageSerializer : IZeroInnerMessageSerializer<NodeDataMes
public void Serialize(IByteBuffer byteBuffer, NodeDataMessage message)
{
int length = GetLength(message, out int contentLength);
byteBuffer.EnsureWritable(length, true);
byteBuffer.EnsureWritable(length);
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void Serialize(IByteBuffer byteBuffer, AccountRangeMessage message)
{
(int contentLength, int pwasLength, int proofsLength) = GetLength(message);

byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength));

NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ByteCodesMessageSerializer : IZeroMessageSerializer<ByteCodesMessag
public void Serialize(IByteBuffer byteBuffer, ByteCodesMessage message)
{
(int contentLength, int codesLength) = GetLength(message);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength));
RlpStream rlpStream = new NettyRlpStream(byteBuffer);

rlpStream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public void Serialize(IByteBuffer byteBuffer, GetTrieNodesMessage message)
{
(int contentLength, int allPathsLength, int[] pathsLengths) = CalculateLengths(message);

byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength));
NettyRlpStream stream = new(byteBuffer);

stream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public abstract class SnapSerializerBase<T> : IZeroInnerMessageSerializer<T> whe
protected NettyRlpStream GetRlpStreamAndStartSequence(IByteBuffer byteBuffer, T msg)
{
int totalLength = GetLength(msg, out int contentLength);
byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);
NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(contentLength);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void Serialize(IByteBuffer byteBuffer, StorageRangeMessage message)
{
(int contentLength, int allSlotsLength, int[] accountSlotsLengths, int proofsLength) = CalculateLengths(message);

byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength));
NettyRlpStream stream = new(byteBuffer);

stream.StartSequence(contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public void Serialize(IByteBuffer byteBuffer, TrieNodesMessage message)
{
(int contentLength, int nodesLength) = GetLength(message);

byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(contentLength));

NettyRlpStream rlpStream = new(byteBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public void Serialize(IByteBuffer byteBuffer, BlockWitnessHashesMessage message)

int contentLength = GetLength(message, out int totalLength);

byteBuffer.EnsureWritable(totalLength, true);
byteBuffer.EnsureWritable(totalLength);
nettyRlpStream.StartSequence(contentLength);
nettyRlpStream.Encode(message.RequestId);
if (message.Hashes is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void Serialize(IByteBuffer byteBuffer, AckEip8Message msg)
totalLength += Rlp.LengthOf(msg.Nonce);
totalLength += Rlp.LengthOf(msg.Version);

byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength));
NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(totalLength);
stream.Encode(msg.EphemeralPublicKey.Bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void Serialize(IByteBuffer byteBuffer, AuthEip8Message msg)
{
int totalLength = GetLength(msg);
// TODO: Account for the padding
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength), true);
byteBuffer.EnsureWritable(Rlp.LengthOfSequence(totalLength));
NettyRlpStream stream = new(byteBuffer);
stream.StartSequence(totalLength);
stream.Encode(Bytes.Concat(msg.Signature.Bytes, msg.Signature.RecoveryId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class AuthMessageSerializer : IZeroMessageSerializer<AuthMessage>

public void Serialize(IByteBuffer byteBuffer, AuthMessage msg)
{
byteBuffer.EnsureWritable(Length, true);
byteBuffer.EnsureWritable(Length);
byteBuffer.WriteBytes(msg.Signature.Bytes);
byteBuffer.WriteByte(msg.Signature.RecoveryId);
byteBuffer.WriteBytes(msg.EphemeralPublicHash.Bytes);
Expand Down