Skip to content

Commit

Permalink
Adding some comments to P2P classes (neo-project#1212)
Browse files Browse the repository at this point in the history
* Adding random hashes for OnGetDataMessageReceived

* Adding static readonly Random

* Adding some comments to Peers class

* Reverting change on ProtocolHandler

* dotnet format

* Additional comments

* Adding extra comments

* Fixing typo

* Fixing typo

* Fixing typo

* Adding more comments

* adding more comments

* Add some comments of P2P (neo-project#1303)

* add some comments of P2P

* fix

* Minor changes

* Minor changes

* Minor Changes

* Minor changes

* Minor changes

* dotnet format

* Minor changes

* Minor changes

* Additional comments

* Minor changes

* Reverting variable change

* Dotnet format

* Minor changes

* Minor changes

* Minor changes

* Minor changes

* Minor changes
  • Loading branch information
vncoelho authored and Luchuan committed Jan 10, 2020
1 parent 96ffcc4 commit b9d3dc6
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/neo/Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,21 @@ public static ulong ToTimestampMS(this DateTime time)
return (ulong)(time.ToUniversalTime() - unixEpoch).TotalMilliseconds;
}

/// <summary>
/// Checks if address is IPv4 Maped to IPv6 format, if so, Map to IPv4.
/// Otherwise, return current address.
/// </summary>
internal static IPAddress Unmap(this IPAddress address)
{
if (address.IsIPv4MappedToIPv6)
address = address.MapToIPv4();
return address;
}

/// <summary>
/// Checks if IPEndPoint is IPv4 Maped to IPv6 format, if so, unmap to IPv4.
/// Otherwise, return current endpoint.
/// </summary>
internal static IPEndPoint Unmap(this IPEndPoint endPoint)
{
if (!endPoint.Address.IsIPv4MappedToIPv6)
Expand Down
28 changes: 28 additions & 0 deletions src/neo/Network/P2P/LocalNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,21 @@ public LocalNode(NeoSystem system)
}
}

/// <summary>
/// Packs a MessageCommand to a full Message with an optional ISerializable payload.
/// Forwards it to <see cref="BroadcastMessage(Message message)"/>.
/// </summary>
/// <param name="command">The message command to be packed.</param>
/// <param name="payload">Optional payload to be Serialized along the message.</param>
private void BroadcastMessage(MessageCommand command, ISerializable payload = null)
{
BroadcastMessage(Message.Create(command, payload));
}

/// <summary>
/// Broadcast a message to all connected nodes, namely <see cref="Connections"/>.
/// </summary>
/// <param name="message">The message to be broadcasted.</param>
private void BroadcastMessage(Message message)
{
Connections.Tell(message);
Expand All @@ -87,6 +97,10 @@ private static IPEndPoint GetIPEndpointFromHostPort(string hostNameOrAddress, in
return new IPEndPoint(ipAddress, port);
}

/// <summary>
/// Return an amount of random seeds nodes from the default SeedList file defined on <see cref="ProtocolSettings"/>.
/// </summary>
/// <param name="seedsToTake">Limit of random seed nodes to be obtained, also limited by the available seeds from file.</param>
private static IEnumerable<IPEndPoint> GetIPEndPointsFromSeedList(int seedsToTake)
{
if (seedsToTake > 0)
Expand Down Expand Up @@ -122,6 +136,12 @@ public IEnumerable<IPEndPoint> GetUnconnectedPeers()
return UnconnectedPeers;
}

/// <summary>
/// Override of abstract class that is triggered when <see cref="UnconnectedPeers"/> is empty.
/// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections.
/// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit.
/// </summary>
/// <param name="count">The count of peers required</param>
protected override void NeedMorePeers(int count)
{
count = Math.Max(count, MaxCountFromSeedList);
Expand All @@ -131,6 +151,8 @@ protected override void NeedMorePeers(int count)
}
else
{
// Will call AddPeers with default SeedList set cached on <see cref="ProtocolSettings"/>.
// It will try to add those, sequentially, to the list of currently uncconected ones.
AddPeers(GetIPEndPointsFromSeedList(count));
}
}
Expand All @@ -157,6 +179,12 @@ protected override void OnReceive(object message)
}
}

/// <summary>
/// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus.
/// Otherwise, tell the inventory to the actor of Blockchain.
/// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload.
/// </summary>
/// <param name="inventory">The inventory to be relayed.</param>
private void OnRelay(IInventory inventory)
{
if (inventory is Transaction transaction)
Expand Down
4 changes: 4 additions & 0 deletions src/neo/Network/P2P/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public class Message : ISerializable
private const int CompressionMinSize = 128;
private const int CompressionThreshold = 64;

/// <summary>
/// Flags that represents whether a message is compressed.
/// 0 for None, 1 for Compressed.
/// </summary>
public MessageFlags Flags;
public MessageCommand Command;
public ISerializable Payload;
Expand Down
39 changes: 39 additions & 0 deletions src/neo/Network/P2P/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p

private static readonly HashSet<IPAddress> localAddresses = new HashSet<IPAddress>();
private readonly Dictionary<IPAddress, int> ConnectedAddresses = new Dictionary<IPAddress, int>();
/// <summary>
/// A dictionary that stores the connected nodes.
/// </summary>
protected readonly ConcurrentDictionary<IActorRef, IPEndPoint> ConnectedPeers = new ConcurrentDictionary<IActorRef, IPEndPoint>();
/// <summary>
/// An ImmutableHashSet that stores the Peers received: 1) from other nodes or 2) from default file.
/// If the number of desired connections is not enough, first try to connect with the peers from this set.
/// </summary>
protected ImmutableHashSet<IPEndPoint> UnconnectedPeers = ImmutableHashSet<IPEndPoint>.Empty;
/// <summary>
/// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet.
/// If a Tcp.Connected or a Tcp.CommandFailed (with TCP.Command of type Tcp.Connect) is received, the related peer will be removed.
/// </summary>
protected ImmutableHashSet<IPEndPoint> ConnectingPeers = ImmutableHashSet<IPEndPoint>.Empty;
protected HashSet<IPAddress> TrustedIpAddresses { get; } = new HashSet<IPAddress>();

Expand All @@ -63,10 +74,16 @@ static Peer()
localAddresses.UnionWith(NetworkInterface.GetAllNetworkInterfaces().SelectMany(p => p.GetIPProperties().UnicastAddresses).Select(p => p.Address.Unmap()));
}

/// <summary>
/// Tries to add a set of peers to the immutable ImmutableHashSet of UnconnectedPeers.
/// </summary>
/// <param name="peers">Peers that the method will try to add (union) to (with) UnconnectedPeers.</param>
protected void AddPeers(IEnumerable<IPEndPoint> peers)
{
if (UnconnectedPeers.Count < UnconnectedMax)
{
// Do not select peers to be added that are already on the ConnectedPeers
// If the address is the same, the ListenerTcpPort should be different
peers = peers.Where(p => (p.Port != ListenerTcpPort || !localAddresses.Contains(p.Address)) && !ConnectedPeers.Values.Contains(p));
ImmutableInterlocked.Update(ref UnconnectedPeers, p => p.Union(peers));
}
Expand All @@ -75,9 +92,11 @@ protected void AddPeers(IEnumerable<IPEndPoint> peers)
protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false)
{
endPoint = endPoint.Unmap();
// If the address is the same, the ListenerTcpPort should be different, otherwise, return
if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return;

if (isTrusted) TrustedIpAddresses.Add(endPoint.Address);
// If connections with the peer greater than or equal to MaxConnectionsPerAddress, return.
if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress)
return;
if (ConnectedPeers.Values.Contains(endPoint)) return;
Expand All @@ -96,6 +115,10 @@ private static bool IsIntranetAddress(IPAddress address)
return (value & 0xff000000) == 0x0a000000 || (value & 0xff000000) == 0x7f000000 || (value & 0xfff00000) == 0xac100000 || (value & 0xffff0000) == 0xc0a80000 || (value & 0xffff0000) == 0xa9fe0000;
}

/// <summary>
/// Abstract method for asking for more peers. Currently triggered when UnconnectedPeers is empty.
/// </summary>
/// <param name="count">Number of peers that are being requested.</param>
protected abstract void NeedMorePeers(int count);

protected override void OnReceive(object message)
Expand Down Expand Up @@ -141,6 +164,7 @@ private void OnStart(ChannelsConfig config)
MaxConnections = config.MaxConnections;
MaxConnectionsPerAddress = config.MaxConnectionsPerAddress;

// schedule time to trigger `OnTimer` event every TimerMillisecondsInterval ms
timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, 5000, Context.Self, new Timer(), ActorRefs.NoSender);
if ((ListenerTcpPort > 0 || ListenerWsPort > 0)
&& localAddresses.All(p => !p.IsIPv4MappedToIPv6 || IsIntranetAddress(p))
Expand Down Expand Up @@ -174,6 +198,13 @@ private void OnStart(ChannelsConfig config)
}
}

/// <summary>
/// Will be triggered when a Tcp.Connected message is received.
/// If the conditions are met, the remote endpoint will be added to ConnectedPeers.
/// Increase the connection number with the remote endpoint by one.
/// </summary>
/// <param name="remote">The remote endpoint of TCP connection.</param>
/// <param name="local">The local endpoint of TCP connection.</param>
private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
{
ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote));
Expand All @@ -198,6 +229,11 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local)
}
}

/// <summary>
/// Will be triggered when a Tcp.CommandFailed message is received.
/// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers.
/// </summary>
/// <param name="cmd">Tcp.Command message/event.</param>
private void OnTcpCommandFailed(Tcp.Command cmd)
{
switch (cmd)
Expand All @@ -223,7 +259,10 @@ private void OnTerminated(IActorRef actorRef)

private void OnTimer()
{
// Check if the number of desired connections is already enough
if (ConnectedPeers.Count >= MinDesiredConnections) return;

// If there aren't available UnconnectedPeers, it triggers an abstract implementation of NeedMorePeers
if (UnconnectedPeers.Count == 0)
NeedMorePeers(MinDesiredConnections - ConnectedPeers.Count);
IPEndPoint[] endpoints = UnconnectedPeers.Take(MinDesiredConnections - ConnectedPeers.Count).ToArray();
Expand Down
24 changes: 24 additions & 0 deletions src/neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
}

/// <summary>
/// Will be triggered when a MessageCommand.GetAddr message is received.
/// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message.
/// The message contains a list of networkAddresses from those selected random peers.
/// </summary>
private void OnGetAddrMessageReceived()
{
Random rand = new Random();
Expand All @@ -187,9 +192,16 @@ private void OnGetAddrMessageReceived()
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
/// Will be triggered when a MessageCommand.GetBlocks message is received.
/// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount
/// Responses are sent to RemoteNode actor as MessageCommand.Inv Message.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks requested.</param>
private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
// The default value of payload.Count is -1
int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count;
TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash);
if (state == null) return;
Expand Down Expand Up @@ -227,6 +239,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetData message is received.
/// The payload includes an array of hash values.
/// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor.
/// </summary>
/// <param name="payload">The payload containing the requested information.</param>
private void OnGetDataMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray();
Expand Down Expand Up @@ -262,6 +280,12 @@ private void OnGetDataMessageReceived(InvPayload payload)
}
}

/// <summary>
/// Will be triggered when a MessageCommand.GetHeaders message is received.
/// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor.
/// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks' headers requested.</param>
private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
{
UInt256 hash = payload.HashStart;
Expand Down
10 changes: 10 additions & 0 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP
SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray())));
}

/// <summary>
/// It defines the message queue to be used for dequeuing.
/// If the high-priority message queue is not empty, choose the high-priority message queue.
/// Otherwise, choose the low-priority message queue.
/// Finally, it sends the first message of the queue.
/// </summary>
private void CheckMessageQueue()
{
if (!verack || !ack) return;
Expand All @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null
EnqueueMessage(Message.Create(command, payload));
}

/// <summary>
/// Add message to high priority queue or low priority queue depending on the message type.
/// </summary>
/// <param name="message">The message to be added.</param>
private void EnqueueMessage(Message message)
{
bool is_single = false;
Expand Down
14 changes: 14 additions & 0 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;

private const int PingCoolingOffPeriod = 60; // in secconds.
/// <summary>
/// A set of known hashes, of inventories or payloads, already received.
/// </summary>
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -55,23 +59,28 @@ private void OnNewTasks(InvPayload payload)
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
// Do not accept payload of type InventoryType.TX if not synced on best known HeaderHeight
if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight)
{
RequestTasks(session);
return;
}
HashSet<UInt256> hashes = new HashSet<UInt256>(payload.Hashes);
// Remove all previously processed knownHashes from the list that is being requested
hashes.Remove(knownHashes);
// Add to AvailableTasks the ones, of type InventoryType.Block, that are global (already under process by other sessions)
if (payload.Type == InventoryType.Block)
session.AvailableTasks.UnionWith(hashes.Where(p => globalTasks.ContainsKey(p)));

// Remove those that are already in process by other sessions
hashes.Remove(globalTasks);
if (hashes.Count == 0)
{
RequestTasks(session);
return;
}

// Update globalTasks with the ones that will be requested within this current session
foreach (UInt256 hash in hashes)
{
IncrementGlobalTask(hash);
Expand Down Expand Up @@ -214,9 +223,11 @@ public static Props Props(NeoSystem system)
private void RequestTasks(TaskSession session)
{
if (session.HasTask) return;
// If there are pending tasks of InventoryType.Block we should process them
if (session.AvailableTasks.Count > 0)
{
session.AvailableTasks.Remove(knownHashes);
// Search any similar hash that is on Singleton's knowledge, which means, on the way or already processed
session.AvailableTasks.RemoveWhere(p => Blockchain.Singleton.ContainsBlock(p));
HashSet<UInt256> hashes = new HashSet<UInt256>(session.AvailableTasks);
if (hashes.Count > 0)
Expand All @@ -234,6 +245,9 @@ private void RequestTasks(TaskSession session)
return;
}
}

// When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers
// If not HeaderTask pending to be processed it should ask for more Blocks
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
Expand Down

0 comments on commit b9d3dc6

Please sign in to comment.