Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallel Queue #39

Open
wants to merge 2 commits into
base: relocate_transaction_verification
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions src/neo/Ledger/ParallelQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Ledger
{
public class ParallelQueue<T>
{
private long _isStarted = 0;
private CancellationTokenSource _cancel;

/// <summary>
/// Sorted Queue for oracle tasks
/// </summary>
private readonly BlockingCollection<T> _processingQueue;

/// <summary>
/// Number of threads for processing the oracle
/// </summary>
private Task[] _tasks;

/// <summary>
/// Processor
/// </summary>
private readonly Action<T> Processor;

/// <summary>
/// Is started
/// </summary>
public bool IsStarted => Interlocked.Read(ref _isStarted) == 1;

/// <summary>
/// Total entries in the pool.
/// </summary>
public int PendingCount => _processingQueue.Count;

/// <summary>
/// Constructor
/// </summary>
/// <param name="action">Action</param>
public ParallelQueue(Action<T> action)
{
if (action == null) throw new ArgumentNullException(nameof(action));

Processor = action;
_processingQueue = new BlockingCollection<T>();
}

/// <summary>
/// Start oracle
/// </summary>
/// <param name="numberOfTasks">Number of tasks</param>
public void Start(byte numberOfTasks = 4)
{
if (Interlocked.Exchange(ref _isStarted, 1) != 0) return;

// Create tasks

_cancel = new CancellationTokenSource();
_tasks = new Task[numberOfTasks];

for (int x = 0; x < _tasks.Length; x++)
{
_tasks[x] = new Task(() =>
{
foreach (var item in _processingQueue.GetConsumingEnumerable(_cancel.Token))
{
Processor.Invoke(item);
}
},
_cancel.Token);
}

// Start tasks

foreach (var task in _tasks) task.Start();
}

/// <summary>
/// Stop oracle
/// </summary>
public void Stop()
{
if (Interlocked.Exchange(ref _isStarted, 0) != 1) return;

_cancel.Cancel();

for (int x = 0; x < _tasks.Length; x++)
{
try { _tasks[x].Wait(); } catch { }
try { _tasks[x].Dispose(); } catch { }
}

_cancel.Dispose();
_cancel = null;
_tasks = null;

// Clean queue

while (_processingQueue.Count > 0) _processingQueue.TryTake(out _);
}

/// <summary>
/// Enqueue entry
/// </summary>
/// <param name="item">Entry</param>
public void Enqueue(T item)
{
_processingQueue.Add(item);
}
}
}
31 changes: 23 additions & 8 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
using System.Collections.ObjectModel;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace Neo.Network.P2P
{
Expand All @@ -27,6 +26,26 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
}
}

private class Verificator
{
private readonly NeoSystem _system;

public Verificator(NeoSystem system)
{
_system = system;
}

public void VerifyTx(Transaction tx)
{
if (tx.VerifyStateIndependent() == VerifyResult.Succeed)
{
_system.TaskManager.Tell(tx);
_system.Consensus?.Tell(tx);
_system.Blockchain.Tell(tx, ActorRefs.NoSender);
}
}
}

private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection();
private readonly HashSetCache<UInt256> knownHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private readonly HashSetCache<UInt256> sentHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
Expand Down Expand Up @@ -111,14 +130,10 @@ private void OnMessage(Message msg)
case MessageCommand.Transaction:
Transaction tx = (Transaction)msg.Payload;
RenewKnownHashes(tx.Hash);
Task.Run(() =>
if (msg.Payload.Size <= Transaction.MaxTransactionSize)
{
if (msg.Payload.Size <= Transaction.MaxTransactionSize)
{
if (tx.VerifyStateIndependent() == VerifyResult.Succeed)
OnInventoryReceived(tx);
}
});
VerificationQueue.Enqueue(tx);
}
break;
case MessageCommand.Verack:
case MessageCommand.Version:
Expand Down
10 changes: 10 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ internal class Relay { public IInventory Inventory; }
private ByteString msg_buffer = ByteString.Empty;
private bool ack = true;

private static ParallelQueue<Transaction> VerificationQueue;

public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort);
public int ListenerTcpPort { get; private set; } = 0;
public VersionPayload Version { get; private set; }
Expand All @@ -36,6 +38,14 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP
{
this.system = system;
LocalNode.Singleton.RemoteNodes.TryAdd(Self, this);

if (VerificationQueue == null)
{
// Start verification Queue

VerificationQueue = new ParallelQueue<Transaction>(new Verificator(system).VerifyTx);
VerificationQueue.Start(4);
}
}

/// <summary>
Expand Down