From fde4b229b0fff44188c42b6207521baed62a408f Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:16:28 +0100 Subject: [PATCH 1/7] refactor: peer discovery survey protocol --- core/Controllers/MembershipController.cs | 2 + core/Cryptography/Crypto.cs | 44 +++- core/Extensions/AppExtensions.cs | 5 +- core/Models/LocalNode.cs | 1 + core/Models/Messages/ProtocolCommand.cs | 1 - core/Models/NetworkSetting.cs | 2 +- core/Models/Peer.cs | 14 +- core/Network/P2PDevice.cs | 9 - core/Network/P2PDeviceApi.cs | 21 +- core/Network/PeerDiscovery.cs | 305 ++++++++++++++--------- node/Configuration/Utility.cs | 43 +++- 11 files changed, 278 insertions(+), 169 deletions(-) diff --git a/core/Controllers/MembershipController.cs b/core/Controllers/MembershipController.cs index dfa26668..131d741b 100644 --- a/core/Controllers/MembershipController.cs +++ b/core/Controllers/MembershipController.cs @@ -48,6 +48,7 @@ public async Task GetPeerAsync() HttpsPort = peer.HttpsPort.FromBytes(), TcpPort = peer.TcpPort.FromBytes(), WsPort = peer.WsPort.FromBytes(), + DsPort = peer.DsPort.FromBytes(), Name = peer.Name.FromBytes(), PublicKey = peer.PublicKey.ByteToHex(), Version = peer.Version.FromBytes() @@ -81,6 +82,7 @@ public async Task GetPeersAsync() HttpsPort = peer.HttpsPort.FromBytes(), TcpPort = peer.TcpPort.FromBytes(), WsPort = peer.WsPort.FromBytes(), + DsPort = peer.DsPort.FromBytes(), Name = peer.Name.FromBytes(), PublicKey = peer.PublicKey.ByteToHex(), Version = peer.Version.FromBytes() diff --git a/core/Cryptography/Crypto.cs b/core/Cryptography/Crypto.cs index d7de556a..b2839739 100644 --- a/core/Cryptography/Crypto.cs +++ b/core/Cryptography/Crypto.cs @@ -22,8 +22,9 @@ public interface ICrypto Task GetOrUpsertKeyNameAsync(string keyName); Task GetPublicKeyAsync(string keyName); Task SignAsync(string keyName, byte[] message); + byte[] Sign(byte[] privateKey, byte[] message); bool VerifySignature(byte[] signature, byte[] message); - bool VerifySignature(VerifySignatureManualRequest verifySignatureManualRequest); + bool VerifySignature(byte[] publicKey, byte[] message, byte[] signature); byte[] GetCalculateVrfSignature(ECPrivateKey ecPrivateKey, byte[] msg); byte[] GetVerifyVrfSignature(ECPublicKey ecPublicKey, byte[] msg, byte[] sig); // byte[] EncryptChaCha20Poly1305(byte[] data, byte[] key, byte[] associatedData, out byte[] tag, out byte[] nonce); @@ -131,6 +132,28 @@ public async Task SignAsync(string keyName, byte[] message) return signatureResponse; } + /// + /// + /// + /// + /// + /// + public byte[] Sign(byte[] privateKey, byte[] message) + { + Guard.Argument(privateKey, nameof(privateKey)).NotNull(); + Guard.Argument(message, nameof(message)).NotNull(); + try + { + return Curve.calculateSignature(Curve.decodePrivatePoint(privateKey), message); + } + catch (Exception ex) + { + _logger.Here().Error(ex, "Unable to sign the message"); + } + + return null; + } + /// /// /// @@ -155,22 +178,21 @@ public bool VerifySignature(byte[] signature, byte[] message) } /// + /// /// - /// + /// + /// + /// /// - public bool VerifySignature(VerifySignatureManualRequest verifySignatureManualRequest) + public bool VerifySignature(byte[] publicKey, byte[] message, byte[] signature) { - Guard.Argument(verifySignatureManualRequest, nameof(verifySignatureManualRequest)).NotNull(); - Guard.Argument(verifySignatureManualRequest.Signature, nameof(verifySignatureManualRequest.Signature)) - .NotNull(); - Guard.Argument(verifySignatureManualRequest.PublicKey, nameof(verifySignatureManualRequest.PublicKey)) - .NotNull(); - Guard.Argument(verifySignatureManualRequest.Message, nameof(verifySignatureManualRequest.Message)).NotNull(); + Guard.Argument(publicKey, nameof(publicKey)).NotNull(); + Guard.Argument(message, nameof(message)).NotNull(); + Guard.Argument(signature, nameof(signature)).NotNull(); var verified = false; try { - verified = Curve.verifySignature(Curve.decodePoint(verifySignatureManualRequest.PublicKey, 0), - verifySignatureManualRequest.Message, verifySignatureManualRequest.Signature); + verified = Curve.verifySignature(Curve.decodePoint(publicKey, 0), message, signature); } catch (Exception ex) { diff --git a/core/Extensions/AppExtensions.cs b/core/Extensions/AppExtensions.cs index 7880aadc..9328eb0b 100644 --- a/core/Extensions/AppExtensions.cs +++ b/core/Extensions/AppExtensions.cs @@ -47,7 +47,6 @@ public static ContainerBuilder AddCypherSystemCore(this ContainerBuilder builder builder.Register(c => { var remoteNodes = configuration.GetSection("Node:Network:SeedList").GetChildren().ToArray(); - var remotePublicKeys = configuration.GetSection("Node:Network:SeedListPublicKeys").GetChildren().ToArray(); var node = new Node { EndPoint = new IPEndPoint(Util.GetIpAddress(), 0), @@ -67,11 +66,11 @@ public static ContainerBuilder AddCypherSystemCore(this ContainerBuilder builder P2P = new P2P { + DsPort = Convert.ToInt32(configuration["Node:Network:P2P:DsPort"]), TcpPort = Convert.ToInt32(configuration["Node:Network:P2P:TcpPort"]), WsPort = Convert.ToInt32(configuration["Node:Network:P2P:WsPort"]) }, SeedList = new List(remoteNodes.Length), - SeedListPublicKeys = new List(remoteNodes.Length), X509Certificate = new Models.X509Certificate { @@ -106,9 +105,7 @@ public static ContainerBuilder AddCypherSystemCore(this ContainerBuilder builder { var endpoint = Util.GetIpEndPoint(selection.item.Value); var endpointFromHost = Util.GetIpEndpointFromHostPort(endpoint.Address.ToString(), endpoint.Port); - var publicKey = remotePublicKeys[selection.index].Value; node.Network.SeedList.Add($"{endpointFromHost.Address.ToString()}:{endpointFromHost.Port}"); - node.Network.SeedListPublicKeys.Add(publicKey); } var cypherSystemCore = new CypherSystemCore(c.Resolve(), diff --git a/core/Models/LocalNode.cs b/core/Models/LocalNode.cs index 46a6bf4d..e13356ad 100644 --- a/core/Models/LocalNode.cs +++ b/core/Models/LocalNode.cs @@ -16,6 +16,7 @@ public record LocalNode public byte[] Name { get; init; } public byte[] TcpPort { get; init; } public byte[] WsPort { get; init; } + public byte[] DsPort { get; init; } public byte[] HttpPort { get; init; } public byte[] HttpsPort { get; init; } public byte[] IpAddress { get; init; } diff --git a/core/Models/Messages/ProtocolCommand.cs b/core/Models/Messages/ProtocolCommand.cs index de3d76c3..d682e4b4 100644 --- a/core/Models/Messages/ProtocolCommand.cs +++ b/core/Models/Messages/ProtocolCommand.cs @@ -9,7 +9,6 @@ public enum ProtocolCommand : byte Version = 0x01, GetPeer = 0x10, GetPeers = 0x11, - UpdatePeers = 0x12, GetBlocks = 0x14, SaveBlock = 0x15, GetBlockHeight = 0x17, diff --git a/core/Models/NetworkSetting.cs b/core/Models/NetworkSetting.cs index 21068bea..b28a07d5 100644 --- a/core/Models/NetworkSetting.cs +++ b/core/Models/NetworkSetting.cs @@ -37,12 +37,12 @@ public record Network public int HttpsPort { get; set; } public P2P P2P { get; set; } public IList SeedList { get; set; } - public IList SeedListPublicKeys { get; set; } public string CertificateMode { get; set; } } public record P2P { + public int DsPort { get; set; } public int TcpPort { get; set; } public int WsPort { get; set; } } diff --git a/core/Models/Peer.cs b/core/Models/Peer.cs index 47263bfb..21048635 100644 --- a/core/Models/Peer.cs +++ b/core/Models/Peer.cs @@ -17,14 +17,16 @@ public struct Peer : IComparable [Key(3)] public ulong BlockCount { get; set; } [Key(4)] public ulong ClientId { get; init; } [Key(5)] public byte[] TcpPort { get; set; } - [Key(6)] public byte[] WsPort { get; set; } - [Key(7)] public byte[] Name { get; set; } - [Key(8)] public byte[] PublicKey { get; set; } - [Key(9)] public byte[] Version { get; set; } + [Key(6)] public byte[] DsPort { get; set; } + [Key(7)] public byte[] WsPort { get; set; } + [Key(8)] public byte[] Name { get; set; } + [Key(9)] public byte[] PublicKey { get; set; } + [Key(10)] public byte[] Signature { get; set; } + [Key(11)] public byte[] Version { get; set; } + [Key(12)] public long Timestamp { get; set; } [IgnoreMember] public int Retries { get; set; } - /// - /// + /// s /// /// public int CompareTo(Peer other) diff --git a/core/Network/P2PDevice.cs b/core/Network/P2PDevice.cs index 3bacfc90..131e749e 100644 --- a/core/Network/P2PDevice.cs +++ b/core/Network/P2PDevice.cs @@ -189,15 +189,6 @@ private async Task WorkerAsync(IRepReqAsyncContext ctx) { try { - if (unwrapMessage.ProtocolCommand == ProtocolCommand.UpdatePeers) - { - unwrapMessage.Parameters[0].Sender = message.PublicKey; - _ = await _cypherSystemCore.P2PDeviceApi().Commands[(int)unwrapMessage.ProtocolCommand]( - unwrapMessage.Parameters); - await EmptyReplyAsync(ctx); - return; - } - var newMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); var readOnlySequence = await _cypherSystemCore.P2PDeviceApi().Commands[(int)unwrapMessage.ProtocolCommand]( diff --git a/core/Network/P2PDeviceApi.cs b/core/Network/P2PDeviceApi.cs index cbfb37ad..a47e56ac 100644 --- a/core/Network/P2PDeviceApi.cs +++ b/core/Network/P2PDeviceApi.cs @@ -64,7 +64,6 @@ private void RegisterCommand() { Commands.Add((int)ProtocolCommand.GetPeer, OnGetPeerAsync); Commands.Add((int)ProtocolCommand.GetPeers, OnGetPeersAsync); - Commands.Add((int)ProtocolCommand.UpdatePeers, OnUpdatePeersAsync); Commands.Add((int)ProtocolCommand.GetBlocks, OnGetBlocksAsync); Commands.Add((int)ProtocolCommand.SaveBlock, OnSaveBlockAsync); Commands.Add((int)ProtocolCommand.GetBlockHeight, OnGetBlockHeightAsync); @@ -85,8 +84,8 @@ private void RegisterCommand() /// private async Task> OnGetPeerAsync(Parameter[] none = default) { - var nodeDetailsResponse = _cypherSystemCore.PeerDiscovery().GetLocalPeer(); - return await SerializeAsync(nodeDetailsResponse); + var localPeerResponse = _cypherSystemCore.PeerDiscovery().GetLocalPeer(); + return await SerializeAsync(localPeerResponse); } /// @@ -94,21 +93,9 @@ private async Task> OnGetPeerAsync(Parameter[] none = def /// /// /// - private Task> OnGetPeersAsync(Parameter[] none = default) + private async Task> OnGetPeersAsync(Parameter[] none = default) { - var nodePeersResponse = _cypherSystemCore.PeerDiscovery().GetPeers(); - return Task.FromResult(nodePeersResponse); - } - - /// - /// - /// - /// - /// - private async Task> OnUpdatePeersAsync(Parameter[] parameters) - { - await _cypherSystemCore.PeerDiscovery().ReceivedPeersAsync(parameters[0].Value, parameters[0].Sender); - return _updatePeersResponse; + return await SerializeAsync(_cypherSystemCore.PeerDiscovery()); } /// diff --git a/core/Network/PeerDiscovery.cs b/core/Network/PeerDiscovery.cs index 0b5b915e..4412b0aa 100644 --- a/core/Network/PeerDiscovery.cs +++ b/core/Network/PeerDiscovery.cs @@ -2,7 +2,6 @@ // To view a copy of this license, visit https://creativecommons.org/licenses/by-nc-nd/4.0 using System; -using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Net; @@ -12,14 +11,14 @@ using CypherNetwork.Extensions; using CypherNetwork.Helper; using CypherNetwork.Models; -using CypherNetwork.Models.Messages; using CypherNetwork.Persistence; using MessagePack; -using Microsoft.IO; using Microsoft.Toolkit.HighPerformance; using NBitcoin; using Nerdbank.Streams; using Serilog; +using nng; +using nng.Native; namespace CypherNetwork.Network; @@ -55,40 +54,28 @@ public interface IPeerDiscovery /// void TryBootstrap(); - /// - /// - /// - /// void SetPeerCooldown(PeerCooldown peer); - - /// - /// - /// - /// - /// - /// - Task ReceivedPeersAsync(ReadOnlyMemory msg, ReadOnlyMemory publicKey); - /// - /// - /// - /// - ReadOnlySequence GetPeers(); } /// /// public sealed class PeerDiscovery : IDisposable, IPeerDiscovery { + private const int PrunedTimeoutFromSeconds = 30; + private const int SurveyorWaitTimeMilliseconds = 2500; + private const int ReceiveWaitTimeMilliseconds = 1000; private readonly Caching _caching = new(); private readonly Caching _peerCooldownCaching = new(); private readonly ICypherSystemCore _cypherSystemCore; private readonly ILogger _logger; - - private LocalNode _localNode; + private IDisposable _discoverDisposable; private IDisposable _receiverDisposable; private IDisposable _coolDownDisposable; + private LocalNode _localNode; private Peer _localPeer; private RemoteNode[] _seedNodes; + private ISurveyorSocket _socket; + private ISurveyorAsyncContext _ctx; private bool _disposed; private static readonly object LockOnReady = new(); @@ -96,11 +83,11 @@ public sealed class PeerDiscovery : IDisposable, IPeerDiscovery /// /// - /// + /// /// - public PeerDiscovery(ICypherSystemCore cypherSystemCore, ILogger logger) + public PeerDiscovery(ICypherSystemCore cypherNetworkCore, ILogger logger) { - _cypherSystemCore = cypherSystemCore; + _cypherSystemCore = cypherNetworkCore; _logger = logger; Init(); } @@ -136,12 +123,18 @@ public Peer GetLocalPeer() return _localPeer; } + /// + /// + /// /// /// /// private void UpdateLocalPeerInfo() { _localPeer.BlockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; + _localPeer.Timestamp = Util.GetAdjustedTimeAsUnixTimestamp(); + _localPeer.Signature = _cypherSystemCore.Crypto().Sign( + _cypherSystemCore.KeyPair.PrivateKey.FromSecureString().HexToByte(), _localPeer.Timestamp.ToBytes()); } /// @@ -152,20 +145,6 @@ public LocalNode GetLocalNode() return _localNode; } - /// - /// - /// - /// - public ReadOnlySequence GetPeers() - { - var sequence = new Sequence(); - UpdateLocalPeerInfo(); - IList discoveryStore = _caching.GetItems().ToList(); - discoveryStore.Add(_localPeer); - ReadOnlyPeerSequence(ref discoveryStore, ref sequence); - return sequence.AsReadOnlySequence; - } - /// /// /// @@ -198,6 +177,7 @@ private void Init() Identifier = _cypherSystemCore.KeyPair.PublicKey.ToHashIdentifier(), TcpPort = _cypherSystemCore.Node.Network.P2P.TcpPort.ToBytes(), WsPort = _cypherSystemCore.Node.Network.P2P.WsPort.ToBytes(), + DsPort = _cypherSystemCore.Node.Network.P2P.DsPort.ToBytes(), HttpPort = _cypherSystemCore.Node.Network.HttpPort.ToBytes(), HttpsPort = _cypherSystemCore.Node.Network.HttpsPort.ToBytes(), Name = _cypherSystemCore.Node.Name.ToBytes(), @@ -212,6 +192,7 @@ private void Init() ClientId = _localNode.PublicKey.ToHashIdentifier(), TcpPort = _localNode.TcpPort, WsPort = _localNode.WsPort, + DsPort = _localNode.DsPort, Name = _localNode.Name, PublicKey = _localNode.PublicKey, Version = _localNode.Version @@ -220,14 +201,62 @@ private void Init() foreach (var seedNode in _cypherSystemCore.Node.Network.SeedList.WithIndex()) { var endpoint = Util.GetIpEndPoint(seedNode.item); - _seedNodes[seedNode.index] = new RemoteNode(endpoint.Address.ToString().ToBytes(), endpoint.Port.ToBytes(), - _cypherSystemCore.Node.Network.SeedListPublicKeys[seedNode.index].HexToByte()); + _seedNodes[seedNode.index] = new RemoteNode(endpoint.Address.ToString().ToBytes(), endpoint.Port.ToBytes(), null); } - + DiscoverAsync().ConfigureAwait(false); ReceiverAsync().ConfigureAwait(false); HandlePeerCooldown(); } + /// + /// + /// + private Task DiscoverAsync() + { + Util.ThrowPortNotFree(_cypherSystemCore.Node.Network.P2P.DsPort); + var ipEndPoint = new IPEndPoint(_cypherSystemCore.Node.EndPoint.Address, + _cypherSystemCore.Node.Network.P2P.DsPort); + _socket = NngFactorySingleton.Instance.Factory.SurveyorOpen() + .ThenListen($"tcp://{ipEndPoint.Address}:{ipEndPoint.Port}", Defines.NngFlag.NNG_FLAG_NONBLOCK).Unwrap(); + _socket.SetOpt(Defines.NNG_OPT_RECVMAXSZ, 5000000); + _ctx = _socket.CreateAsyncContext(NngFactorySingleton.Instance.Factory).Unwrap(); + _ctx.Ctx.SetOpt(Defines.NNG_OPT_SURVEYOR_SURVEYTIME, + new nng_duration { TimeMs = SurveyorWaitTimeMilliseconds }); + _discoverDisposable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(1000)).Subscribe(_ => + { + if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; + StartWorkerAsync(_ctx).Wait(); + }); + + return Task.CompletedTask; + } + + /// + /// + /// + /// + private async Task StartWorkerAsync(IReceiveAsyncContext ctx) + { + INngMsg nngMsg = default; + try + { + var msg = NngFactorySingleton.Instance.Factory.CreateMessage(); + (await _ctx.Send(msg)).Unwrap(); + var nngResult = await ctx.Receive(CancellationToken.None); + if (!nngResult.IsOk()) return; + nngMsg = nngResult.Unwrap(); + await ReceivedPeersAsync(nngMsg); + } + catch (Exception ex) + { + _logger.Here().Error("{@Message}", ex.Message); + } + finally + { + nngMsg?.Dispose(); + } + } + /// /// /// @@ -258,33 +287,46 @@ private Task ReceiverAsync() private async Task BootstrapSeedsAsync() { var tasks = new List(); - UpdateLocalPeerInfo(); - IList discoveryStore = new List { _localPeer }; - var parameter = new Parameter[] - { - new() { ProtocolCommand = ProtocolCommand.UpdatePeers, Value = MessagePackSerializer.Serialize(discoveryStore) } - }; - var msg = MessagePackSerializer.Serialize(parameter); - for (var index = 0; index < _seedNodes.Length; index++) + var sequence = new Sequence(); + try { - var i = index; - tasks.Add(Task.Run(async () => + UpdateLocalPeerInfo(); + IList discoveryStore = new List { _localPeer }; + ReadOnlyPeerSequence(ref discoveryStore, ref sequence); + for (var index = 0; index < _seedNodes.Length; index++) { - var seedNode = _seedNodes[i]; - try + var i = index; + tasks.Add(Task.Run(async () => { - var _ = await _cypherSystemCore.P2PDeviceReq().SendAsync(seedNode.IpAddress, - seedNode.TcpPort, seedNode.PublicKey, - msg); - } - catch (Exception ex) - { - _logger.Here().Error("{@Message}", ex.Message); - } - })); - } + var seedNode = _seedNodes[i]; + var nngMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); + try + { + await BroadcastAsync(seedNode.IpAddress, seedNode.TcpPort, sequence, nngMsg); + } + catch (NngException ex) + { + if (ex.Error == Defines.NngErrno.ECONNREFUSED) return; + _logger.Here().Error("{@Message}", ex.Message); + } + catch (Exception ex) + { + _logger.Here().Error("{@Message}", ex.Message); + } + finally + { + nngMsg.Dispose(); + } + })); + } - await Task.WhenAll(tasks).ConfigureAwait(false); + await Task.WhenAll(tasks).ConfigureAwait(false); + } + finally + { + sequence.Reset(); + sequence.Dispose(); + } } /// @@ -309,31 +351,34 @@ private static void ReadOnlyPeerSequence(ref IList peers, ref Sequence private async Task OnReadyAsync() { - IList discoveryStore = _caching.GetItems().ToList(); - UpdateLocalPeerInfo(); - discoveryStore.Add(_localPeer); - var parameter = new Parameter[] - { - new() { ProtocolCommand = ProtocolCommand.UpdatePeers, Value = MessagePackSerializer.Serialize(discoveryStore) } - }; - var msg = MessagePackSerializer.Serialize(parameter); - for (var index = 0; index < discoveryStore.Count; index++) + var sequence = new Sequence(); + try { - var peer = discoveryStore[index]; - if (peer.ClientId == _localPeer.ClientId) continue; - var storePeer = peer; - try + IList discoveryStore = _caching.GetItems().ToList(); + UpdateLocalPeerInfo(); + discoveryStore.Add(_localPeer); + ReadOnlyPeerSequence(ref discoveryStore, ref sequence); + for (var index = 0; index < discoveryStore.Count; index++) { - if (await _cypherSystemCore.P2PDeviceReq().SendAsync(storePeer.IpAddress, storePeer.TcpPort, - storePeer.PublicKey, msg) is not null) + var peer = discoveryStore[index]; + if (peer.ClientId == _localPeer.ClientId) continue; + var nngMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); + try { - if (storePeer.Retries == 0) continue; - storePeer.Retries = 0; - UpdatePeer(storePeer.ClientId, storePeer.IpAddress, storePeer); + await BroadcastAsync(peer.IpAddress, peer.DsPort, sequence, nngMsg); } - else + catch (NngException ex) { - if (storePeer.Retries >= 30) + if (ex.Error == Defines.NngErrno.ECONNREFUSED) return; + _logger.Here().Error("{@Message}", ex.Message); + } + catch (Exception ex) + { + _logger.Here().Error("{@Message}", ex.Message); + } + finally + { + if (peer.Timestamp < Util.GetUtcNow().AddSeconds(-30).ToUnixTimestamp()) { SetPeerCooldown(new PeerCooldown { @@ -344,58 +389,86 @@ private async Task OnReadyAsync() }); _caching.Remove(GetKey(peer.ClientId, peer.IpAddress)); } - else - { - storePeer.Retries++; - UpdatePeer(storePeer.ClientId, storePeer.IpAddress, storePeer); - } + + nngMsg.Dispose(); } } - catch (Exception ex) - { - _logger.Here().Error("{@Message}", ex.Message); - } } + finally + { + sequence.Reset(); + sequence.Dispose(); + } + } + + /// + /// + /// + /// + /// + /// + /// + private static async Task BroadcastAsync(byte[] ipAddress, byte[] dsPort, Sequence sequence, INngMsg nngMsg) + { + var address = string.Create(ipAddress.Length, ipAddress.AsMemory(), (chars, state) => + { + Span address = System.Text.Encoding.UTF8.GetString(state.Span).ToCharArray(); + address.CopyTo(chars); + }); + var port = string.Create(dsPort.Length, dsPort.AsMemory(), (chars, state) => + { + Span port = System.Text.Encoding.UTF8.GetString(state.Span).ToCharArray(); + port.CopyTo(chars); + }); + using var socket = NngFactorySingleton.Instance.Factory.RespondentOpen() + .ThenDial($"tcp://{address}:{port}", Defines.NngFlag.NNG_FLAG_NONBLOCK).Unwrap(); + using var ctx = socket.CreateAsyncContext(NngFactorySingleton.Instance.Factory).Unwrap(); + ctx.Ctx.SetOpt(Defines.NNG_OPT_RECVTIMEO, + new nng_duration { TimeMs = ReceiveWaitTimeMilliseconds }); + var nngResult = await ctx.Receive(CancellationToken.None); + if (!nngResult.IsOk()) return; + foreach (var memory in sequence.AsReadOnlySequence) nngMsg.Append(memory.Span); + await ctx.Send(nngMsg); } /// /// /// - /// - /// - public async Task ReceivedPeersAsync(ReadOnlyMemory msg, ReadOnlyMemory publicKey) + /// + private async Task ReceivedPeersAsync(INngMsgPart nngMsg) { - await using var stream = Util.Manager.GetStream(msg.Span) as RecyclableMemoryStream; - using var reader = new MessagePackStreamReader(stream); + await using var stream = Util.Manager.GetStream(nngMsg.AsSpan()); + var reader = new MessagePackStreamReader(stream); var length = await reader.ReadArrayHeaderAsync(CancellationToken.None); for (var i = 0; i < length; i++) { var elementSequence = await reader.ReadAsync(CancellationToken.None); if (elementSequence == null) continue; var peer = MessagePackSerializer.Deserialize(elementSequence.Value); - if (peer.ClientId == _localPeer.ClientId) continue; #if !DEBUG if (!IsAcceptedAddress(peer.IpAddress)) return; #endif + if (!_cypherSystemCore.Crypto() + .VerifySignature(peer.PublicKey, peer.Timestamp.ToBytes(), peer.Signature)) continue; var key = GetKey(peer.ClientId, peer.IpAddress); - if (!_caching.TryGet(key, out var cachedPeer)) + if (!_caching.TryGet(key, out var cachedPeer) && _peerCooldownCaching[key].IsDefault()) { - if (_peerCooldownCaching[key].IsDefault()) - { - UpdatePeer(peer.ClientId, peer.IpAddress, peer); - } - else if (peer.PublicKey[1..33].Xor(publicKey.ToArray())) - { - _peerCooldownCaching.Remove(key); - UpdatePeer(peer.ClientId, peer.IpAddress, peer); - } + UpdatePeer(peer.ClientId, peer.IpAddress, peer); + continue; } - else if (cachedPeer.BlockCount != peer.BlockCount) + + if (!cachedPeer.IsDefault()) { - peer.Retries = cachedPeer.Retries; + if (cachedPeer.Timestamp >= peer.Timestamp) continue; UpdatePeer(peer.ClientId, peer.IpAddress, peer); + continue; } + + if (_peerCooldownCaching[key].IsDefault()) continue; + if (_peerCooldownCaching[key].Timestamp >= peer.Timestamp) continue; + _peerCooldownCaching.Remove(key); + UpdatePeer(peer.ClientId, peer.IpAddress, peer); } } @@ -447,15 +520,14 @@ public void SetPeerCooldown(PeerCooldown peer) /// private void HandlePeerCooldown() { - _coolDownDisposable = Observable.Interval(TimeSpan.FromMinutes(10)).Subscribe(_ => + _coolDownDisposable = Observable.Interval(TimeSpan.FromMinutes(30)).Subscribe(_ => { if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; try { - + var removePeerCooldownBeforeTimestamp = Util.GetUtcNow().AddMinutes(-30).ToUnixTimestamp(); var removePeersCooldown = AsyncHelper.RunSync(async delegate { - var removePeerCooldownBeforeTimestamp = Util.GetUtcNow().AddMinutes(-10).ToUnixTimestamp(); return await _peerCooldownCaching.WhereAsync(x => new ValueTask(x.Value.Timestamp < removePeerCooldownBeforeTimestamp)); }); @@ -508,7 +580,10 @@ private void Dispose(bool disposing) if (disposing) { + _discoverDisposable?.Dispose(); _receiverDisposable?.Dispose(); + _socket?.Dispose(); + _ctx?.Dispose(); } _disposed = true; diff --git a/node/Configuration/Utility.cs b/node/Configuration/Utility.cs index 40d76c7b..08e38e3b 100644 --- a/node/Configuration/Utility.cs +++ b/node/Configuration/Utility.cs @@ -51,6 +51,8 @@ public Utility(Config config) StepIpAddress(); WriteDivider("Public TCP Port"); TcpPort(); + WriteDivider("Public Peer Discovery Port"); + DiscoveryPort(); WriteDivider("Public Web Socket Port"); WebSocketPort(); WriteDivider("Public HTTP Port"); @@ -74,6 +76,7 @@ public Utility(Config config) .AddRow("Http Port", _node.Network.HttpPort.ToString()) .AddRow("Tcp Port", _node.Network.P2P.TcpPort.ToString()) .AddRow("Web Socket Port", _node.Network.P2P.WsPort.ToString()) + .AddRow("Discovery Peer Port", _node.Network.P2P.DsPort.ToString()) .AddRow("Auto Sync Time", _node.Network.AutoSyncEveryMinutes.ToString())); } catch (Exception ex) @@ -172,7 +175,7 @@ private void TcpPort() .Title("Your node exposes an API which needs to be accessible by other nodes in the network. This " + "API listens on a configurable public TCP port, the default tcp port number is " + "[bold yellow]7946[/]. You have to make sure that this port is properly." + - "configured in your firewall or router.").AddChoices(new[] { UseDefaultPort, ManuallyEnterPort })); + "configured in your firewall or router.").AddChoices(UseDefaultPort, ManuallyEnterPort)); if (tcpPortChoice == UseDefaultPort) { _node.Network.P2P = new P2P { TcpPort = 7946 }; @@ -193,6 +196,36 @@ private void TcpPort() } } + /// + /// + /// + private void DiscoveryPort() + { + var tcpPortChoice = AnsiConsole.Prompt(new SelectionPrompt() + .Title("Your node exposes an API which needs to be accessible by other nodes in the network. This " + + "API listens on a configurable public Peer discovery port, the default Peer discovery port number is " + + "[bold yellow]5146[/]. You have to make sure that this port is properly." + + "configured in your firewall or router.").AddChoices(UseDefaultPort, ManuallyEnterPort)); + if (tcpPortChoice == UseDefaultPort) + { + _node.Network.P2P = _node.Network.P2P with { DsPort = 5146 }; + } + else + { + var port = AnsiConsole.Prompt(new TextPrompt("Enter [bold green]port[/]:").ValidationErrorMessage("") + .Validate(p => + { + var pass = p is >= -1 and <= int.MaxValue; + return pass switch + { + true => ValidationResult.Success(), + false => ValidationResult.Error("[red]Something went wrong[/]") + }; + })); + _node.Network.P2P = _node.Network.P2P with { DsPort = port }; + } + } + /// /// /// @@ -202,7 +235,7 @@ private void WebSocketPort() .Title("Your node exposes an API which needs to be accessible by other nodes in the network. This " + "API listens on a configurable public TCP port, the default web socket port number is " + "[bold yellow]7947[/]. You have to make sure that this port is properly." + - "configured in your firewall or router.").AddChoices(new[] { UseDefaultPort, ManuallyEnterPort })); + "configured in your firewall or router.").AddChoices(UseDefaultPort, ManuallyEnterPort)); if (tcpPortChoice == UseDefaultPort) { _node.Network.P2P = _node.Network.P2P with { WsPort = 7947 }; @@ -232,7 +265,7 @@ private void HttpPort() .Title("Your node exposes an API which needs to be accessible by other nodes in the network. This " + "API listens on a configurable public TCP port, the default http port number is " + "[bold yellow]48655[/]. You have to make sure that this port is properly." + - "configured in your firewall or router.").AddChoices(new[] { UseDefaultPort, ManuallyEnterPort })); + "configured in your firewall or router.").AddChoices(UseDefaultPort, ManuallyEnterPort)); if (tcpPortChoice == UseDefaultPort) { _node.Network.HttpPort = 48655; @@ -262,7 +295,7 @@ private void HttpsPort() .Title("Your node exposes an API which needs to be accessible by other nodes in the network. This " + "API listens on a configurable public TCP port, the default https port number is " + "[bold yellow]44333[/]. You have to make sure that this port is properly." + - "configured in your firewall or router.").AddChoices(new[] { UseDefaultPort, ManuallyEnterPort })); + "configured in your firewall or router.").AddChoices(UseDefaultPort, ManuallyEnterPort)); if (tcpPortChoice == UseDefaultPort) { _node.Network.HttpsPort = 44333; @@ -290,7 +323,7 @@ private void AutoSyncTime() { var syncTimeChoice = AnsiConsole.Prompt(new SelectionPrompt() .Title("Node automatically starts syncing with network peers on discovery " + - "By default, the auto sync time is set to [bold yellow]10 minutes[/].").AddChoices(new[] { UseDefaultSyncTime, ManuallyEnterSyncTime })); + "By default, the auto sync time is set to [bold yellow]10 minutes[/].").AddChoices(UseDefaultSyncTime, ManuallyEnterSyncTime)); if (syncTimeChoice == UseDefaultSyncTime) { _node.Network.AutoSyncEveryMinutes = 10; From 4fc5d8dddd2a93a7cac51c8e13c21a63278edcff Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:17:16 +0100 Subject: [PATCH 2/7] refactor: general --- core/Ledger/Validator.cs | 10 ++++------ core/Persistence/DataProtectionRepository.cs | 2 +- core/Persistence/HashChainRepository.cs | 2 +- core/Wallet/Wallet.cs | 1 + 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/Ledger/Validator.cs b/core/Ledger/Validator.cs index 64a56d76..7d622fd2 100644 --- a/core/Ledger/Validator.cs +++ b/core/Ledger/Validator.cs @@ -32,7 +32,7 @@ namespace CypherNetwork.Ledger; /// public interface IValidator { - Task VerifyBlockGraphSignatureNodeRound(BlockGraph blockGraph); + VerifyResult VerifyBlockGraphSignatureNodeRound(BlockGraph blockGraph); VerifyResult VerifyBulletProof(Transaction transaction); VerifyResult VerifyCoinbaseTransaction(Vout coinbase, ulong solution, decimal runningDistribution, ulong height); VerifyResult VerifySolution(byte[] vrfBytes, byte[] kernel, ulong solution); @@ -88,7 +88,6 @@ public Validator(ICypherSystemCore cypherSystemCore, ILogger logger) public async Task VerifyBlockHashAsync(Block block) { Guard.Argument(block, nameof(block)).NotNull(); - using var hasher = Hasher.New(); var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; var prevBlock = await hashChainRepository.GetAsync(x => new ValueTask(x.Height == hashChainRepository.Height)); if (prevBlock is null) @@ -96,7 +95,7 @@ public async Task VerifyBlockHashAsync(Block block) _logger.Here().Error("No previous block available"); return VerifyResult.UnableToVerify; } - + using var hasher = Hasher.New(); hasher.Update(prevBlock.Hash); hasher.Update(block.ToHash()); var hash = hasher.Finalize(); @@ -129,14 +128,13 @@ public async Task VerifyMerkleAsync(Block block) /// /// /// - public async Task VerifyBlockGraphSignatureNodeRound(BlockGraph blockGraph) + public VerifyResult VerifyBlockGraphSignatureNodeRound(BlockGraph blockGraph) { Guard.Argument(blockGraph, nameof(blockGraph)).NotNull(); try { if (!_cypherSystemCore.Crypto() - .VerifySignature(new VerifySignatureManualRequest(blockGraph.Signature, blockGraph.PublicKey, - blockGraph.ToHash()))) + .VerifySignature(blockGraph.PublicKey, blockGraph.ToHash(), blockGraph.Signature)) { _logger.Error("Unable to verify the signature for block {@Round} from node {@Node}", blockGraph.Block.Round, blockGraph.Block.Node); diff --git a/core/Persistence/DataProtectionRepository.cs b/core/Persistence/DataProtectionRepository.cs index 533f493f..8fee9405 100644 --- a/core/Persistence/DataProtectionRepository.cs +++ b/core/Persistence/DataProtectionRepository.cs @@ -55,7 +55,7 @@ public new Task PutAsync(byte[] key, DataProtection data) { var cf = _storeDb.Rocks.GetColumnFamily(GetTableNameAsString()); var buffer = MessagePackSerializer.Serialize(data); - _storeDb.Rocks.Put(StoreDb.Key(StoreDb.DataProtectionTable.ToString(), key), buffer, cf); + _storeDb.Rocks.Put(StoreDb.Key(GetTableNameAsString(), key), buffer, cf); saved = true; } } diff --git a/core/Persistence/HashChainRepository.cs b/core/Persistence/HashChainRepository.cs index 4f38b352..1f5c8605 100644 --- a/core/Persistence/HashChainRepository.cs +++ b/core/Persistence/HashChainRepository.cs @@ -73,7 +73,7 @@ public new Task PutAsync(byte[] key, Block data) using (_sync.Write()) { var cf = _storeDb.Rocks.GetColumnFamily(GetTableNameAsString()); - _storeDb.Rocks.Put(StoreDb.Key(StoreDb.HashChainTable.ToString(), key), + _storeDb.Rocks.Put(StoreDb.Key(GetTableNameAsString(), key), MessagePackSerializer.Serialize(data), cf); Height = data.Height; Count++; diff --git a/core/Wallet/Wallet.cs b/core/Wallet/Wallet.cs index e2f6b6f1..7b341768 100644 --- a/core/Wallet/Wallet.cs +++ b/core/Wallet/Wallet.cs @@ -338,6 +338,7 @@ private bool IsSpent(Output output, IWalletSession session) var (spendKey, scanKey) = Unlock(); var transactions = session.GetSafeGuardBlocks() .SelectMany(x => x.Txs).ToArray(); + if (transactions.Any() != true) return null; transactions.Shuffle(); for (var k = 0; k < nRows - 1; ++k) From 468ce8978a4bf870755018bf54522e91f7916ac6 Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:18:48 +0100 Subject: [PATCH 3/7] fix: index out of range --- core/Ledger/Graph.cs | 4 ++-- core/Wallet/WalletSession.cs | 21 ++++++++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/Ledger/Graph.cs b/core/Ledger/Graph.cs index 6cf258f8..30b1393c 100644 --- a/core/Ledger/Graph.cs +++ b/core/Ledger/Graph.cs @@ -236,7 +236,7 @@ public async Task GetSafeguardBlocksAsync(SafeguardBloc try { var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; - var height = hashChainRepository.Height == 0 ? 0 : hashChainRepository.Height - (ulong)safeguardBlocksRequest.NumberOfBlocks; + var height = hashChainRepository.Height <= (ulong)safeguardBlocksRequest.NumberOfBlocks ? hashChainRepository.Height : hashChainRepository.Height - (ulong)safeguardBlocksRequest.NumberOfBlocks; var blocks = await hashChainRepository.OrderByRangeAsync(x => x.Height, (int)height, safeguardBlocksRequest.NumberOfBlocks); if (blocks.Any()) return new SafeguardBlocksResponse(blocks, string.Empty); @@ -474,7 +474,7 @@ private bool Save(BlockGraph blockGraph) Guard.Argument(blockGraph, nameof(blockGraph)).NotNull(); try { - if (_cypherSystemCore.Validator().VerifyBlockGraphSignatureNodeRound(blockGraph).Result != VerifyResult.Succeed) + if (_cypherSystemCore.Validator().VerifyBlockGraphSignatureNodeRound(blockGraph) != VerifyResult.Succeed) { _logger.Error("Unable to verify block for {@Node} and round {@Round}", blockGraph.Block.Node, blockGraph.Block.Round); diff --git a/core/Wallet/WalletSession.cs b/core/Wallet/WalletSession.cs index 4048f861..eaf37425 100644 --- a/core/Wallet/WalletSession.cs +++ b/core/Wallet/WalletSession.cs @@ -9,6 +9,7 @@ using System.Threading.Tasks; using CypherNetwork.Extensions; using CypherNetwork.Models; +using CypherNetwork.Models.Messages; using CypherNetwork.Persistence; using CypherNetwork.Wallet.Models; using Dawn; @@ -245,16 +246,18 @@ private void HandleSafeguardBlocks() _disposableHandleSafeguardBlocks = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(155520000)) .Select(_ => Observable.FromAsync(async () => { - if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; - - var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; - var height = hashChainRepository.Height == 0 ? 0 : hashChainRepository.Height - 147; - var blocks = await hashChainRepository.OrderByRangeAsync(x => x.Height, (int)height, 147); - if (!blocks.Any()) return; - - lock (Locking) + try + { + if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; + var blocksResponse = await _cypherSystemCore.Graph().GetSafeguardBlocksAsync(new SafeguardBlocksRequest(147)); + lock (Locking) + { + _readOnlySafeGuardBlocks = blocksResponse.Blocks; + } + } + catch (Exception) { - _readOnlySafeGuardBlocks = blocks; + // Ignore } })) .Merge() From 7096bdc7ea08f9eba07510dad294d79b2ead50ab Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:19:58 +0100 Subject: [PATCH 4/7] fix: step counter --- core/Persistence/HashChainRepository.cs | 28 +++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/core/Persistence/HashChainRepository.cs b/core/Persistence/HashChainRepository.cs index 1f5c8605..04ad9084 100644 --- a/core/Persistence/HashChainRepository.cs +++ b/core/Persistence/HashChainRepository.cs @@ -21,6 +21,7 @@ public interface IHashChainRepository : IRepository { ValueTask> OrderByRangeAsync(Func selector, int skip, int take); new Task PutAsync(byte[] key, Block data); + new bool Delete(byte[] key); ulong Height { get; } ulong Count { get; } } @@ -88,6 +89,33 @@ public new Task PutAsync(byte[] key, Block data) return Task.FromResult(false); } + /// + /// + /// + /// + /// + public new bool Delete(byte[] key) + { + Guard.Argument(key, nameof(key)).NotNull().NotEmpty(); + try + { + using (_sync.Write()) + { + var cf = _storeDb.Rocks.GetColumnFamily(GetTableNameAsString()); + _storeDb.Rocks.Remove(StoreDb.Key(GetTableNameAsString(), key), cf); + Height--; + Count--; + return true; + } + } + catch (Exception ex) + { + _logger.Here().Error(ex, "Error while removing from database"); + } + + return false; + } + /// /// /// From 41611e0f17e12ad5b9142205243f0ed870e75904 Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:21:16 +0100 Subject: [PATCH 5/7] refactor: seeds --- node/appsettings.json | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/node/appsettings.json b/node/appsettings.json index e8461521..be7b7061 100644 --- a/node/appsettings.json +++ b/node/appsettings.json @@ -10,17 +10,16 @@ "MaxTransactionSizePerBlock" : 2102400 }, "Network": { - "HttpPort": "48655", - "HttpsPort": "44333", + "HttpPort": 48655, + "HttpsPort": 44333, "P2P" : { - "TcpPort": "7946", - "WsPort": "7947" + "TcpPort": 7946, + "WsPort": 7947, + "DsPort": 5146 }, "SeedList": [ - "167.99.81.173:7946" - ], - "SeedListPublicKeys": [ - "05F4C195C979EDB25C5FF3078AB537E74AED015D5196689E728355005F20DB4E29" + "seedtest1.cypherpunks.network:5146", + "seedtest2.cypherpunks.network:5146" ], "Environment": "testnet", "SigningKeyRingName": "DefaultSigning.cyp3.Key", From 88b74fe3ed4d6dd825ef66408b56392ae9f8b322 Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:22:06 +0100 Subject: [PATCH 6/7] refactor: load balancing downloads --- core/Ledger/Sync.cs | 66 ++++++++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 34 deletions(-) diff --git a/core/Ledger/Sync.cs b/core/Ledger/Sync.cs index f4e36d86..af603aea 100644 --- a/core/Ledger/Sync.cs +++ b/core/Ledger/Sync.cs @@ -12,6 +12,7 @@ using CypherNetwork.Models.Messages; using CypherNetwork.Network; using Dawn; +using libsignal.util; using MessagePack; using Serilog; using Spectre.Console; @@ -38,6 +39,8 @@ public class Sync : ISync, IDisposable private bool _disposed; private int _running; + private static readonly object LockOnSync = new(); + /// /// /// @@ -93,20 +96,25 @@ private async Task SynchronizeAsync() currentRetry++; } - foreach (var peer in _cypherSystemCore.PeerDiscovery().GetDiscoveryStore()) + var peers = _cypherSystemCore.PeerDiscovery().GetDiscoveryStore().Where(x => x.BlockCount > blockCount).ToArray(); + if (peers.Any() != true) return; + peers.Shuffle(); + var maxBlockHeight = peers.Max(x => (long)x.BlockCount); + var chunk = maxBlockHeight / peers.Length; + _logger.Information("Peer count [{@PeerCount}]", peers.Length); + _logger.Information("Network block height [{@MaxBlockHeight}]", maxBlockHeight); + foreach (var peer in peers) { - if (blockCount < peer.BlockCount) + var skip = blockCount <= 6 ? blockCount : blockCount - 6; // +- Depth of blocks to compare. + var take = (int)((int)blockCount + chunk); + if (take > (int)maxBlockHeight) { - var skip = blockCount == 0 ? 0 : blockCount - 6; // +- Depth of blocks to compare. - var synchronized = await SynchronizeAsync(peer, (ulong)skip, (int)peer.BlockCount); - if (!synchronized) continue; - _logger.Information( - "Successfully SYNCHRONIZED with node:{@NodeName} host:{@Host} version:{@Version}", - peer.Name.FromBytes(), peer.IpAddress.FromBytes(), peer.Version.FromBytes()); - break; + take = (int)(maxBlockHeight - (long)blockCount) + (int)blockCount; } - + SynchronizeAsync(peer, skip, take).Wait(); blockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; + _logger.Information("Local block height ({@LocalHeight})", blockCount); + if (blockCount == (ulong)maxBlockHeight) break; } } catch (Exception ex) @@ -150,7 +158,7 @@ private async Task WaitForPeersAsync(int currentRetry, int retryCount) /// /// /// - private async Task SynchronizeAsync(Peer peer, ulong skip, int take) + private async Task SynchronizeAsync(Peer peer, ulong skip, int take) { Guard.Argument(peer, nameof(peer)).HasValue(); Guard.Argument(skip, nameof(skip)).NotNegative(); @@ -160,7 +168,7 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) { var validator = _cypherSystemCore.Validator(); var blocks = await FetchBlocksAsync(peer, skip, take); - if (blocks?.Any() != true) return false; + if (blocks?.Any() != true) return; if (skip == 0) { _logger.Warning("FIRST TIME BOOTSTRAPPING"); @@ -168,7 +176,7 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) else { _logger.Information("CONTINUE BOOTSTRAPPING"); - _logger.Information("CHECKING [BLOCK HEIGHTS]"); + _logger.Information("CHECKING [BLOCK DUPLICATES]"); var verifyNoDuplicateBlockHeights = validator.VerifyNoDuplicateBlockHeights(blocks); if (verifyNoDuplicateBlockHeights == VerifyResult.AlreadyExists) { @@ -179,26 +187,26 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) ClientId = peer.ClientId, PeerState = PeerState.DupBlocks }); - _logger.Warning("Duplicate block heights [UNABLE TO VERIFY]"); - return false; + _logger.Warning("DUPLICATE block height [UNABLE TO VERIFY]"); + return; } _logger.Information("CHECKING [FORK RULE]"); var forkRuleBlocks = await validator.VerifyForkRuleAsync(blocks.OrderBy(x => x.Height).ToArray()); if (forkRuleBlocks.Length == 0) { - _logger.Fatal("Fork rule check [UNABLE TO VERIFY]"); - return false; + _logger.Fatal("FORK RULE CHECK [UNABLE TO VERIFY]"); + return; } blocks = forkRuleBlocks.ToList(); - _logger.Information("Fork rule check [OK]"); + _logger.Information("FORK RULE CHECK [OK]"); } await AnsiConsole.Progress().AutoClear(false).Columns(new TaskDescriptionColumn(), new ProgressBarColumn(), new PercentageColumn(), new SpinnerColumn()).StartAsync(async ctx => { - var warpTask = ctx.AddTask($"SYNCHRONIZING [bold yellow]{blocks.Count}[/] Block(s)", false).IsIndeterminate(); + var warpTask = ctx.AddTask($"[bold green]SYNCHRONIZING[/] [bold yellow]{blocks.Count}[/] Block(s)", false).IsIndeterminate(); warpTask.MaxValue(blocks.Count); warpTask.StartTask(); warpTask.IsIndeterminate(false); @@ -236,16 +244,7 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) catch (Exception ex) { _logger.Here().Error(ex, "SYNCHRONIZATION [FAILED]"); - return false; } - finally - { - var blockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; - _logger.Information("Local node block height set to ({@LocalHeight})", blockCount); - if (blockCount == (ulong)take) isSynchronized = true; - } - - return isSynchronized; } /// @@ -259,14 +258,13 @@ private async Task> FetchBlocksAsync(Peer peer, ulong skip, Guard.Argument(peer, nameof(peer)).HasValue(); Guard.Argument(skip, nameof(skip)).NotNegative(); Guard.Argument(take, nameof(take)).NotNegative(); - _logger.Information("Synchronizing with {@Host} ({@Skip})/({@Take})", peer.IpAddress.FromBytes(), skip, take); var iSkip = skip; try { - _logger.Information("Fetching [{@Range}] block(s)", Math.Abs(take - (int)skip)); const int maxBlocks = 10; - var chunks = Enumerable.Repeat(maxBlocks, take / maxBlocks).ToList(); - if (take % maxBlocks != 0) chunks.Add(take % maxBlocks); + var iTake = take - (int)skip; + var chunks = Enumerable.Repeat(maxBlocks, iTake / maxBlocks).ToList(); + if (iTake % maxBlocks != 0) chunks.Add(iTake % maxBlocks); // Show progress var blocks = await AnsiConsole.Progress().AutoClear(false).Columns(new TaskDescriptionColumn(), @@ -274,8 +272,8 @@ private async Task> FetchBlocksAsync(Peer peer, ulong skip, .StartAsync(async ctx => { var blocks = new List(); - var warpTask = ctx.AddTask("DOWNLOADING", false).IsIndeterminate(); - warpTask.MaxValue(chunks.Count); + var warpTask = ctx.AddTask($"[bold green]DOWNLOADING[/] [bold yellow]{Math.Abs(take - (int)skip)}[/] block(s) from [bold yellow]{peer.Name.FromBytes()}[/] v{peer.Version.FromBytes()}", false).IsIndeterminate(); + warpTask.MaxValue(take - (int)skip); warpTask.StartTask(); warpTask.IsIndeterminate(false); while (!ctx.IsFinished) From ffe9b1788fbca06fc0978305fcd5d61e8adfeb7f Mon Sep 17 00:00:00 2001 From: pingpongsneak Date: Wed, 26 Oct 2022 22:22:29 +0100 Subject: [PATCH 7/7] build: bump version --- core/core.csproj | 2 +- node/Program.cs | 2 +- node/node.csproj | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/core.csproj b/core/core.csproj index cf241c81..8072f2e0 100644 --- a/core/core.csproj +++ b/core/core.csproj @@ -4,7 +4,7 @@ net6.0 AnyCPU;x64 CypherNetwork - 0.0.74.0 + 0.0.75.0 CypherNetwork core cyphernetworkcore diff --git a/node/Program.cs b/node/Program.cs index d593805b..42be67f5 100644 --- a/node/Program.cs +++ b/node/Program.cs @@ -80,7 +80,7 @@ public static async Task Main(string[] args) / / / / / // __ \ / __ \ / _ \ / ___// __ \ / / / // __ \ / //_// ___/ / /___ / /_/ // /_/ // / / // __// / / /_/ // /_/ // / / // ,< (__ ) \____/ \__, // .___//_/ /_/ \___//_/ / .___/ \__,_//_/ /_//_/|_|/____/ - /____//_/ /_/ write code: v{Util.GetAssemblyVersion()} RC1"); + /____//_/ /_/ write code: v{Util.GetAssemblyVersion()} RC2"); Console.WriteLine(); Console.ResetColor(); diff --git a/node/node.csproj b/node/node.csproj index e2cde869..94b94022 100644 --- a/node/node.csproj +++ b/node/node.csproj @@ -2,7 +2,7 @@ true - 0.0.74.0 + 0.0.75.0 en node @@ -52,7 +52,7 @@ - +