Skip to content

Commit

Permalink
Make PeerRefresher more sophisticated with batching
Browse files Browse the repository at this point in the history
  • Loading branch information
LukaszRozmej committed May 24, 2022
1 parent f36c767 commit a553d9c
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public class BlockValidator : IBlockValidator
_headerValidator = headerValidator ?? throw new ArgumentNullException(nameof(headerValidator));
}

public bool ValidateHash(BlockHeader header)
{
return _headerValidator.ValidateHash(header);
}

public bool Validate(BlockHeader header, BlockHeader? parent, bool isUncle)
{
return _headerValidator.Validate(header, parent, isUncle);
Expand Down
16 changes: 6 additions & 10 deletions src/Nethermind/Nethermind.Consensus/Validators/HeaderValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,7 @@ public class HeaderValidator : IHeaderValidator
_daoBlockNumber = specProvider.DaoBlockNumber;
}

public virtual bool ValidateHash(BlockHeader header)
{
bool hashAsExpected = header.Hash == header.CalculateHash();
if (!hashAsExpected)
{
if (_logger.IsWarn) _logger.Warn($"Invalid block header ({header.Hash}) - invalid block hash");
}

return hashAsExpected;
}
public static bool ValidateHash(BlockHeader header) => header.Hash == header.CalculateHash();

/// <summary>
/// Note that this does not validate seal which is the responsibility of <see cref="ISealValidator"/>>
Expand All @@ -73,6 +64,11 @@ public virtual bool ValidateHash(BlockHeader header)
public virtual bool Validate(BlockHeader header, BlockHeader? parent, bool isUncle = false)
{
bool hashAsExpected = ValidateHash(header);

if (!hashAsExpected)
{
if (_logger.IsWarn) _logger.Warn($"Invalid block header ({header.Hash}) - invalid block hash");
}

IReleaseSpec spec = _specProvider.GetSpec(header.Number);
bool extraDataValid = ValidateExtraData(header, parent, spec, isUncle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ namespace Nethermind.Consensus.Validators
{
public interface IHeaderValidator
{
bool ValidateHash(BlockHeader header);
bool Validate(BlockHeader header, BlockHeader? parent, bool isUncle = false);
bool Validate(BlockHeader header, bool isUncle = false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private IEngineRpcModule CreateEngineModule(MergeTestBlockchain chain, ISyncConf
{
IPeerRefresher peerRefresher = Substitute.For<IPeerRefresher>();

chain.BeaconPivot = new BeaconPivot(syncConfig ?? new SyncConfig(), chain.MergeConfig, new MemDb(), chain.BlockTree, peerRefresher, chain.LogManager);
chain.BeaconPivot = new BeaconPivot(syncConfig ?? new SyncConfig(), chain.MergeConfig, new MemDb(), chain.BlockTree, chain.LogManager);
BlockCacheService blockCacheService = new();
chain.BeaconSync = new BeaconSync(chain.BeaconPivot, chain.BlockTree, syncConfig ?? new SyncConfig(), blockCacheService, chain.LogManager);
return new EngineRpcModule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private class Context
_syncConfig,
LimboLogs.Instance);
TotalDifficultyBasedBetterPeerStrategy bestPeerStrategy = new (syncProgressResolver, LimboLogs.Instance);
BeaconPivot = beaconPivot ?? new BeaconPivot(_syncConfig, _mergeConfig, _metadataDb, BlockTree, new PeerRefresher(peerPool), LimboLogs.Instance);
BeaconPivot = beaconPivot ?? new BeaconPivot(_syncConfig, _mergeConfig, _metadataDb, BlockTree, LimboLogs.Instance);
BeaconSync = new(BeaconPivot, BlockTree, _syncConfig, new BlockCacheService(), LimboLogs.Instance);
ISyncModeSelector selector = new MultiSyncModeSelector(syncProgressResolver, peerPool, _syncConfig, BeaconSync, bestPeerStrategy, LimboLogs.Instance);
Feed = new BeaconHeadersSyncFeed(poSSwitcher, selector, blockTree, peerPool, _syncConfig, report, BeaconPivot, _mergeConfig, LimboLogs.Instance);
Expand Down Expand Up @@ -255,7 +255,7 @@ private void BuildHeadersSyncBatchResponse(HeadersSyncBatch batch, IBlockTree bl
private IBeaconPivot PreparePivot(long blockNumber, ISyncConfig syncConfig, IBlockTree blockTree, BlockHeader? pivotHeader = null)
{
IPeerRefresher peerRefresher = Substitute.For<IPeerRefresher>();
IBeaconPivot pivot = new BeaconPivot(syncConfig, new MergeConfig() { Enabled = true }, new MemDb(), blockTree, peerRefresher, LimboLogs.Instance);
IBeaconPivot pivot = new BeaconPivot(syncConfig, new MergeConfig() { Enabled = true }, new MemDb(), blockTree, LimboLogs.Instance);
pivot.EnsurePivot(pivotHeader ?? Build.A.BlockHeader.WithNumber(blockNumber).TestObject);
return pivot;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void Setup()
public void Beacon_pivot_defaults_to_sync_config_values_when_there_is_no_pivot()
{
IPeerRefresher peerRefresher = Substitute.For<IPeerRefresher>();
IBeaconPivot pivot = new BeaconPivot(_syncConfig, new MergeConfig() { Enabled = true }, new MemDb(), Substitute.For<IBlockTree>(), peerRefresher, LimboLogs.Instance);
IBeaconPivot pivot = new BeaconPivot(_syncConfig, new MergeConfig() { Enabled = true }, new MemDb(), Substitute.For<IBlockTree>(), LimboLogs.Instance);
pivot.PivotHash.Should().Be(_syncConfig.PivotHashParsed);
pivot.PivotNumber.Should().Be(_syncConfig.PivotNumberParsed);
pivot.PivotDestinationNumber.Should().Be(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class ForkchoiceUpdatedV1Handler : IForkchoiceUpdatedV1Handler
if (_blockCacheService.BlockCache.TryGetValue(forkchoiceState.HeadBlockHash, out Block? block))
{
_mergeSyncController.InitSyncing(block.Header);
_peerRefresher.RefreshPeers(block.ParentHash);
_blockCacheService.SyncingHead = forkchoiceState.HeadBlockHash;
_blockCacheService.FinalizedHash = forkchoiceState.FinalizedBlockHash;

Expand All @@ -106,10 +107,7 @@ public class ForkchoiceUpdatedV1Handler : IForkchoiceUpdatedV1Handler

if (!_blockTree.WasProcessed(newHeadBlock.Number, newHeadBlock.Hash ?? newHeadBlock.CalculateHash()))
{
// ToDO of course we shouldn't refresh the peers in this way. This need to be optimized and we need to rethink refreshing
if (i % 10 == 0)
_peerRefresher.RefreshPeers(newHeadBlock.Hash!);
++i;
_peerRefresher.RefreshPeers(newHeadBlock.ParentHash);
_blockCacheService.SyncingHead = forkchoiceState.HeadBlockHash;
_blockCacheService.FinalizedHash = forkchoiceState.FinalizedBlockHash;
if (_logger.IsInfo) { _logger.Info($"Syncing beacon headers... Request: {requestStr}"); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public async Task<ResultWrapper<PayloadStatusV1>> HandleAsync(BlockRequestResult
return NewPayloadV1Result.Invalid(null, $"Block {request} could not be parsed as a block");
}

if (_blockValidator.ValidateHash(block.Header) == false)
if (!HeaderValidator.ValidateHash(block.Header))
{
if (_logger.IsWarn)
_logger.Warn($"InvalidBlockHash. Result of {requestStr}");
Expand Down
6 changes: 4 additions & 2 deletions src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,10 @@ public Task InitSynchronization()
if (_api.UnclesValidator is null) throw new ArgumentNullException(nameof(_api.UnclesValidator));

// ToDo strange place for validators initialization
_peerRefresher = new PeerRefresher(_api.SyncPeerPool);
_beaconPivot = new BeaconPivot(_syncConfig, _mergeConfig, _api.DbProvider.MetadataDb, _api.BlockTree, new PeerRefresher(_api.SyncPeerPool), _api.LogManager);
PeerRefresher peerRefresher = new(_api.SyncPeerPool, _api.TimerFactory);
_peerRefresher = peerRefresher;
_api.DisposeStack.Push(peerRefresher);
_beaconPivot = new BeaconPivot(_syncConfig, _mergeConfig, _api.DbProvider.MetadataDb, _api.BlockTree, _api.LogManager);
_api.HeaderValidator = new MergeHeaderValidator(_poSSwitcher, _api.BlockTree, _api.SpecProvider, _api.SealValidator, _api.LogManager);
_api.UnclesValidator = new MergeUnclesValidator(_poSSwitcher, _api.UnclesValidator);
_api.BlockValidator = new BlockValidator(_api.TxValidator, _api.HeaderValidator, _api.UnclesValidator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class BeaconPivot : IBeaconPivot
private readonly IMergeConfig _mergeConfig;
private readonly IDb _metadataDb;
private readonly IBlockTree _blockTree;
private readonly IPeerRefresher _peerRefresher;
private readonly ILogger _logger;
private BlockHeader? _currentBeaconPivot;
private BlockHeader? _pivotParent;
Expand Down Expand Up @@ -62,14 +61,12 @@ public class BeaconPivot : IBeaconPivot
IMergeConfig mergeConfig,
IDb metadataDb,
IBlockTree blockTree,
IPeerRefresher peerRefresher,
ILogManager logManager)
{
_syncConfig = syncConfig;
_mergeConfig = mergeConfig;
_metadataDb = metadataDb;
_blockTree = blockTree;
_peerRefresher = peerRefresher;
_logger = logManager.GetClassLogger();
LoadBeaconPivot();
}
Expand All @@ -90,8 +87,6 @@ public void EnsurePivot(BlockHeader? blockHeader)
bool beaconPivotExists = BeaconPivotExists();
if (blockHeader != null)
{
_peerRefresher.RefreshPeers(blockHeader.Hash!);

// ToDo Sarah in some cases this could be wrong
if (beaconPivotExists && (PivotNumber > blockHeader.Number || blockHeader.Hash == PivotHash))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,71 @@
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
//

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Nethermind.Core.Crypto;
using Nethermind.Core.Timers;
using Nethermind.Synchronization.Peers;

namespace Nethermind.Merge.Plugin.Synchronization;

public class PeerRefresher : IPeerRefresher
public class PeerRefresher : IPeerRefresher, IAsyncDisposable
{
private readonly ISyncPeerPool _syncPeerPool;
private static readonly TimeSpan _minRefreshDelay = TimeSpan.FromSeconds(10);
private DateTime _lastRefresh = DateTime.MinValue;
private Keccak _lastHash = Keccak.Zero;
private readonly ITimer _refreshTimer;

public PeerRefresher(ISyncPeerPool syncPeerPool)
public PeerRefresher(ISyncPeerPool syncPeerPool, ITimerFactory timerFactory)
{
_refreshTimer = timerFactory.CreateTimer(_minRefreshDelay);
_refreshTimer.Elapsed += TimerOnElapsed;
_refreshTimer.AutoReset = false;
_syncPeerPool = syncPeerPool;
}

public void RefreshPeers(Keccak blockHash)
public void RefreshPeers(Keccak? blockHash)
{
IEnumerable<PeerInfo> peers = _syncPeerPool.InitializedPeers;
foreach (PeerInfo peer in peers)
if (blockHash is not null)
{
_lastHash = blockHash;
TimeSpan timePassed = DateTime.Now - _lastRefresh;
if (timePassed > _minRefreshDelay)
{
Refresh(blockHash);
}
else if (!_refreshTimer.Enabled)
{
_refreshTimer.Interval = _minRefreshDelay - timePassed;
_refreshTimer.Start();
}
}
}

private void TimerOnElapsed(object? sender, EventArgs e)
{
Refresh(_lastHash);
}

private void Refresh(Keccak blockHash)
{
_lastRefresh = DateTime.Now;
foreach (PeerInfo peer in _syncPeerPool.InitializedPeers)
{
_syncPeerPool.RefreshTotalDifficulty(peer.SyncPeer, blockHash);
}
}

public ValueTask DisposeAsync()
{
_refreshTimer.Dispose();
return default;
}
}

public interface IPeerRefresher
{
void RefreshPeers(Keccak blockHash);
void RefreshPeers(Keccak? blockHash);
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public async Task Merge_Happy_path(long headNumber, int options, int threshold,
IBlockCacheService blockCacheService = new BlockCacheService();
PoSSwitcher posSwitcher = new(new MergeConfig() { Enabled = true, TerminalTotalDifficulty = "0" }, new SyncConfig(), metadataDb, notSyncedTree,
RopstenSpecProvider.Instance, blockCacheService, LimboLogs.Instance);
BeaconPivot beaconPivot = new(new SyncConfig(), mergeConfig, metadataDb, notSyncedTree,
new PeerRefresher(Substitute.For<ISyncPeerPool>()), LimboLogs.Instance);
BeaconPivot beaconPivot = new(new SyncConfig(), mergeConfig, metadataDb, notSyncedTree, LimboLogs.Instance);
beaconPivot.EnsurePivot(blockTrees.SyncedTree.FindHeader(16, BlockTreeLookupOptions.None));
MergeBlockDownloader downloader = new(posSwitcher, beaconPivot, ctx.Feed, ctx.PeerPool, notSyncedTree,
Always.Valid, Always.Valid, NullSyncReport.Instance, receiptStorage, RopstenSpecProvider.Instance,
Expand Down Expand Up @@ -118,8 +117,7 @@ public async Task Can_reach_terminal_block(long headNumber, int options, int thr
IBlockCacheService blockCacheService = new BlockCacheService();
PoSSwitcher posSwitcher = new(new MergeConfig() { Enabled = true, TerminalTotalDifficulty = "10000000" }, new SyncConfig(), metadataDb, notSyncedTree,
RopstenSpecProvider.Instance, blockCacheService, LimboLogs.Instance);
BeaconPivot beaconPivot = new(new SyncConfig(), mergeConfig, metadataDb, notSyncedTree,
new PeerRefresher(Substitute.For<ISyncPeerPool>()), LimboLogs.Instance);
BeaconPivot beaconPivot = new(new SyncConfig(), mergeConfig, metadataDb, notSyncedTree, LimboLogs.Instance);
if (withBeaconPivot)
beaconPivot.EnsurePivot(blockTrees.SyncedTree.FindHeader(16, BlockTreeLookupOptions.None));
MergeBlockDownloader downloader = new(posSwitcher, beaconPivot, ctx.Feed, ctx.PeerPool, notSyncedTree,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ synchronizerType switch
if (IsMerge(synchronizerType))
{
IBeaconPivot beaconPivot = new BeaconPivot(syncConfig, mergeConfig, dbProvider.MetadataDb,
BlockTree, new PeerRefresher(SyncPeerPool), _logManager);
BlockTree, _logManager);
blockDownloaderFactory = new MergeBlockDownloaderFactory(
poSSwitcher,
beaconPivot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using Nethermind.Blockchain;
using Nethermind.Blockchain.Find;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
Expand Down Expand Up @@ -623,6 +624,14 @@ private async Task ExecuteRefreshTask(RefreshTotalDiffTask refreshTotalDiffTask,
return;
}
if (!HeaderValidator.ValidateHash(header))
{
if (_logger.IsTrace) _logger.Trace($"InitPeerInfo failed for node: {syncPeer.Node:c}{Environment.NewLine}Invalid block hash.");
_stats.ReportSyncEvent(syncPeer.Node, syncPeer.IsInitialized ? NodeStatsEventType.SyncFailed : NodeStatsEventType.SyncInitFailed);
syncPeer.Disconnect(DisconnectReason.DisconnectRequested, "refresh peer info fault - invalid header hash");
return;
}
if (_logger.IsTrace) _logger.Trace($"Received head block info from {syncPeer.Node:c} with head block {header.ToString(BlockHeader.Format.Short)}, total difficulty {header.TotalDifficulty}");
if (!syncPeer.IsInitialized) _stats.ReportSyncEvent(syncPeer.Node, NodeStatsEventType.SyncInitCompleted);
Expand Down

0 comments on commit a553d9c

Please sign in to comment.