diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs index 9090a5b339..5a7888881e 100644 --- a/src/neo/Ledger/Blockchain.cs +++ b/src/neo/Ledger/Blockchain.cs @@ -61,7 +61,6 @@ private class ParallelVerified { public Transaction Transaction; public bool Sho private readonly NeoSystem system; private readonly List header_index = new List(); private uint stored_header_count = 0; - private readonly Dictionary block_cache = new Dictionary(); private readonly Dictionary> block_cache_unverified = new Dictionary>(); internal readonly RelayCache ConsensusRelayCache = new RelayCache(100); private SnapshotView currentSnapshot; @@ -142,7 +141,6 @@ public Blockchain(NeoSystem system, IStore store) public bool ContainsBlock(UInt256 hash) { - if (block_cache.ContainsKey(hash)) return true; return View.ContainsBlock(hash); } @@ -189,8 +187,6 @@ public Block GetBlock(uint index) public Block GetBlock(UInt256 hash) { - if (block_cache.TryGetValue(hash, out Block block)) - return block; return View.GetBlock(hash); } @@ -215,8 +211,6 @@ public Header GetHeader(uint index) public Header GetHeader(UInt256 hash) { - if (block_cache.TryGetValue(hash, out Block block)) - return block.Header; return View.GetHeader(hash); } @@ -256,12 +250,19 @@ private void OnImport(IEnumerable blocks, bool verify) private void AddUnverifiedBlockToCache(Block block) { + // Check if any block proposal for height `block.Index` exists if (!block_cache_unverified.TryGetValue(block.Index, out LinkedList blocks)) { + // There are no blocks, a new LinkedList is created and, consequently, the current block is added to the list blocks = new LinkedList(); block_cache_unverified.Add(block.Index, blocks); } - + // Check if any block with the hash being added already exists on possible candidates to be processed + foreach (var unverifiedBlock in blocks) + { + if (block.Hash == unverifiedBlock.Hash) + return; + } blocks.AddLast(block); } @@ -294,53 +295,23 @@ private RelayResultReason OnNewBlock(Block block) { if (block.Index <= Height) return RelayResultReason.AlreadyExists; - if (block_cache.ContainsKey(block.Hash)) - return RelayResultReason.AlreadyExists; - if (block.Index - 1 >= header_index.Count) + if (block.Index - 1 > Height) { AddUnverifiedBlockToCache(block); return RelayResultReason.UnableToVerify; } - if (block.Index == header_index.Count) - { - if (!block.Verify(currentSnapshot)) - return RelayResultReason.Invalid; - } - else - { - if (!block.Hash.Equals(header_index[(int)block.Index])) - return RelayResultReason.Invalid; - } if (block.Index == Height + 1) { - Block block_persist = block; - List blocksToPersistList = new List(); - while (true) - { - blocksToPersistList.Add(block_persist); - if (block_persist.Index + 1 >= header_index.Count) break; - UInt256 hash = header_index[(int)block_persist.Index + 1]; - if (!block_cache.TryGetValue(hash, out block_persist)) break; - } - - int blocksPersisted = 0; - foreach (Block blockToPersist in blocksToPersistList) + if (!block.Verify(currentSnapshot)) { - block_cache_unverified.Remove(blockToPersist.Index); - Persist(blockToPersist); - - // 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current - uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000; - - if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue; - // Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network - // Increase in the rate of 1 block per second in configurations with faster blocks - - if (blockToPersist.Index + 100 >= header_index.Count) - system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist }); + system.SyncManager.Tell(new SyncManager.InvalidBlockIndex { InvalidIndex = block.Index }); + return RelayResultReason.Invalid; } + block_cache_unverified.Remove(block.Index); + Persist(block); + system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Singleton.Height))); + system.SyncManager.Tell(new SyncManager.PersistedBlockIndex { PersistedIndex = block.Index }); SaveHeaderHashList(); - if (block_cache_unverified.TryGetValue(Height + 1, out LinkedList unverifiedBlocks)) { foreach (var unverifiedBlock in unverifiedBlocks) @@ -348,24 +319,6 @@ private RelayResultReason OnNewBlock(Block block) block_cache_unverified.Remove(Height + 1); } } - else - { - block_cache.Add(block.Hash, block); - if (block.Index + 100 >= header_index.Count) - system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block }); - if (block.Index == header_index.Count) - { - header_index.Add(block.Hash); - using (SnapshotView snapshot = GetSnapshot()) - { - snapshot.Blocks.Add(block.Hash, block.Header.Trim()); - snapshot.HeaderHashIndex.GetAndChange().Set(block); - SaveHeaderHashList(snapshot); - snapshot.Commit(); - } - UpdateCurrentSnapshot(); - } - } return RelayResultReason.Succeed; } @@ -378,27 +331,6 @@ private RelayResultReason OnNewConsensus(ConsensusPayload payload) return RelayResultReason.Succeed; } - private void OnNewHeaders(Header[] headers) - { - using (SnapshotView snapshot = GetSnapshot()) - { - foreach (Header header in headers) - { - if (header.Index - 1 >= header_index.Count) break; - if (header.Index < header_index.Count) continue; - if (!header.Verify(snapshot)) break; - header_index.Add(header.Hash); - snapshot.Blocks.Add(header.Hash, header.Trim()); - snapshot.HeaderHashIndex.GetAndChange().Hash = header.Hash; - snapshot.HeaderHashIndex.GetAndChange().Index = header.Index; - } - SaveHeaderHashList(snapshot); - snapshot.Commit(); - } - UpdateCurrentSnapshot(); - system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender); - } - private void OnNewTransaction(Transaction transaction, bool relay) { RelayResultReason reason; @@ -445,7 +377,6 @@ private void OnParallelVerified(ParallelVerified parallelVerified) private void OnPersistCompleted(Block block) { - block_cache.Remove(block.Hash); MemPool.UpdatePoolForBlockPersisted(block, currentSnapshot); Context.System.EventStream.Publish(new PersistCompleted { Block = block }); } @@ -460,9 +391,6 @@ protected override void OnReceive(object message) case FillMemoryPool fill: OnFillMemoryPool(fill.Transactions); break; - case Header[] headers: - OnNewHeaders(headers); - break; case Block block: Sender.Tell(OnNewBlock(block)); break; @@ -614,7 +542,6 @@ internal protected override bool IsHighPriority(object message) { switch (message) { - case Header[] _: case Block _: case ConsensusPayload _: case Terminated _: diff --git a/src/neo/NeoSystem.cs b/src/neo/NeoSystem.cs index b837426d45..d4e7e7b503 100644 --- a/src/neo/NeoSystem.cs +++ b/src/neo/NeoSystem.cs @@ -21,6 +21,7 @@ public class NeoSystem : IDisposable public IActorRef Blockchain { get; } public IActorRef LocalNode { get; } internal IActorRef TaskManager { get; } + internal IActorRef SyncManager { get; } public IActorRef Consensus { get; private set; } private readonly IStore store; @@ -36,6 +37,7 @@ public NeoSystem(string storageEngine = null) this.Blockchain = ActorSystem.ActorOf(Ledger.Blockchain.Props(this, store)); this.LocalNode = ActorSystem.ActorOf(Network.P2P.LocalNode.Props(this)); this.TaskManager = ActorSystem.ActorOf(Network.P2P.TaskManager.Props(this)); + this.SyncManager = ActorSystem.ActorOf(Network.P2P.SyncManager.Props(this)); foreach (var plugin in Plugin.Plugins) plugin.OnPluginsLoaded(); } diff --git a/src/neo/Network/P2P/MessageCommand.cs b/src/neo/Network/P2P/MessageCommand.cs index ccddacfba2..1d5566cb31 100644 --- a/src/neo/Network/P2P/MessageCommand.cs +++ b/src/neo/Network/P2P/MessageCommand.cs @@ -20,7 +20,7 @@ public enum MessageCommand : byte Pong = 0x19, //synchronization - [ReflectionCache(typeof(GetBlocksPayload))] + [ReflectionCache(typeof(GetHeadersPayload))] GetHeaders = 0x20, [ReflectionCache(typeof(HeadersPayload))] Headers = 0x21, diff --git a/src/neo/Network/P2P/NodeSession.cs b/src/neo/Network/P2P/NodeSession.cs new file mode 100644 index 0000000000..e6e13c5d36 --- /dev/null +++ b/src/neo/Network/P2P/NodeSession.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; + +namespace Neo.Network.P2P +{ + internal class NodeSession + { + public readonly Dictionary InvTasks = new Dictionary(); + public Dictionary IndexTasks = new Dictionary(); + + public uint TimeoutTimes = 0; + public uint InvalidBlockCount = 0; + + public bool HasInvTask => InvTasks.Count > 0; + public bool HasIndexTask => IndexTasks.Count > 0; + } +} diff --git a/src/neo/Network/P2P/Payloads/GetHeadersPayload.cs b/src/neo/Network/P2P/Payloads/GetHeadersPayload.cs new file mode 100644 index 0000000000..1ca43a38e5 --- /dev/null +++ b/src/neo/Network/P2P/Payloads/GetHeadersPayload.cs @@ -0,0 +1,37 @@ +using Neo.IO; +using System; +using System.IO; + +namespace Neo.Network.P2P.Payloads +{ + public class GetHeadersPayload : ISerializable + { + public uint IndexStart; + public short Count; + + public int Size => sizeof(uint) + sizeof(short); + + public static GetHeadersPayload Create(uint index_start, short count = -1) + { + return new GetHeadersPayload + { + IndexStart = index_start, + Count = count + }; + } + + void ISerializable.Deserialize(BinaryReader reader) + { + IndexStart = reader.ReadUInt32(); + Count = reader.ReadInt16(); + if (Count < -1 || Count == 0 || Count > HeadersPayload.MaxHeadersCount) + throw new FormatException(); + } + + void ISerializable.Serialize(BinaryWriter writer) + { + writer.Write(IndexStart); + writer.Write(Count); + } + } +} diff --git a/src/neo/Network/P2P/ProtocolHandler.cs b/src/neo/Network/P2P/ProtocolHandler.cs index a97cb46922..0f3690f306 100644 --- a/src/neo/Network/P2P/ProtocolHandler.cs +++ b/src/neo/Network/P2P/ProtocolHandler.cs @@ -89,7 +89,7 @@ private void OnMessage(Message msg) OnAddrMessageReceived((AddrPayload)msg.Payload); break; case MessageCommand.Block: - OnInventoryReceived((Block)msg.Payload); + OnBlockReceived((Block)msg.Payload); break; case MessageCommand.Consensus: OnInventoryReceived((ConsensusPayload)msg.Payload); @@ -116,7 +116,7 @@ private void OnMessage(Message msg) OnGetDataMessageReceived((InvPayload)msg.Payload); break; case MessageCommand.GetHeaders: - OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload); + OnGetHeadersMessageReceived((GetHeadersPayload)msg.Payload); break; case MessageCommand.Headers: OnHeadersMessageReceived((HeadersPayload)msg.Payload); @@ -148,6 +148,12 @@ private void OnMessage(Message msg) } } + private void OnBlockReceived(Block payload) + { + if (payload != null) + system.SyncManager.Tell(payload, Context.Parent); + } + private void OnAddrMessageReceived(AddrPayload payload) { system.LocalNode.Tell(new Peer.Peers @@ -282,24 +288,20 @@ private void OnGetDataMessageReceived(InvPayload payload) /// /// Will be triggered when a MessageCommand.GetHeaders message is received. - /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. + /// Tell the specified number of blocks' headers starting with the requested IndexStart to RemoteNode actor. /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. /// - /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. - private void OnGetHeadersMessageReceived(GetBlocksPayload payload) + /// A GetBlocksPayload including start block index and number of blocks' headers requested. + private void OnGetHeadersMessageReceived(GetHeadersPayload payload) { - UInt256 hash = payload.HashStart; - int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count; - DataCache cache = Blockchain.Singleton.View.Blocks; - TrimmedBlock state = cache.TryGet(hash); - if (state == null) return; + uint index = payload.IndexStart; + int count = payload.Count < 0 ? HeadersPayload.MaxHeadersCount : payload.Count; + if (index > Blockchain.Singleton.HeaderHeight) + return; List
headers = new List
(); - for (uint i = 1; i <= count; i++) + for (uint i = 0; i < count; i++) { - uint index = state.Index + i; - hash = Blockchain.Singleton.GetBlockHash(index); - if (hash == null) break; - Header header = cache.TryGet(hash)?.Header; + var header = Blockchain.Singleton.GetHeader(index + i); if (header == null) break; headers.Add(header); } diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index 44bb2ef107..f9baf11947 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -16,6 +16,7 @@ namespace Neo.Network.P2P public class RemoteNode : Connection { internal class Relay { public IInventory Inventory; } + internal NodeSession session = new NodeSession(); private readonly NeoSystem system; private readonly IActorRef protocol; @@ -161,7 +162,7 @@ private void OnPingPayload(PingPayload payload) if (payload.LastBlockIndex > LastBlockIndex) { LastBlockIndex = payload.LastBlockIndex; - system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); + system.SyncManager.Tell(new SyncManager.StartSync { }); } } @@ -195,7 +196,8 @@ private void OnSetFilter(BloomFilter filter) private void OnVerack() { verack = true; - system.TaskManager.Tell(new TaskManager.Register { Version = Version }); + system.TaskManager.Tell(new TaskManager.Register { Node = this }); + system.SyncManager.Tell(new SyncManager.Register { Node = this }); CheckMessageQueue(); } diff --git a/src/neo/Network/P2P/SyncManager.cs b/src/neo/Network/P2P/SyncManager.cs new file mode 100644 index 0000000000..da4234e071 --- /dev/null +++ b/src/neo/Network/P2P/SyncManager.cs @@ -0,0 +1,196 @@ +using Akka.Actor; +using Neo.Ledger; +using Neo.Network.P2P.Payloads; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Neo.Network.P2P +{ + internal class SyncManager : UntypedActor + { + public class Register { public RemoteNode Node; } + public class PersistedBlockIndex { public uint PersistedIndex; } + public class InvalidBlockIndex { public uint InvalidIndex; } + public class StartSync { } + private class Timer { } + + private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); + private static readonly TimeSpan SyncTimeout = TimeSpan.FromMinutes(1); + + private readonly Dictionary nodes = new Dictionary(); + private readonly Dictionary receivedBlockIndex = new Dictionary(); + private readonly NeoSystem system; + private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); + private readonly List failedTasks = new List(); + + private const int MaxTasksCount = 50; + private const int PingCoolingOffPeriod = 60; // in secconds. + + private uint lastTaskIndex = 0; + + public SyncManager(NeoSystem system) + { + this.system = system; + this.lastTaskIndex = Blockchain.Singleton.Height; + } + + protected override void OnReceive(object message) + { + switch (message) + { + case Register register: + OnRegister(register.Node); + break; + case Block block: + OnReceiveBlock(block); + break; + case PersistedBlockIndex blockIndex: + OnReceivePersistedBlockIndex(blockIndex); + break; + case InvalidBlockIndex invalidBlockIndex: + OnReceiveInvalidBlockIndex(invalidBlockIndex); + break; + case StartSync _: + RequestSync(); + break; + case Timer _: + OnTimer(); + break; + case Terminated terminated: + OnTerminated(terminated.ActorRef); + break; + } + } + + private void OnRegister(RemoteNode node) + { + Context.Watch(Sender); + nodes.Add(Sender, node); + RequestSync(); + } + + private void OnReceiveBlock(Block block) + { + var node = nodes.Values.FirstOrDefault(p => p.session.IndexTasks.ContainsKey(block.Index)); + if (node is null) return; + node.session.IndexTasks.Remove(block.Index); + receivedBlockIndex.Add(block.Index, node); + system.Blockchain.Tell(block); + RequestSync(); + } + + private void OnReceivePersistedBlockIndex(PersistedBlockIndex blockIndex) + { + receivedBlockIndex.Remove(blockIndex.PersistedIndex); + } + + private void OnReceiveInvalidBlockIndex(InvalidBlockIndex invalidBlockIndex) + { + receivedBlockIndex.TryGetValue(invalidBlockIndex.InvalidIndex, out RemoteNode node); + if (node is null) return; + node.session.InvalidBlockCount++; + node.session.IndexTasks.Remove(invalidBlockIndex.InvalidIndex); + receivedBlockIndex.Remove(invalidBlockIndex.InvalidIndex); + AssignTask(invalidBlockIndex.InvalidIndex, node.session); + } + + private void RequestSync() + { + if (nodes.Count() == 0) return; + + SendPingMessage(); + + while (failedTasks.Count() > 0) + { + if (failedTasks[0] <= Blockchain.Singleton.Height) + { + failedTasks.Remove(failedTasks[0]); + continue; + } + if (!AssignTask(failedTasks[0])) return; + } + + int taskCounts = nodes.Values.Sum(p => p.session.IndexTasks.Count); + var highestBlockIndex = nodes.Values.Max(p => p.LastBlockIndex); + for (; taskCounts < MaxTasksCount; taskCounts++) + { + if (lastTaskIndex >= highestBlockIndex) break; + if (!AssignTask(++lastTaskIndex)) break; + } + } + + private void OnTimer() + { + foreach (var node in nodes.Values) + { + foreach (KeyValuePair kvp in node.session.IndexTasks) + { + if (DateTime.UtcNow - kvp.Value > SyncTimeout) + { + node.session.IndexTasks.Remove(kvp.Key); + node.session.TimeoutTimes++; + AssignTask(kvp.Key, node.session); + } + } + } + RequestSync(); + } + + private void OnTerminated(IActorRef actor) + { + if (!nodes.TryGetValue(Sender, out RemoteNode remoteNode)) + return; + NodeSession session = remoteNode.session; + foreach (uint index in session.IndexTasks.Keys) + AssignTask(index, session); + nodes.Remove(actor); + } + + private bool AssignTask(uint index, NodeSession filterSession = null) + { + if (index <= Blockchain.Singleton.Height || nodes.Values.Any(p => p.session != filterSession && p.session.IndexTasks.ContainsKey(index))) + return true; + Random rand = new Random(); + KeyValuePair remoteNode = nodes.Where(p => p.Value.session != filterSession && p.Value.LastBlockIndex >= index) + .OrderBy(p => p.Value.session.IndexTasks.Count) + .ThenBy(s => rand.Next()) + .FirstOrDefault(); + if (remoteNode.Value == null) + { + failedTasks.Add(index); + return false; + } + NodeSession session = remoteNode.Value.session; + session.IndexTasks.Add(index, DateTime.UtcNow); + remoteNode.Key.Tell(Message.Create(MessageCommand.GetBlockData, GetBlockDataPayload.Create(index, 1))); + failedTasks.Remove(index); + return true; + } + + private void SendPingMessage() + { + foreach (KeyValuePair item in nodes) + { + var node = item.Key; + var remoteNode = item.Value; + if (Blockchain.Singleton.Height >= remoteNode.LastBlockIndex + && TimeProvider.Current.UtcNow.ToTimestamp() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentBlockHash)?.Timestamp) + { + node.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height))); + } + } + } + + protected override void PostStop() + { + timer.CancelIfNotNull(); + base.PostStop(); + } + + public static Props Props(NeoSystem system) + { + return Akka.Actor.Props.Create(() => new SyncManager(system)); + } + } +} diff --git a/src/neo/Network/P2P/TaskManager.cs b/src/neo/Network/P2P/TaskManager.cs index 73fa49441a..a033b957b9 100644 --- a/src/neo/Network/P2P/TaskManager.cs +++ b/src/neo/Network/P2P/TaskManager.cs @@ -14,11 +14,9 @@ namespace Neo.Network.P2P { internal class TaskManager : UntypedActor { - public class Register { public VersionPayload Version; } - public class Update { public uint LastBlockIndex; } + public class Register { public RemoteNode Node; } public class NewTasks { public InvPayload Payload; } public class TaskCompleted { public UInt256 Hash; } - public class HeaderTaskCompleted { } public class RestartTasks { public InvPayload Payload; } private class Timer { } @@ -28,63 +26,41 @@ private class Timer { } private readonly NeoSystem system; private const int MaxConncurrentTasks = 3; - private const int PingCoolingOffPeriod = 60; // in secconds. /// /// A set of known hashes, of inventories or payloads, already received. /// private readonly HashSetCache knownHashes; private readonly Dictionary globalTasks = new Dictionary(); - private readonly Dictionary sessions = new Dictionary(); + private readonly Dictionary nodes = new Dictionary(); private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); - private readonly UInt256 HeaderTaskHash = UInt256.Zero; - private bool HasHeaderTask => globalTasks.ContainsKey(HeaderTaskHash); - public TaskManager(NeoSystem system) { this.system = system; this.knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); } - private void OnHeaderTaskCompleted() - { - if (!sessions.TryGetValue(Sender, out TaskSession session)) - return; - session.Tasks.Remove(HeaderTaskHash); - DecrementGlobalTask(HeaderTaskHash); - RequestTasks(session); - } - private void OnNewTasks(InvPayload payload) { - if (!sessions.TryGetValue(Sender, out TaskSession session)) + if (!nodes.TryGetValue(Sender, out RemoteNode remoteNode)) return; // Do not accept payload of type InventoryType.TX if not synced on best known HeaderHeight - if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight) - { - RequestTasks(session); + if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < nodes.Values.Max(p => p.LastBlockIndex)) return; - } HashSet hashes = new HashSet(payload.Hashes); // Remove all previously processed knownHashes from the list that is being requested hashes.Remove(knownHashes); - // Add to AvailableTasks the ones, of type InventoryType.Block, that are global (already under process by other sessions) - if (payload.Type == InventoryType.Block) - session.AvailableTasks.UnionWith(hashes.Where(p => globalTasks.ContainsKey(p))); // Remove those that are already in process by other sessions hashes.Remove(globalTasks); if (hashes.Count == 0) - { - RequestTasks(session); return; - } // Update globalTasks with the ones that will be requested within this current session foreach (UInt256 hash in hashes) { IncrementGlobalTask(hash); - session.Tasks[hash] = DateTime.UtcNow; + remoteNode.session.InvTasks[hash] = DateTime.UtcNow; } foreach (InvPayload group in InvPayload.CreateGroup(payload.Type, hashes.ToArray())) @@ -96,10 +72,7 @@ protected override void OnReceive(object message) switch (message) { case Register register: - OnRegister(register.Version); - break; - case Update update: - OnUpdate(update.LastBlockIndex); + OnRegister(register.Node); break; case NewTasks tasks: OnNewTasks(tasks.Payload); @@ -107,9 +80,6 @@ protected override void OnReceive(object message) case TaskCompleted completed: OnTaskCompleted(completed.Hash); break; - case HeaderTaskCompleted _: - OnHeaderTaskCompleted(); - break; case RestartTasks restart: OnRestartTasks(restart.Payload); break; @@ -122,19 +92,10 @@ protected override void OnReceive(object message) } } - private void OnRegister(VersionPayload version) + private void OnRegister(RemoteNode node) { Context.Watch(Sender); - TaskSession session = new TaskSession(Sender, version); - sessions.Add(Sender, session); - RequestTasks(session); - } - - private void OnUpdate(uint lastBlockIndex) - { - if (!sessions.TryGetValue(Sender, out TaskSession session)) - return; - session.LastBlockIndex = lastBlockIndex; + nodes.Add(Sender, node); } private void OnRestartTasks(InvPayload payload) @@ -150,13 +111,8 @@ private void OnTaskCompleted(UInt256 hash) { knownHashes.Add(hash); globalTasks.Remove(hash); - foreach (TaskSession ms in sessions.Values) - ms.AvailableTasks.Remove(hash); - if (sessions.TryGetValue(Sender, out TaskSession session)) - { - session.Tasks.Remove(hash); - RequestTasks(session); - } + if (nodes.TryGetValue(Sender, out RemoteNode remoteNode)) + remoteNode.session.InvTasks.Remove(hash); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -189,24 +145,25 @@ private bool IncrementGlobalTask(UInt256 hash) private void OnTerminated(IActorRef actor) { - if (!sessions.TryGetValue(actor, out TaskSession session)) + if (!nodes.TryGetValue(actor, out RemoteNode remoteNode)) return; - sessions.Remove(actor); - foreach (UInt256 hash in session.Tasks.Keys) + nodes.Remove(actor); + foreach (UInt256 hash in remoteNode.session.InvTasks.Keys) DecrementGlobalTask(hash); } private void OnTimer() { - foreach (TaskSession session in sessions.Values) - foreach (var task in session.Tasks.ToArray()) + foreach (RemoteNode node in nodes.Values) + { + NodeSession session = node.session; + foreach (var task in session.InvTasks.ToArray()) if (DateTime.UtcNow - task.Value > TaskTimeout) { - if (session.Tasks.Remove(task.Key)) + if (session.InvTasks.Remove(task.Key)) DecrementGlobalTask(task.Key); } - foreach (TaskSession session in sessions.Values) - RequestTasks(session); + } } protected override void PostStop() @@ -219,61 +176,6 @@ public static Props Props(NeoSystem system) { return Akka.Actor.Props.Create(() => new TaskManager(system)).WithMailbox("task-manager-mailbox"); } - - private void RequestTasks(TaskSession session) - { - if (session.HasTask) return; - // If there are pending tasks of InventoryType.Block we should process them - if (session.AvailableTasks.Count > 0) - { - session.AvailableTasks.Remove(knownHashes); - // Search any similar hash that is on Singleton's knowledge, which means, on the way or already processed - session.AvailableTasks.RemoveWhere(p => Blockchain.Singleton.ContainsBlock(p)); - HashSet hashes = new HashSet(session.AvailableTasks); - if (hashes.Count > 0) - { - foreach (UInt256 hash in hashes.ToArray()) - { - if (!IncrementGlobalTask(hash)) - hashes.Remove(hash); - } - session.AvailableTasks.Remove(hashes); - foreach (UInt256 hash in hashes) - session.Tasks[hash] = DateTime.UtcNow; - foreach (InvPayload group in InvPayload.CreateGroup(InventoryType.Block, hashes.ToArray())) - session.RemoteNode.Tell(Message.Create(MessageCommand.GetData, group)); - return; - } - } - - // When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers - // If not HeaderTask pending to be processed it should ask for more Blocks - if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex) - { - session.Tasks[HeaderTaskHash] = DateTime.UtcNow; - IncrementGlobalTask(HeaderTaskHash); - session.RemoteNode.Tell(Message.Create(MessageCommand.GetHeaders, GetBlocksPayload.Create(Blockchain.Singleton.CurrentHeaderHash))); - } - else if (Blockchain.Singleton.Height < session.LastBlockIndex) - { - UInt256 hash = Blockchain.Singleton.CurrentBlockHash; - for (uint i = Blockchain.Singleton.Height + 1; i <= Blockchain.Singleton.HeaderHeight; i++) - { - hash = Blockchain.Singleton.GetBlockHash(i); - if (!globalTasks.ContainsKey(hash)) - { - hash = Blockchain.Singleton.GetBlockHash(i - 1); - break; - } - } - session.RemoteNode.Tell(Message.Create(MessageCommand.GetBlocks, GetBlocksPayload.Create(hash))); - } - else if (Blockchain.Singleton.HeaderHeight >= session.LastBlockIndex - && TimeProvider.Current.UtcNow.ToTimestamp() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentHeaderHash)?.Timestamp) - { - session.RemoteNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height))); - } - } } internal class TaskManagerMailbox : PriorityMailbox @@ -288,11 +190,10 @@ internal protected override bool IsHighPriority(object message) switch (message) { case TaskManager.Register _: - case TaskManager.Update _: case TaskManager.RestartTasks _: return true; case TaskManager.NewTasks tasks: - if (tasks.Payload.Type == InventoryType.Block || tasks.Payload.Type == InventoryType.Consensus) + if (tasks.Payload.Type == InventoryType.Consensus) return true; return false; default: diff --git a/src/neo/Network/P2P/TaskSession.cs b/src/neo/Network/P2P/TaskSession.cs deleted file mode 100644 index eb0ab631bd..0000000000 --- a/src/neo/Network/P2P/TaskSession.cs +++ /dev/null @@ -1,31 +0,0 @@ -using Akka.Actor; -using Neo.Network.P2P.Capabilities; -using Neo.Network.P2P.Payloads; -using System; -using System.Collections.Generic; -using System.Linq; - -namespace Neo.Network.P2P -{ - internal class TaskSession - { - public readonly IActorRef RemoteNode; - public readonly VersionPayload Version; - public readonly Dictionary Tasks = new Dictionary(); - public readonly HashSet AvailableTasks = new HashSet(); - - public bool HasTask => Tasks.Count > 0; - public uint StartHeight { get; } - public uint LastBlockIndex { get; set; } - - public TaskSession(IActorRef node, VersionPayload version) - { - this.RemoteNode = node; - this.Version = version; - this.StartHeight = version.Capabilities - .OfType() - .FirstOrDefault()?.StartHeight ?? 0; - this.LastBlockIndex = this.StartHeight; - } - } -} diff --git a/tests/neo.UnitTests/Network/P2P/UT_TaskManagerMailbox.cs b/tests/neo.UnitTests/Network/P2P/UT_TaskManagerMailbox.cs index fca280d1eb..5d30e7e42b 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_TaskManagerMailbox.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_TaskManagerMailbox.cs @@ -41,8 +41,7 @@ public void TaskManager_Test_IsHighPriority() uut.IsHighPriority(new TaskManager.NewTasks { Payload = new InvPayload() }).Should().Be(false); // high priority - // -> NewTasks: payload Block or Consensus - uut.IsHighPriority(new TaskManager.NewTasks { Payload = new InvPayload { Type = InventoryType.Block } }).Should().Be(true); + // -> NewTasks: payload Consensus uut.IsHighPriority(new TaskManager.NewTasks { Payload = new InvPayload { Type = InventoryType.Consensus } }).Should().Be(true); // any random object should not have priority