Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix downloader hangs in some case #4362

Merged
merged 3 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/Nethermind/Nethermind.Core/Block.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Copyright (c) 2021 Demerzel Solutions Limited
// This file is part of the Nethermind library.
//
//
// The Nethermind library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
//
// The Nethermind library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.

Expand Down Expand Up @@ -47,7 +47,7 @@ public Block(BlockHeader blockHeader)
public Block WithReplacedHeader(BlockHeader newHeader) => new(newHeader, Body);

public Block WithReplacedBody(BlockBody newBody) => new(Header, newBody);

public Block WithReplacedBodyCloned(BlockBody newBody) => new(Header.Clone(), newBody);

public BlockHeader Header { get; }
Expand Down Expand Up @@ -97,11 +97,13 @@ public Block(BlockHeader blockHeader)
public UInt256 Difficulty => Header.Difficulty; // do not add setter here

public UInt256? TotalDifficulty => Header.TotalDifficulty; // do not add setter here

public UInt256 BaseFeePerGas => Header.BaseFeePerGas; // do not add setter here

public bool IsPostMerge => Header.IsPostMerge; // do not add setter here

public bool IsBodyMissing => Header.HasBody && Body.IsEmpty;

public override string ToString()
{
return ToString(Format.Short);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ bool HasMoreToSync(out BlockHeader[]? headers, out int headersToRequest)
Block currentBlock = blocks[blockIndex];
if (_logger.IsTrace) _logger.Trace($"Received {currentBlock} from {bestPeer}");

if (currentBlock.IsBodyMissing)
{
throw new EthSyncException($"{bestPeer} didn't send body for block {currentBlock.ToString(Block.Format.Short)}.");
}

// can move this to block tree now?
if (!_blockValidator.ValidateSuggestedBlock(currentBlock))
{
Expand Down
117 changes: 76 additions & 41 deletions src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
using Nethermind.Trie.Pruning;
using Nethermind.TxPool;
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using NSubstitute.Exceptions;
using NUnit.Framework;
using BlockTree = Nethermind.Blockchain.BlockTree;
Expand Down Expand Up @@ -245,52 +246,71 @@ public async Task Can_sync_with_peer_when_it_times_out_on_full_batch(int thresho
Assert.AreEqual(Math.Max(0, peerInfo.HeadNumber), ctx.BlockTree.BestSuggestedHeader.Number);
}

[Test]
public async Task Headers_already_known()
[TestCase(32, 32, true)]
[TestCase(32, 16, true)]
[TestCase(500, 250, true)]
[TestCase(32, 32, false)]
[TestCase(32, 16, false)]
[TestCase(500, 250, false)]
public async Task Can_sync_partially_when_only_some_bodies_is_available(int blockCount, int availableBlock, bool mergeDownloader)
{
Context ctx = new();
BlockDownloader downloader = CreateBlockDownloader(ctx);
BlockDownloader downloader = mergeDownloader ? CreateMergeBlockDownloader(ctx) : CreateBlockDownloader(ctx);

ISyncPeer syncPeer = Substitute.For<ISyncPeer>();
syncPeer.GetBlockHeaders(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(ci => ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.AllKnown));
.Returns(async ci => await ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.WithTransactions & ~Response.AllKnown));

List<Keccak> requestedHashes = new();
syncPeer.GetBlockBodies(Arg.Any<IReadOnlyList<Keccak>>(), Arg.Any<CancellationToken>())
.Returns(ci => ctx.ResponseBuilder.BuildBlocksResponse(ci.ArgAt<IList<Keccak>>(0), Response.AllCorrect | Response.AllKnown));
.Returns(ci =>
{
IList<Keccak> blockHashes = ci.ArgAt<IList<Keccak>>(0);
int toTake = availableBlock - requestedHashes.Count;
blockHashes = blockHashes.Take(toTake).ToList();
requestedHashes.AddRange(blockHashes);

PeerInfo peerInfo = new(syncPeer);
syncPeer.HeadNumber.Returns(64);
if (blockHashes.Count == 0)
{
return Array.Empty<BlockBody>();
}

await downloader.DownloadHeaders(peerInfo, new BlocksRequest(DownloaderOptions.WithBodies, 0), CancellationToken.None)
.ContinueWith(t => Assert.True(t.IsCompletedSuccessfully));
return ctx.ResponseBuilder.BuildBlocksResponse(blockHashes,
Response.AllCorrect | Response.WithTransactions & ~Response.AllKnown).Result;
});

syncPeer.HeadNumber.Returns(128);
await downloader.DownloadBlocks(peerInfo, new BlocksRequest(), CancellationToken.None)
.ContinueWith(t => Assert.True(t.IsCompletedSuccessfully));
syncPeer.TotalDifficulty.Returns(UInt256.MaxValue);
syncPeer.HeadNumber.Returns(blockCount);

PeerInfo peerInfo = new(syncPeer);

ctx.BlockTree.BestSuggestedBody.Number.Should().Be(0);
await downloader.DownloadBlocks(peerInfo, new BlocksRequest(DownloaderOptions.Process), CancellationToken.None).ContinueWith(t => { });
ctx.BlockTree.BestSuggestedBody.Number.Should().Be(availableBlock);
}

[TestCase(33L)]
[TestCase(65L)]
public async Task Peer_sends_just_one_item_when_advertising_more_blocks(long headNumber)
[Test]
public async Task Headers_already_known()
{
Context ctx = new();
BlockDownloader downloader = CreateBlockDownloader(ctx);

ISyncPeer syncPeer = Substitute.For<ISyncPeer>();
syncPeer.GetBlockHeaders(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(ci => ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect));
.Returns(ci => ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.AllKnown));

syncPeer.GetBlockBodies(Arg.Any<IReadOnlyList<Keccak>>(), Arg.Any<CancellationToken>())
.Returns(ci => ctx.ResponseBuilder.BuildBlocksResponse(ci.ArgAt<IList<Keccak>>(0), Response.AllCorrect | Response.JustFirst));
.Returns(ci => ctx.ResponseBuilder.BuildBlocksResponse(ci.ArgAt<IList<Keccak>>(0), Response.AllCorrect | Response.AllKnown));

PeerInfo peerInfo = new(syncPeer);
syncPeer.TotalDifficulty.Returns(UInt256.MaxValue);
syncPeer.HeadNumber.Returns(headNumber);
syncPeer.HeadNumber.Returns(64);

Task task = downloader.DownloadBlocks(peerInfo, new BlocksRequest(), CancellationToken.None);
await task.ContinueWith(t => Assert.True(t.IsFaulted));
await downloader.DownloadHeaders(peerInfo, new BlocksRequest(DownloaderOptions.WithBodies, 0), CancellationToken.None)
.ContinueWith(t => Assert.True(t.IsCompletedSuccessfully));

Assert.AreEqual(0, ctx.BlockTree.BestSuggestedHeader.Number);
syncPeer.HeadNumber.Returns(128);
await downloader.DownloadBlocks(peerInfo, new BlocksRequest(), CancellationToken.None)
.ContinueWith(t => Assert.True(t.IsCompletedSuccessfully));
}

[Test]
Expand Down Expand Up @@ -634,7 +654,7 @@ public async Task Throws_on_block_task_exception()
syncPeer.TotalDifficulty.Returns(UInt256.MaxValue);
Task<BlockHeader[]> buildHeadersResponse = null;
syncPeer.GetBlockHeaders(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect));
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.WithTransactions));

syncPeer.GetBlockBodies(Arg.Any<IReadOnlyList<Keccak>>(), Arg.Any<CancellationToken>())
.Returns(Task.FromException<BlockBody[]>(new TimeoutException()));
Expand Down Expand Up @@ -666,7 +686,7 @@ public async Task Throws_on_receipt_task_exception_when_downloading_receipts(int

Task<BlockHeader[]> buildHeadersResponse = null;
syncPeer.GetBlockHeaders(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect));
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.WithTransactions));

Task<BlockBody[]> buildBlocksResponse = null;
syncPeer.GetBlockBodies(Arg.Any<IReadOnlyList<Keccak>>(), Arg.Any<CancellationToken>())
Expand Down Expand Up @@ -766,7 +786,7 @@ public async Task Throws_on_block_bodies_count_higher_than_receipts_list_count(i

Task<BlockHeader[]> buildHeadersResponse = null;
syncPeer.GetBlockHeaders(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect));
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.WithTransactions));

Task<BlockBody[]> buildBlocksResponse = null;
syncPeer.GetBlockBodies(Arg.Any<IReadOnlyList<Keccak>>(), Arg.Any<CancellationToken>())
Expand Down Expand Up @@ -797,7 +817,7 @@ public async Task Does_throw_on_transaction_count_different_than_receipts_count_
syncPeer.TotalDifficulty.Returns(UInt256.MaxValue);
Task<BlockHeader[]> buildHeadersResponse = null;
syncPeer.GetBlockHeaders(Arg.Any<long>(), Arg.Any<int>(), Arg.Any<int>(), Arg.Any<CancellationToken>())
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect));
.Returns(ci => buildHeadersResponse = ctx.ResponseBuilder.BuildHeaderResponse(ci.ArgAt<long>(0), ci.ArgAt<int>(1), Response.AllCorrect | Response.WithTransactions));

Task<BlockBody[]> buildBlocksResponse = null;
syncPeer.GetBlockBodies(Arg.Any<IReadOnlyList<Keccak>>(), Arg.Any<CancellationToken>())
Expand Down Expand Up @@ -1095,7 +1115,7 @@ public async Task<BlockHeader[]> BuildHeaderResponse(long startNumber, int numbe
bool justFirst = flags.HasFlag(Response.JustFirst);
bool allKnown = flags.HasFlag(Response.AllKnown);
bool timeoutOnFullBatch = flags.HasFlag(Response.TimeoutOnFullBatch);
bool noBody = flags.HasFlag(Response.NoBody);
bool withTransaction = flags.HasFlag(Response.WithTransactions);

if (timeoutOnFullBatch && number == SyncBatchSize.Max)
{
Expand All @@ -1110,9 +1130,18 @@ public async Task<BlockHeader[]> BuildHeaderResponse(long startNumber, int numbe
for (int i = 1; i < number; i++)
{
Keccak receiptRoot = i == 1 ? Keccak.EmptyTreeHash : new Keccak("0x9904791428367d3f36f2be68daf170039dd0b3d6b23da00697de816a05fb5cc1");
headers[i] = consistent
? Build.A.BlockHeader.WithReceiptsRoot(receiptRoot).WithParent(headers[i - 1]).WithUnclesHash(noBody ? Keccak.OfAnEmptySequenceRlp : Keccak.Zero).TestObject
: Build.A.BlockHeader.WithReceiptsRoot(receiptRoot).WithNumber(headers[i - 1].Number + 1).TestObject;
BlockHeaderBuilder blockHeaderBuilder = consistent
? Build.A.BlockHeader.WithReceiptsRoot(receiptRoot).WithParent(headers[i - 1])
: Build.A.BlockHeader.WithReceiptsRoot(receiptRoot).WithNumber(headers[i - 1].Number + 1);

if (withTransaction)
{
// We don't know the TX root yet, it should be populated by `BuildBlocksResponse` and `BuildReceiptsResponse`.
blockHeaderBuilder.WithTransactionsRoot(Keccak.Compute("something"));
blockHeaderBuilder.WithReceiptsRoot(Keccak.Compute("something"));
}

headers[i] = blockHeaderBuilder.TestObject;

if (allKnown)
{
Expand Down Expand Up @@ -1158,7 +1187,21 @@ public async Task<BlockBody[]> BuildBlocksResponse(IList<Keccak> blockHashes, Re

BlockHeader[] blockHeaders = new BlockHeader[blockHashes.Count];
BlockBody[] blockBodies = new BlockBody[blockHashes.Count];
blockBodies[0] = new BlockBody(new Transaction[0], new BlockHeader[0]);

Block BuildBlockForHeader(BlockHeader header, int txSeed)
{
BlockBuilder blockBuilder = Build.A.Block.WithHeader(header);

if (withTransactions && header.TxRoot != Keccak.EmptyTreeHash)
{
blockBuilder.WithTransactions(Build.A.Transaction.WithValue(txSeed * 2).SignedAndResolved().TestObject,
Build.A.Transaction.WithValue(txSeed * 2 + 1).SignedAndResolved().TestObject);
}

return blockBuilder.TestObject;
}

blockBodies[0] = BuildBlockForHeader(startHeader, 0).Body;
blockHeaders[0] = startHeader;

_bodies[startHeader.Hash] = blockBodies[0];
Expand All @@ -1177,16 +1220,8 @@ public async Task<BlockBody[]> BuildBlocksResponse(IList<Keccak> blockHashes, Re
? blockHeaders[i]
: blockHeaders[i - 1];

BlockBuilder blockBuilder = Build.A.Block.WithHeader(header);

if (withTransactions && header.ReceiptsRoot != Keccak.EmptyTreeHash)
{
blockBuilder.WithTransactions(Build.A.Transaction.WithValue(i * 2).SignedAndResolved().TestObject,
Build.A.Transaction.WithValue(i * 2 + 1).SignedAndResolved().TestObject);
}

Block block = blockBuilder.TestObject;
blockBodies[i] = new BlockBody(block.Transactions, block.Uncles);
Block block = BuildBlockForHeader(header, i);
blockBodies[i] = block.Body;
_bodies[blockHashes[i]] = blockBodies[i];

if (allKnown)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class BlockDownloadContext
private readonly bool _downloadReceipts;
private readonly IReceiptsRecovery _receiptsRecovery;

public BlockDownloadContext(ISpecProvider specProvider, PeerInfo syncPeer, BlockHeader?[] headers, bool downloadReceipts, IReceiptsRecovery receiptsRecovery)
public BlockDownloadContext(ISpecProvider specProvider, PeerInfo syncPeer, BlockHeader?[] headers,
bool downloadReceipts, IReceiptsRecovery receiptsRecovery)
{
_indexMapping = new Dictionary<int, int>();
_downloadReceipts = downloadReceipts;
Expand Down Expand Up @@ -129,6 +130,12 @@ public bool TrySetReceipts(int index, TxReceipt[]? receipts, out Block block)
return result;
}

public Block GetBlockByRequestIdx(int index)
{
int mappedIndex = _indexMapping[index];
return Blocks[mappedIndex];
}

private void ValidateReceipts(Block block, TxReceipt[] blockReceipts)
{
Keccak receiptsRoot = new ReceiptTrie(_specProvider.GetSpec(block.Number), blockReceipts).RootHash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ bool HasMoreToSync()
Block currentBlock = blocks[blockIndex];
if (_logger.IsTrace) _logger.Trace($"Received {currentBlock} from {bestPeer}");

if (currentBlock.IsBodyMissing)
{
throw new EthSyncException($"{bestPeer} didn't send body for block {currentBlock.ToString(Block.Format.Short)}.");
}

// can move this to block tree now?
if (!_blockValidator.ValidateSuggestedBlock(currentBlock))
{
Expand Down Expand Up @@ -457,6 +462,12 @@ protected async Task RequestBodies(PeerInfo peer, CancellationToken cancellation
context.SetBody(i + offset, result[i]);
}

if (result.Length == 0)
{
if (_logger.IsTrace) _logger.Trace($"Peer sent no bodies. Peer: {peer}, Request: {hashesToRequest.Count}");
return;
}

offset += result.Length;
}
}
Expand All @@ -475,7 +486,13 @@ protected async Task RequestReceipts(PeerInfo peer, CancellationToken cancellati
for (int i = 0; i < result.Length; i++)
{
TxReceipt[] txReceipts = result[i];
if (!context.TrySetReceipts(i + offset, txReceipts, out Block block))
Block block = context.GetBlockByRequestIdx(i + offset);
if (block.IsBodyMissing)
{
if (_logger.IsTrace) _logger.Trace($"Found incomplete blocks. {block.Hash}");
return;
}
if (!context.TrySetReceipts(i + offset, txReceipts, out block))
{
throw new EthSyncException($"{peer} sent invalid receipts for block {block.ToString(Block.Format.Short)}.");
}
Expand Down