diff --git a/src/neo/Ledger/ParallelQueue.cs b/src/neo/Ledger/ParallelQueue.cs new file mode 100644 index 0000000000..3057265e65 --- /dev/null +++ b/src/neo/Ledger/ParallelQueue.cs @@ -0,0 +1,113 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; + +namespace Neo.Ledger +{ + public class ParallelQueue + { + private long _isStarted = 0; + private CancellationTokenSource _cancel; + + /// + /// Sorted Queue for oracle tasks + /// + private readonly BlockingCollection _processingQueue; + + /// + /// Number of threads for processing the oracle + /// + private Task[] _tasks; + + /// + /// Processor + /// + private readonly Action Processor; + + /// + /// Is started + /// + public bool IsStarted => Interlocked.Read(ref _isStarted) == 1; + + /// + /// Total entries in the pool. + /// + public int PendingCount => _processingQueue.Count; + + /// + /// Constructor + /// + /// Action + public ParallelQueue(Action action) + { + if (action == null) throw new ArgumentNullException(nameof(action)); + + Processor = action; + _processingQueue = new BlockingCollection(); + } + + /// + /// Start oracle + /// + /// Number of tasks + public void Start(byte numberOfTasks = 4) + { + if (Interlocked.Exchange(ref _isStarted, 1) != 0) return; + + // Create tasks + + _cancel = new CancellationTokenSource(); + _tasks = new Task[numberOfTasks]; + + for (int x = 0; x < _tasks.Length; x++) + { + _tasks[x] = new Task(() => + { + foreach (var item in _processingQueue.GetConsumingEnumerable(_cancel.Token)) + { + Processor.Invoke(item); + } + }, + _cancel.Token); + } + + // Start tasks + + foreach (var task in _tasks) task.Start(); + } + + /// + /// Stop oracle + /// + public void Stop() + { + if (Interlocked.Exchange(ref _isStarted, 0) != 1) return; + + _cancel.Cancel(); + + for (int x = 0; x < _tasks.Length; x++) + { + try { _tasks[x].Wait(); } catch { } + try { _tasks[x].Dispose(); } catch { } + } + + _cancel.Dispose(); + _cancel = null; + _tasks = null; + + // Clean queue + + while (_processingQueue.Count > 0) _processingQueue.TryTake(out _); + } + + /// + /// Enqueue entry + /// + /// Entry + public void Enqueue(T item) + { + _processingQueue.Add(item); + } + } +} diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index 66e0a02012..c5a22f4b28 100644 --- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -12,7 +12,6 @@ using System.Collections.ObjectModel; using System.Linq; using System.Net; -using System.Threading.Tasks; namespace Neo.Network.P2P { @@ -27,6 +26,26 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item) } } + private class Verificator + { + private readonly NeoSystem _system; + + public Verificator(NeoSystem system) + { + _system = system; + } + + public void VerifyTx(Transaction tx) + { + if (tx.VerifyStateIndependent() == VerifyResult.Succeed) + { + _system.TaskManager.Tell(tx); + _system.Consensus?.Tell(tx); + _system.Blockchain.Tell(tx, ActorRefs.NoSender); + } + } + } + private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection(); private readonly HashSetCache knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); private readonly HashSetCache sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); @@ -111,14 +130,10 @@ private void OnMessage(Message msg) case MessageCommand.Transaction: Transaction tx = (Transaction)msg.Payload; RenewKnownHashes(tx.Hash); - Task.Run(() => + if (msg.Payload.Size <= Transaction.MaxTransactionSize) { - if (msg.Payload.Size <= Transaction.MaxTransactionSize) - { - if (tx.VerifyStateIndependent() == VerifyResult.Succeed) - OnInventoryReceived(tx); - } - }); + VerificationQueue.Enqueue(tx); + } break; case MessageCommand.Verack: case MessageCommand.Version: diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index b1457a8522..02ad8f40b8 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -25,6 +25,8 @@ internal class Relay { public IInventory Inventory; } private ByteString msg_buffer = ByteString.Empty; private bool ack = true; + private static ParallelQueue VerificationQueue; + public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort); public int ListenerTcpPort { get; private set; } = 0; public VersionPayload Version { get; private set; } @@ -36,6 +38,14 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP { this.system = system; LocalNode.Singleton.RemoteNodes.TryAdd(Self, this); + + if (VerificationQueue == null) + { + // Start verification Queue + + VerificationQueue = new ParallelQueue(new Verificator(system).VerifyTx); + VerificationQueue.Start(4); + } } ///