diff --git a/core/Controllers/BlockController.cs b/core/Controllers/BlockController.cs index 3717e5db..8055e0f8 100644 --- a/core/Controllers/BlockController.cs +++ b/core/Controllers/BlockController.cs @@ -156,9 +156,7 @@ public async Task GetBlockHeightAsync() { try { - var blockCountResponse = - await _cypherNetworkCore.Graph().GetBlockCountAsync(); - return new ObjectResult(new { height = blockCountResponse?.Count }); + return new ObjectResult(new { height = _cypherNetworkCore.UnitOfWork().HashChainRepository.Count }); } catch (Exception ex) { diff --git a/core/Extensions/AppExtensions.cs b/core/Extensions/AppExtensions.cs index 1da96025..7880aadc 100644 --- a/core/Extensions/AppExtensions.cs +++ b/core/Extensions/AppExtensions.cs @@ -79,7 +79,7 @@ public static ContainerBuilder AddCypherSystemCore(this ContainerBuilder builder Thumbprint = configuration["Node:Network:X509Certificate:Thumbprint"], CertPath = configuration["Node:Network:X509Certificate:CertPath"] }, - TransactionRateConfig = new TransactionLeakRateConfigurationOption + MemoryPoolTransactionRateLimit = new TransactionLeakRateConfigurationOption { LeakRate = Convert.ToInt32( @@ -107,7 +107,7 @@ public static ContainerBuilder AddCypherSystemCore(this ContainerBuilder builder var endpoint = Util.GetIpEndPoint(selection.item.Value); var endpointFromHost = Util.GetIpEndpointFromHostPort(endpoint.Address.ToString(), endpoint.Port); var publicKey = remotePublicKeys[selection.index].Value; - node.Network.SeedList.Add($"{endpointFromHost.Address}:{endpointFromHost.Port}"); + node.Network.SeedList.Add($"{endpointFromHost.Address.ToString()}:{endpointFromHost.Port}"); node.Network.SeedListPublicKeys.Add(publicKey); } diff --git a/core/Helper/Util.cs b/core/Helper/Util.cs index b905448d..e3b3c953 100644 --- a/core/Helper/Util.cs +++ b/core/Helper/Util.cs @@ -360,7 +360,7 @@ public static void ThrowPortNotFree(int port) try { Task.Delay(100); - var localEp = new IPEndPoint(IPAddress.Any, port); + var localEp = new IPEndPoint(GetIpAddress(), port); socket.Bind(localEp); } catch (SocketException ex) diff --git a/core/Ledger/Graph.cs b/core/Ledger/Graph.cs index 6d93da53..7a5130c3 100644 --- a/core/Ledger/Graph.cs +++ b/core/Ledger/Graph.cs @@ -40,8 +40,6 @@ public interface IGraph Task GetTransactionAsync(TransactionRequest transactionRequest); Task GetPreviousBlockAsync(); Task GetSafeguardBlocksAsync(SafeguardBlocksRequest safeguardBlocksRequest); - Task GetBlockHeightAsync(); - Task GetBlockCountAsync(); Task SaveBlockAsync(SaveBlockRequest saveBlockRequest); Task GetBlocksAsync(BlocksRequest blocksRequest); Task PostAsync(BlockGraph blockGraph); @@ -114,7 +112,7 @@ protected override async Task OnReceiveAsync(BlockGraph blockGraph) { Guard.Argument(blockGraph, nameof(blockGraph)).NotNull(); if (_cypherSystemCore.Sync().Running) return; - if (blockGraph.Block.Round != await NextRoundAsync()) return; + if (blockGraph.Block.Round != NextRound()) return; if (await BlockHeightExistsAsync(new BlockHeightExistsRequest(blockGraph.Block.Round)) != VerifyResult.Succeed) return; if (!_syncCacheSeenBlockGraph.Contains(blockGraph.ToIdentifier())) { @@ -221,10 +219,10 @@ public async Task GetTransactionAsync(TransactionRequest tr /// public async Task GetPreviousBlockAsync() { - var unitOfWork = _cypherSystemCore.UnitOfWork(); - var height = await unitOfWork.HashChainRepository.GetBlockHeightAsync(); + var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; var prevBlock = - await unitOfWork.HashChainRepository.GetAsync(x => new ValueTask(x.Height == (ulong)height)); + await hashChainRepository.GetAsync(x => + new ValueTask(x.Height == hashChainRepository.Height)); return prevBlock; } @@ -237,10 +235,9 @@ public async Task GetSafeguardBlocksAsync(SafeguardBloc Guard.Argument(safeguardBlocksRequest, nameof(safeguardBlocksRequest)).NotNull(); try { - var unitOfWork = _cypherSystemCore.UnitOfWork(); - var height = (await GetBlockHeightAsync()).Count - safeguardBlocksRequest.NumberOfBlocks; - height = height < 0x0 ? 0x0 : height; - var blocks = await unitOfWork.HashChainRepository.OrderByRangeAsync(x => x.Height, (int)height, + var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; + var height = hashChainRepository.Height - (ulong)safeguardBlocksRequest.NumberOfBlocks; + var blocks = await hashChainRepository.OrderByRangeAsync(x => x.Height, (int)height, safeguardBlocksRequest.NumberOfBlocks); if (blocks.Any()) return new SafeguardBlocksResponse(blocks, string.Empty); } @@ -292,25 +289,6 @@ public async Task GetBlockByHeightAsync(BlockByHeightRequest bloc return new BlockResponse(null); } - /// - /// - /// - /// - public async Task GetBlockCountAsync() - { - try - { - var height = await _cypherSystemCore.UnitOfWork().HashChainRepository.CountAsync(); - return new BlockCountResponse(height); - } - catch (Exception ex) - { - _logger.Here().Error("{@Message}", ex.Message); - } - - return new BlockCountResponse(0); - } - /// /// /// @@ -332,24 +310,6 @@ public async Task GetBlocksAsync(BlocksRequest blocksRequest) return new BlocksResponse(null); } - /// - /// - public async Task GetBlockHeightAsync() - { - try - { - var count = (await GetBlockCountAsync()).Count; - if (count > 0) count--; - return new BlockHeightResponse(count); - } - catch (Exception ex) - { - _logger.Here().Error("{@Message}", ex.Message); - } - - return new BlockHeightResponse(0); - } - /// /// /// @@ -650,7 +610,7 @@ private async Task OnDeliveredReadyAsync(Interpreted deliver) foreach (var deliveredBlock in blocks) try { - if (deliveredBlock.Round != await NextRoundAsync()) continue; + if (deliveredBlock.Round != NextRound()) continue; await using var stream = Util.Manager.GetStream(deliveredBlock.Data.AsSpan()) as RecyclableMemoryStream; var block = await MessagePackSerializer.DeserializeAsync(stream); _syncCacheDelivered.AddOrUpdate(block.Hash, block); @@ -690,7 +650,7 @@ private async Task DecideWinnerAsync() }; if (block is { }) { - if (block.Height != await NextRoundAsync()) return; + if (block.Height != NextRound()) return; if (await BlockHeightExistsAsync(new BlockHeightExistsRequest(block.Height)) == VerifyResult.AlreadyExists) { _logger.Error("Block winner already exists"); @@ -744,17 +704,7 @@ private async Task DecideWinnerAsync() /// private ulong GetRound() { - return AsyncHelper.RunSync(GetRoundAsync); - } - - /// - /// - /// - /// - private async Task GetRoundAsync() - { - var blockHeightResponse = await GetBlockHeightAsync(); - return (ulong)blockHeightResponse.Count; + return _cypherSystemCore.UnitOfWork().HashChainRepository.Height; } /// @@ -763,16 +713,7 @@ private async Task GetRoundAsync() /// private ulong NextRound() { - return AsyncHelper.RunSync(NextRoundAsync); - } - - /// - /// - /// - /// - private async Task NextRoundAsync() - { - return await GetRoundAsync() + 1; + return _cypherSystemCore.UnitOfWork().HashChainRepository.Count; } /// @@ -784,7 +725,7 @@ private async Task BroadcastAsync(BlockGraph blockGraph) Guard.Argument(blockGraph, nameof(blockGraph)).NotNull(); try { - if (blockGraph.Block.Round == await NextRoundAsync()) + if (blockGraph.Block.Round == NextRound()) await _cypherSystemCore.Broadcast().PostAsync((TopicType.AddBlockGraph, MessagePackSerializer.Serialize(blockGraph))); } diff --git a/core/Ledger/PPoS.cs b/core/Ledger/PPoS.cs index cb035407..9a470d5b 100644 --- a/core/Ledger/PPoS.cs +++ b/core/Ledger/PPoS.cs @@ -234,9 +234,8 @@ private async Task BlockHeightSynchronizedAsync() { var peers = await _cypherSystemCore.PeerDiscovery().GetDiscoveryAsync(); if (!peers.Any()) return true; - var blockCountResponse = await _cypherSystemCore.Graph().GetBlockCountAsync(); var maxBlockHeight = peers.Max(x => x.BlockCount); - return blockCountResponse?.Count >= (long)maxBlockHeight; + return _cypherSystemCore.UnitOfWork().HashChainRepository.Count >= maxBlockHeight; } /// @@ -324,8 +323,7 @@ private async Task CreateCoinstakeAsync(Kernel kernel) var validator = _cypherSystemCore.Validator(); var solution = await validator.SolutionAsync(kernel.CalculatedVrfSignature, kernel.Hash).ConfigureAwait(false); if (solution == 0) return null; - var height = await _cypherSystemCore.UnitOfWork().HashChainRepository.CountAsync() + 1; - var networkShare = validator.NetworkShare(solution, (ulong)height); + var networkShare = validator.NetworkShare(solution, _cypherSystemCore.UnitOfWork().HashChainRepository.Count + 1); var bits = validator.Bits(solution, networkShare); _logger.Information("Begin... [COINSTAKE]"); var walletTransaction = await _cypherSystemCore.Wallet() @@ -412,11 +410,11 @@ private BlockGraph NewBlockGraph(in Block block, in Block prevBlock) var block = new Block { Hash = new byte[32], - Height = 1 + previousBlock.Height, + Height = previousBlock.Height + 1, BlockHeader = new BlockHeader { Version = 2, - Height = previousBlock.BlockHeader.Height, + Height = previousBlock.Height + 1, Locktime = lockTime, LocktimeScript = new Script(Op.GetPushOp(lockTime), OpcodeType.OP_CHECKLOCKTIMEVERIFY).ToString().ToBytes(), diff --git a/core/Ledger/Sync.cs b/core/Ledger/Sync.cs index 9e4ce321..3ffe57db 100644 --- a/core/Ledger/Sync.cs +++ b/core/Ledger/Sync.cs @@ -10,9 +10,11 @@ using CypherNetwork.Extensions; using CypherNetwork.Models; using CypherNetwork.Models.Messages; +using CypherNetwork.Network; using Dawn; using MessagePack; using Serilog; +using Spectre.Console; using Block = CypherNetwork.Models.Block; namespace CypherNetwork.Ledger; @@ -60,9 +62,9 @@ private void Init() { if (Running) return; _running = 1; - SynchronizeAsync().SafeFireAndForget(exception => + SynchronizeAsync().SafeFireAndForget(ex => { - _logger.Here().Error("{@Message}", exception.Message); + _logger.Here().Error("{@Message}", ex.Message); }); } catch (TaskCanceledException) @@ -80,9 +82,8 @@ private async Task SynchronizeAsync() _logger.Information("Begin... [SYNCHRONIZATION]"); try { - var blockCountResponse = - await _cypherSystemCore.Graph().GetBlockCountAsync(); - _logger.Information("OPENING block height [{@Height}]", blockCountResponse?.Count); + var blockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; + _logger.Information("OPENING block height [{@Height}]", blockCount); var currentRetry = 0; for (; ; ) { @@ -94,10 +95,9 @@ private async Task SynchronizeAsync() foreach (var peer in _cypherSystemCore.PeerDiscovery().GetDiscoveryStore()) { - if (blockCountResponse?.Count < (long)peer.BlockCount) + if (blockCount < peer.BlockCount) { - var skip = blockCountResponse.Count - 6; // +- Depth of blocks to compare. - skip = skip < 0 ? blockCountResponse.Count : skip; + var skip = blockCount - 6; // +- Depth of blocks to compare. var synchronized = await SynchronizeAsync(peer, (ulong)skip, (int)peer.BlockCount); if (!synchronized) continue; _logger.Information( @@ -106,7 +106,7 @@ private async Task SynchronizeAsync() break; } - blockCountResponse = await _cypherSystemCore.Graph().GetBlockCountAsync(); + blockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; } } catch (Exception ex) @@ -116,8 +116,8 @@ private async Task SynchronizeAsync() finally { Interlocked.Exchange(ref _running, 0); - var blockCountResponse = await _cypherSystemCore.Graph().GetBlockCountAsync(); - _logger.Information("LOCAL NODE block height: [{@LocalHeight}]", blockCountResponse?.Count); + var blockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; + _logger.Information("LOCAL NODE block height: [{@LocalHeight}]", blockCount); _logger.Information("End... [SYNCHRONIZATION]"); _logger.Information("Next...[SYNCHRONIZATION] in {@Message} minute(s)", _cypherSystemCore.Node.Network.AutoSyncEveryMinutes); @@ -169,12 +169,16 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) { _logger.Information("CONTINUE BOOTSTRAPPING"); _logger.Information("CHECKING [BLOCK HEIGHTS]"); - var verifyNoDuplicateBlockHeights = validator.VerifyNoDuplicateBlockHeights(blocks); if (verifyNoDuplicateBlockHeights == VerifyResult.AlreadyExists) { _cypherSystemCore.PeerDiscovery().SetPeerCooldown(new PeerCooldown - { IpAddress = peer.IpAddress, PublicKey = peer.PublicKey }); + { + IpAddress = peer.IpAddress, + PublicKey = peer.PublicKey, + ClientId = peer.ClientId, + PeerState = PeerState.DupBlocks + }); _logger.Warning("Duplicate block heights [UNABLE TO VERIFY]"); return false; } @@ -191,24 +195,43 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) _logger.Information("Fork rule check [OK]"); } - _logger.Information("SYNCHRONIZING [{@BlockCount}] Block(s)", blocks.Count); - foreach (var block in blocks.OrderBy(x => x.Height)) - try - { - _logger.Information("SYNCING block height: [{@Height}]", block.Height); - var verifyBlockHeader = await validator.VerifyBlockAsync(block); - if (verifyBlockHeader != VerifyResult.Succeed) return false; - _logger.Information("SYNCHRONIZED [OK]"); - var saveBlockResponse = await _cypherSystemCore.Graph().SaveBlockAsync(new SaveBlockRequest(block)); - if (saveBlockResponse.Ok) continue; - _logger.Error("Unable to save block: {@Hash}", block.Hash); - return false; - } - catch (Exception ex) - { - _logger.Here().Error(ex, "Unable to save block: {@Hash}", block.Hash.ByteToHex()); - return false; - } + await AnsiConsole.Progress().AutoClear(false).Columns(new TaskDescriptionColumn(), new ProgressBarColumn(), + new PercentageColumn(), new SpinnerColumn()).StartAsync(async ctx => + { + var warpTask = ctx.AddTask($"SYNCHRONIZING [bold yellow]{blocks.Count}[/] Block(s)", false).IsIndeterminate(); + warpTask.MaxValue(blocks.Count); + warpTask.StartTask(); + warpTask.IsIndeterminate(false); + while (!ctx.IsFinished) + foreach (var block in blocks.OrderBy(x => x.Height)) + try + { + var verifyBlockHeader = await validator.VerifyBlockAsync(block); + if (verifyBlockHeader != VerifyResult.Succeed) + { + warpTask.StopTask(); + return; + } + var saveBlockResponse = + await _cypherSystemCore.Graph().SaveBlockAsync(new SaveBlockRequest(block)); + if (saveBlockResponse.Ok) + { + await Task.Delay(1); + warpTask.Increment(1); + continue; + } + + warpTask.StopTask(); + AnsiConsole.MarkupLine("[red]LOG:[/] " + $"Unable to save block: {block.Hash}" + "[red]...[/]"); + return; + } + catch (Exception ex) + { + warpTask.StopTask(); + _logger.Here().Error(ex, "Unable to save block: {@Hash}", block.Hash.ByteToHex()); + return; + } + }); } catch (Exception ex) { @@ -217,9 +240,9 @@ private async Task SynchronizeAsync(Peer peer, ulong skip, int take) } finally { - var blockCountResponse = await _cypherSystemCore.Graph().GetBlockCountAsync(); - _logger.Information("Local node block height set to ({@LocalHeight})", blockCountResponse?.Count); - if (blockCountResponse?.Count == take) isSynchronized = true; + var blockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; + _logger.Information("Local node block height set to ({@LocalHeight})", blockCount); + if (blockCount == (ulong)take) isSynchronized = true; } return isSynchronized; @@ -237,18 +260,47 @@ private async Task> FetchBlocksAsync(Peer peer, ulong skip, Guard.Argument(skip, nameof(skip)).NotNegative(); Guard.Argument(take, nameof(take)).NotNegative(); _logger.Information("Synchronizing with {@Host} ({@Skip})/({@Take})", peer.IpAddress.FromBytes(), skip, take); + var iSkip = skip; try { _logger.Information("Fetching [{@Range}] block(s)", Math.Abs(take - (int)skip)); - var blocksResponse = await _cypherSystemCore.P2PDeviceReq().SendAsync(peer.IpAddress, - peer.TcpPort, peer.PublicKey, - MessagePackSerializer.Serialize(new Parameter[] + const int maxBlocks = 10; + var chunks = Enumerable.Repeat(maxBlocks, take / maxBlocks).ToList(); + if (take % maxBlocks != 0) chunks.Add(take % maxBlocks); + + // Show progress + var blocks = await AnsiConsole.Progress().AutoClear(false).Columns(new TaskDescriptionColumn(), + new ProgressBarColumn(), new PercentageColumn(), new SpinnerColumn()) + .StartAsync(async ctx => { - new() { Value = skip.ToBytes(), ProtocolCommand = ProtocolCommand.GetBlocks }, - new() { Value = take.ToBytes(), ProtocolCommand = ProtocolCommand.GetBlocks } - })); - _logger.Information("Finished with [{@BlockCount}] block(s)", blocksResponse.Blocks.Count); - return blocksResponse.Blocks; + var blocks = new List(); + var warpTask = ctx.AddTask("DOWNLOADING", false).IsIndeterminate(); + warpTask.MaxValue(chunks.Count); + warpTask.StartTask(); + warpTask.IsIndeterminate(false); + while (!ctx.IsFinished) + foreach (var chunk in chunks) + { + var blocksResponse = await _cypherSystemCore.P2PDeviceReq().SendAsync( + peer.IpAddress, peer.TcpPort, peer.PublicKey, + MessagePackSerializer.Serialize(new Parameter[] + { + new() { Value = iSkip.ToBytes(), ProtocolCommand = ProtocolCommand.GetBlocks }, + new() { Value = chunk.ToBytes(), ProtocolCommand = ProtocolCommand.GetBlocks } + }), 500); + if (blocksResponse?.Blocks is null) + { + warpTask.StopTask(); + break; + } + blocks.AddRange(blocksResponse.Blocks); + iSkip += (ulong)chunk; + await Task.Delay(100); + warpTask.Increment(chunk); + } + return blocks; + }); + return blocks; } catch (Exception ex) { diff --git a/core/Ledger/Validator.cs b/core/Ledger/Validator.cs index f2bdde55..733340cd 100644 --- a/core/Ledger/Validator.cs +++ b/core/Ledger/Validator.cs @@ -89,9 +89,8 @@ public async Task VerifyBlockHashAsync(Block block) { Guard.Argument(block, nameof(block)).NotNull(); using var hasher = Hasher.New(); - var unitOfWork = _cypherSystemCore.UnitOfWork(); - var height = await unitOfWork.HashChainRepository.GetBlockHeightAsync(); - var prevBlock = await unitOfWork.HashChainRepository.GetAsync(x => new ValueTask(x.Height == (ulong)height)); + var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; + var prevBlock = await hashChainRepository.GetAsync(x => new ValueTask(x.Height == hashChainRepository.Height)); if (prevBlock is null) { _logger.Here().Error("No previous block available"); @@ -112,9 +111,9 @@ public async Task VerifyBlockHashAsync(Block block) public async Task VerifyMerkleAsync(Block block) { Guard.Argument(block, nameof(block)).NotNull(); - var unitOfWork = _cypherSystemCore.UnitOfWork(); - var height = await unitOfWork.HashChainRepository.GetBlockHeightAsync(); - var prevBlock = await unitOfWork.HashChainRepository.GetAsync(x => new ValueTask(x.Height == (ulong)height)); + var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; + var prevBlock = + await hashChainRepository.GetAsync(x => new ValueTask(x.Height == hashChainRepository.Height)); if (prevBlock is null) { _logger.Here().Error("No previous block available"); @@ -718,15 +717,15 @@ public async Task GetRunningDistributionAsync() { var unitOfWork = _cypherSystemCore.UnitOfWork(); var runningDistributionTotal = LedgerConstant.Distribution; - var height = await unitOfWork.HashChainRepository.CountAsync() + 1; + var height = unitOfWork.HashChainRepository.Count + 1; var blockHeaders = await unitOfWork.HashChainRepository.TakeLongAsync(height); var orderedBlockHeaders = blockHeaders.OrderBy(x => x.Height).ToArray(blockHeaders.Count); - var length = height > orderedBlockHeaders.Length + var length = height > (ulong)orderedBlockHeaders.Length ? orderedBlockHeaders.LongLength : orderedBlockHeaders.Length - 1; for (var i = 0; i < length; i++) { - runningDistributionTotal -= NetworkShare(orderedBlockHeaders[i].BlockPos.Solution, (ulong)height); + runningDistributionTotal -= NetworkShare(orderedBlockHeaders[i].BlockPos.Solution, height); } return runningDistributionTotal; diff --git a/core/Models/Messages/Messages.cs b/core/Models/Messages/Messages.cs index 757bc36b..0ebf1a72 100644 --- a/core/Models/Messages/Messages.cs +++ b/core/Models/Messages/Messages.cs @@ -14,7 +14,7 @@ namespace CypherNetwork.Models.Messages; /// /// [MessagePackObject] -public record BlockCountResponse([property: Key(0)] long Count); +public record BlockCountResponse([property: Key(0)] ulong Count); public record BlockCountRequest; /// @@ -22,7 +22,7 @@ public record BlockCountRequest; /// /// [MessagePackObject] -public record BlockHeightResponse([property: Key(0)] long Count); +public record BlockHeightResponse([property: Key(0)] ulong Count); public record BlockHeightRequest(ulong Height); public record BlockByHeightRequest(ulong Height); diff --git a/core/Models/NetworkSetting.cs b/core/Models/NetworkSetting.cs index 87ba1947..21068bea 100644 --- a/core/Models/NetworkSetting.cs +++ b/core/Models/NetworkSetting.cs @@ -10,6 +10,7 @@ namespace CypherNetwork.Models; public class Config { public Node Node { get; set; } + public Log Log { get; set; } } public record Node @@ -30,7 +31,7 @@ public record Network public int AutoSyncEveryMinutes { get; set; } public LettuceEncrypt LettuceEncrypt { get; set; } public X509Certificate X509Certificate { get; set; } - public TransactionLeakRateConfigurationOption TransactionRateConfig { get; set; } + public TransactionLeakRateConfigurationOption MemoryPoolTransactionRateLimit { get; set; } public string SigningKeyRingName { get; set; } public int HttpPort { get; set; } public int HttpsPort { get; set; } @@ -72,5 +73,40 @@ public record Staking public int MaxTransactionsPerBlock { get; set; } public int MaxTransactionSizePerBlock { get; set; } public string RewardAddress { get; set; } +} + +public class Args +{ + public string outputTemplate { get; set; } + public string formatter { get; set; } + public string path { get; set; } + public int? fileSizeLimitBytes { get; set; } + public bool? rollOnFileSizeLimit { get; set; } + public string rollingInterval { get; set; } + public int? retainedFileCountLimit { get; set; } +} +public class Log +{ + public MinimumLevel MinimumLevel { get; set; } + public string Enrich { get; set; } + public List WriteTo { get; set; } +} + +public class MinimumLevel +{ + public string Default { get; set; } + public Override Override { get; set; } +} + +public class Override +{ + public string System { get; set; } + public string Microsoft { get; set; } +} + +public class WriteTo +{ + public string Name { get; set; } + public Args Args { get; set; } } \ No newline at end of file diff --git a/core/Models/Peer.cs b/core/Models/Peer.cs index 7a18068f..47263bfb 100644 --- a/core/Models/Peer.cs +++ b/core/Models/Peer.cs @@ -3,6 +3,7 @@ using System; using CypherNetwork.Extensions; +using CypherNetwork.Network; using MessagePack; namespace CypherNetwork.Models; @@ -20,7 +21,7 @@ public struct Peer : IComparable [Key(7)] public byte[] Name { get; set; } [Key(8)] public byte[] PublicKey { get; set; } [Key(9)] public byte[] Version { get; set; } - [Key(10)] public long Timestamp { get; set; } + [IgnoreMember] public int Retries { get; set; } /// /// diff --git a/core/Models/PeerCooldown.cs b/core/Models/PeerCooldown.cs index fe0f6e5b..49516b8e 100644 --- a/core/Models/PeerCooldown.cs +++ b/core/Models/PeerCooldown.cs @@ -1,6 +1,7 @@ using System; using CypherNetwork.Extensions; using CypherNetwork.Helper; +using CypherNetwork.Network; namespace CypherNetwork.Models; @@ -9,13 +10,17 @@ public struct PeerCooldown : IComparable { public byte[] IpAddress { get; set; } public byte[] PublicKey { get; set; } + public ulong ClientId { get; set; } public long Timestamp { get; } + public PeerState PeerState { get; set; } public PeerCooldown() { + ClientId = 0; IpAddress = null; PublicKey = null; Timestamp = Util.GetAdjustedTimeAsUnixTimestamp(); + PeerState = PeerState.Ready; } /// diff --git a/core/Network/P2PDevice.cs b/core/Network/P2PDevice.cs index a21bc56a..202ec80a 100644 --- a/core/Network/P2PDevice.cs +++ b/core/Network/P2PDevice.cs @@ -2,6 +2,7 @@ // To view a copy of this license, visit https://creativecommons.org/licenses/by-nc-nd/4.0 using System; +using System.Buffers; using System.Collections.Generic; using System.Net; using System.Reactive.Linq; @@ -140,7 +141,7 @@ private Task ListeningAsync(IPEndPoint ipEndPoint, Transport transport, int work try { _repSocket = NngFactorySingleton.Instance.Factory.ReplierOpen() - .ThenListen($"{GetTransportType(transport)}://{ipEndPoint.Address.ToString()}:{ipEndPoint.Port}").Unwrap(); + .ThenListen($"{GetTransportType(transport)}://{ipEndPoint.Address.ToString()}:{ipEndPoint.Port}", Defines.NngFlag.NNG_FLAG_NONBLOCK).Unwrap(); _repSocket.SetOpt(Defines.NNG_OPT_RECVMAXSZ, 20000000); for (var i = 0; i < workerCount; i++) { @@ -150,8 +151,7 @@ private Task ListeningAsync(IPEndPoint ipEndPoint, Transport transport, int work if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; try { - var p2PDeviceWorker = new P2PDeviceWorker(_cypherSystemCore, ctx, _logger); - p2PDeviceWorker.WorkerAsync().Wait(); + WorkerAsync(ctx).Wait(); } catch (AggregateException) { @@ -168,12 +168,98 @@ private Task ListeningAsync(IPEndPoint ipEndPoint, Transport transport, int work return Task.CompletedTask; } + /// + /// + /// + /// + private async Task WorkerAsync(IRepReqAsyncContext ctx) + { + var nngResult = (await ctx.Receive()).Unwrap(); + try + { + var message = await _cypherSystemCore.P2PDevice().DecryptAsync(nngResult); + if (message.Memory.Length == 0) + { + await EmptyReplyAsync(ctx); + return; + } + + var unwrapMessage = await UnWrapAsync(message.Memory); + if (unwrapMessage.ProtocolCommand != ProtocolCommand.NotFound) + { + var newMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); + try + { + var response = + await _cypherSystemCore.P2PDeviceApi().Commands[(int)unwrapMessage.ProtocolCommand]( + unwrapMessage.Parameters); + if (unwrapMessage.ProtocolCommand == ProtocolCommand.UpdatePeers) + { + await EmptyReplyAsync(ctx); + return; + } + + var cipher = _cypherSystemCore.Crypto().BoxSeal( + response.IsSingleSegment ? response.First.Span : response.ToArray(), message.PublicKey); + if (cipher.Length != 0) + { + await using var packetStream = Util.Manager.GetStream() as RecyclableMemoryStream; + packetStream.Write(_cypherSystemCore.KeyPair.PublicKey[1..33].WrapLengthPrefix()); + packetStream.Write(cipher.WrapLengthPrefix()); + foreach (var memory in packetStream.GetReadOnlySequence()) newMsg.Append(memory.Span); + (await ctx.Reply(newMsg)).Unwrap(); + return; + } + } + catch (MessagePackSerializationException) + { + // Ignore + } + catch (AccessViolationException ex) + { + _logger.Here().Fatal("{@Message}", ex.Message); + } + catch (Exception ex) + { + _logger.Here().Fatal("{@Message}", ex.Message); + } + finally + { + newMsg.Dispose(); + } + } + + await EmptyReplyAsync(ctx); + } + finally + { + nngResult.Dispose(); + } + } + + /// + /// + /// + /// + private static async Task EmptyReplyAsync(IRepReqAsyncContext ctx) + { + try + { + var newMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); + (await ctx.Reply(newMsg)).Unwrap(); + } + catch (Exception) + { + // Ignore + } + } + /// /// /// /// /// - public static async Task UnWrapAsync(ReadOnlyMemory msg) + private static async Task UnWrapAsync(ReadOnlyMemory msg) { try { @@ -184,13 +270,13 @@ public static async Task UnWrapAsync(ReadOnlyMemory msg) return new UnwrapMessage(parameters, command); } } - catch (ArgumentOutOfRangeException) + catch (ArgumentOutOfRangeException ex) { - // Ignore + Console.WriteLine("ArgumentOutOfRangeException: " + ex.Message); } - catch (Exception) + catch (Exception ex) { - // Ignore + Console.WriteLine("Exception: " + ex); } return default; diff --git a/core/Network/P2PDeviceApi.cs b/core/Network/P2PDeviceApi.cs index 52b71ad0..6bd74562 100644 --- a/core/Network/P2PDeviceApi.cs +++ b/core/Network/P2PDeviceApi.cs @@ -96,7 +96,7 @@ private async Task> OnGetPeerAsync(Parameter[] none = def /// private Task> OnGetPeersAsync(Parameter[] none = default) { - var nodePeersResponse = _cypherSystemCore.PeerDiscovery().Reply(); + var nodePeersResponse = _cypherSystemCore.PeerDiscovery().GetPeers(); return Task.FromResult(nodePeersResponse); } @@ -141,16 +141,14 @@ private async Task> OnSaveBlockAsync(Parameter[] paramete /// private async Task> OnGetBlockHeightAsync(Parameter[] none = default) { - var blockHeightResponse = await _cypherSystemCore.Graph().GetBlockHeightAsync(); - return await SerializeAsync(blockHeightResponse); + return await SerializeAsync(new BlockHeightResponse(_cypherSystemCore.UnitOfWork().HashChainRepository.Height)); } /// /// private async Task> OnGetBlockCountAsync(Parameter[] none = default) { - var blockCountResponse = await _cypherSystemCore.Graph().GetBlockCountAsync(); - return await SerializeAsync(blockCountResponse); + return await SerializeAsync(new BlockCountResponse(_cypherSystemCore.UnitOfWork().HashChainRepository.Count)); } /// diff --git a/core/Network/P2PDeviceReq.cs b/core/Network/P2PDeviceReq.cs index 1808293f..515b79b6 100644 --- a/core/Network/P2PDeviceReq.cs +++ b/core/Network/P2PDeviceReq.cs @@ -20,7 +20,7 @@ namespace CypherNetwork.Network; public interface IP2PDeviceReq { Task SendAsync(ReadOnlyMemory ipAddress, ReadOnlyMemory tcpPort, ReadOnlyMemory publicKey, - ReadOnlyMemory value, bool deserialize = true); + ReadOnlyMemory value, int timeMs = 200, bool deserialize = true); } public class EmptyMessage { } @@ -34,7 +34,7 @@ public class P2PDeviceReq : IP2PDeviceReq private readonly ICypherSystemCore _cypherSystemCore; private readonly ILogger _logger; private readonly Ping _ping = new(); - + public P2PDeviceReq(ICypherSystemCore cypherSystemCore) { _cypherSystemCore = cypherSystemCore; @@ -49,12 +49,13 @@ public P2PDeviceReq(ICypherSystemCore cypherSystemCore) /// /// /// + /// /// /// /// /// public async Task SendAsync(ReadOnlyMemory ipAddress, ReadOnlyMemory tcpPort, ReadOnlyMemory publicKey, - ReadOnlyMemory value, bool deserialize = true) + ReadOnlyMemory value, int timeMs = 200, bool deserialize = true) { var nngMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); try @@ -72,8 +73,8 @@ public P2PDeviceReq(ICypherSystemCore cypherSystemCore) using var socket = NngFactorySingleton.Instance.Factory.RequesterOpen() .ThenDial($"tcp://{address}:{port}", Defines.NngFlag.NNG_FLAG_NONBLOCK).Unwrap(); - socket.SetOpt(Defines.NNG_OPT_RECVTIMEO, new nng_duration { TimeMs = 200 }); - socket.SetOpt(Defines.NNG_OPT_SENDTIMEO, new nng_duration { TimeMs = 200 }); + socket.SetOpt(Defines.NNG_OPT_RECVTIMEO, new nng_duration { TimeMs = timeMs }); + socket.SetOpt(Defines.NNG_OPT_SENDTIMEO, new nng_duration { TimeMs = timeMs }); using var ctx = socket.CreateAsyncContext(NngFactorySingleton.Instance.Factory).Unwrap(); var cipher = _cypherSystemCore.Crypto().BoxSeal(value.Span, publicKey.Span[1..33]); diff --git a/core/Network/P2PDeviceWorker.cs b/core/Network/P2PDeviceWorker.cs deleted file mode 100644 index 728b8bbe..00000000 --- a/core/Network/P2PDeviceWorker.cs +++ /dev/null @@ -1,119 +0,0 @@ -using System; -using System.Buffers; -using System.Threading; -using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using CypherNetwork.Extensions; -using CypherNetwork.Helper; -using CypherNetwork.Models.Messages; -using MessagePack; -using Microsoft.IO; -using nng; -using Serilog; - -namespace CypherNetwork.Network; - -/// -/// -/// -public class P2PDeviceWorker : ReceivedActor -{ - private readonly ICypherSystemCore _cypherSystemCore; - private readonly IRepReqAsyncContext _ctx; - private readonly ILogger _logger; - private readonly AutoResetEvent _autoReset = new(false); - private readonly INngMsg _nngMsg = NngFactorySingleton.Instance.Factory.CreateMessage(); - public P2PDeviceWorker(ICypherSystemCore cypherSystemCore, IRepReqAsyncContext ctx, ILogger logger) - : base(new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 }) - { - _cypherSystemCore = cypherSystemCore; - _ctx = ctx; - _logger = logger; - } - - /// - /// - /// - public async Task WorkerAsync() - { - var nngResult = await _ctx.Receive(); - await PostAsync(nngResult.Unwrap()); - _autoReset.WaitOne(10000); - _autoReset.Close(); - } - - /// - /// - /// - /// - protected override async Task OnReceiveAsync(INngMsg nngMsg) - { - try - { - var message = await _cypherSystemCore.P2PDevice().DecryptAsync(nngMsg); - if (message.Memory.Length == 0) - { - await EmptyReplyAsync(); - return; - } - - var unwrapMessage = await P2PDevice.UnWrapAsync(message.Memory); - if (unwrapMessage.ProtocolCommand != ProtocolCommand.NotFound) - { - try - { - var response = - await _cypherSystemCore.P2PDeviceApi().Commands[(int)unwrapMessage.ProtocolCommand]( - unwrapMessage.Parameters); - if (unwrapMessage.ProtocolCommand == ProtocolCommand.UpdatePeers) - { - await EmptyReplyAsync(); - return; - } - - var cipher = _cypherSystemCore.Crypto().BoxSeal( - response.IsSingleSegment ? response.First.Span : response.ToArray(), message.PublicKey); - if (cipher.Length != 0) - { - await using var packetStream = Util.Manager.GetStream() as RecyclableMemoryStream; - packetStream.Write(_cypherSystemCore.KeyPair.PublicKey[1..33].WrapLengthPrefix()); - packetStream.Write(cipher.WrapLengthPrefix()); - foreach (var memory in packetStream.GetReadOnlySequence()) _nngMsg.Append(memory.Span); - (await _ctx.Reply(_nngMsg)).Unwrap(); - return; - } - } - catch (MessagePackSerializationException) - { - // Ignore - } - catch (AccessViolationException ex) - { - _logger.Fatal("{@Message}", ex.Message); - } - catch (Exception ex) - { - _logger.Fatal("{@Message}", ex.Message); - } - finally - { - _nngMsg.Take(); - } - } - - await EmptyReplyAsync(); - } - finally - { - _autoReset.Set(); - } - } - - /// - /// - /// - private async Task EmptyReplyAsync() - { - (await _ctx.Reply(_nngMsg)).Unwrap(); - } -} \ No newline at end of file diff --git a/core/Network/PeerDiscovery.cs b/core/Network/PeerDiscovery.cs index f145f14a..070af55b 100644 --- a/core/Network/PeerDiscovery.cs +++ b/core/Network/PeerDiscovery.cs @@ -71,7 +71,7 @@ public interface IPeerDiscovery /// /// /// - ReadOnlySequence Reply(); + ReadOnlySequence GetPeers(); } /// @@ -82,6 +82,8 @@ public sealed class PeerDiscovery : IDisposable, IPeerDiscovery private readonly Caching _peerCooldownCaching = new(); private readonly ICypherSystemCore _cypherSystemCore; private readonly ILogger _logger; + + private LocalNode _localNode; private IDisposable _receiverDisposable; private IDisposable _coolDownDisposable; private Peer _localPeer; @@ -138,14 +140,7 @@ public Peer GetLocalPeer() /// private void UpdateLocalPeerInfo() { - var blockCountResponse = AsyncHelper.RunSync(async delegate - { - var value = await _cypherSystemCore.Graph().GetBlockCountAsync(); - return value; - }); - - _localPeer.BlockCount = (ulong)blockCountResponse.Count; - _localPeer.Timestamp = Util.GetAdjustedTimeAsUnixTimestamp(); + _localPeer.BlockCount = _cypherSystemCore.UnitOfWork().HashChainRepository.Count; } /// @@ -153,49 +148,72 @@ private void UpdateLocalPeerInfo() /// public LocalNode GetLocalNode() { - return new LocalNode - { - IpAddress = _cypherSystemCore.Node.EndPoint.Address.ToString().ToBytes(), - Identifier = _cypherSystemCore.KeyPair.PublicKey.ToHashIdentifier(), - TcpPort = _cypherSystemCore.Node.Network.P2P.TcpPort.ToBytes(), - WsPort = _cypherSystemCore.Node.Network.P2P.WsPort.ToBytes(), - HttpPort = _cypherSystemCore.Node.Network.HttpPort.ToBytes(), - HttpsPort = _cypherSystemCore.Node.Network.HttpsPort.ToBytes(), - Name = _cypherSystemCore.Node.Name.ToBytes(), - PublicKey = _cypherSystemCore.KeyPair.PublicKey, - Version = Util.GetAssemblyVersion().ToBytes() - }; + return _localNode; } /// /// /// /// - public ReadOnlySequence Reply() + public ReadOnlySequence GetPeers() { var sequence = new Sequence(); UpdateLocalPeerInfo(); - IList discoveryStore = new List { _localPeer }; + IList discoveryStore = _caching.GetItems().ToList(); + discoveryStore.Add(_localPeer); ReadOnlyPeerSequence(ref discoveryStore, ref sequence); return sequence.AsReadOnlySequence; } + /// + /// + /// + /// + /// + /// + private void UpdatePeer(ulong clientId, byte[] ipAddress, Peer peer) + { + _caching.AddOrUpdate(StoreDb.Key(clientId.ToString(), ipAddress), peer); + } + + /// + /// + /// + /// + /// + /// + private static byte[] GetKey(ulong clientId, byte[] ipAddress) + { + return StoreDb.Key(clientId.ToString(), ipAddress); + } + /// /// private void Init() { - var localNode = GetLocalNode(); + _localNode = new LocalNode + { + IpAddress = _cypherSystemCore.Node.EndPoint.Address.ToString().ToBytes(), + Identifier = _cypherSystemCore.KeyPair.PublicKey.ToHashIdentifier(), + TcpPort = _cypherSystemCore.Node.Network.P2P.TcpPort.ToBytes(), + WsPort = _cypherSystemCore.Node.Network.P2P.WsPort.ToBytes(), + HttpPort = _cypherSystemCore.Node.Network.HttpPort.ToBytes(), + HttpsPort = _cypherSystemCore.Node.Network.HttpsPort.ToBytes(), + Name = _cypherSystemCore.Node.Name.ToBytes(), + PublicKey = _cypherSystemCore.KeyPair.PublicKey, + Version = Util.GetAssemblyVersion().ToBytes() + }; _localPeer = new Peer { - IpAddress = localNode.IpAddress, - HttpPort = localNode.HttpPort, - HttpsPort = localNode.HttpsPort, - ClientId = localNode.PublicKey.ToHashIdentifier(), - TcpPort = localNode.TcpPort, - WsPort = localNode.WsPort, - Name = localNode.Name, - PublicKey = localNode.PublicKey, - Version = localNode.Version + IpAddress = _localNode.IpAddress, + HttpPort = _localNode.HttpPort, + HttpsPort = _localNode.HttpsPort, + ClientId = _localNode.PublicKey.ToHashIdentifier(), + TcpPort = _localNode.TcpPort, + WsPort = _localNode.WsPort, + Name = _localNode.Name, + PublicKey = _localNode.PublicKey, + Version = _localNode.Version }; _seedNodes = new RemoteNode[_cypherSystemCore.Node.Network.SeedList.Count]; foreach (var seedNode in _cypherSystemCore.Node.Network.SeedList.WithIndex()) @@ -220,7 +238,7 @@ private Task ReceiverAsync() if (!Monitor.TryEnter(LockOnReady)) return; try { - TryBootstrap(); + if (_seedNodes.Length != 0) TryBootstrap(); if (_caching.Count == 0) return; OnReadyAsync().Wait(); } @@ -243,9 +261,9 @@ private async Task BootstrapSeedsAsync() IList discoveryStore = new List { _localPeer }; var parameter = new Parameter[] { - new() { ProtocolCommand = ProtocolCommand.UpdatePeers, Value = (await P2PDeviceApi.SerializeAsync(discoveryStore)).ToArray() } + new() { ProtocolCommand = ProtocolCommand.UpdatePeers, Value = MessagePackSerializer.Serialize(discoveryStore) } }; - var readOnlySequenceMsg = await P2PDeviceApi.SerializeAsync(parameter); + var msg = MessagePackSerializer.Serialize(parameter); for (var index = 0; index < _seedNodes.Length; index++) { var i = index; @@ -255,11 +273,8 @@ private async Task BootstrapSeedsAsync() try { var _ = await _cypherSystemCore.P2PDeviceReq().SendAsync(seedNode.IpAddress, - seedNode.TcpPort, - seedNode.PublicKey, - readOnlySequenceMsg.IsSingleSegment - ? readOnlySequenceMsg.First - : readOnlySequenceMsg.ToArray(), false); + seedNode.TcpPort, seedNode.PublicKey, + msg); } catch (Exception ex) { @@ -298,20 +313,42 @@ private async Task OnReadyAsync() discoveryStore.Add(_localPeer); var parameter = new Parameter[] { - new() { ProtocolCommand = ProtocolCommand.UpdatePeers, Value = (await P2PDeviceApi.SerializeAsync(discoveryStore)).ToArray() } + new() { ProtocolCommand = ProtocolCommand.UpdatePeers, Value = MessagePackSerializer.Serialize(discoveryStore) } }; - var readOnlySequenceMsg = await P2PDeviceApi.SerializeAsync(parameter); - foreach (var peer in discoveryStore) + var msg = MessagePackSerializer.Serialize(parameter); + for (var index = 0; index < discoveryStore.Count; index++) { + var peer = discoveryStore[index]; + if (peer.ClientId == _localPeer.ClientId) continue; + var storePeer = peer; try { - if (await _cypherSystemCore.P2PDeviceReq().SendAsync(peer.IpAddress, peer.TcpPort, peer.PublicKey, - readOnlySequenceMsg.IsSingleSegment ? readOnlySequenceMsg.First : readOnlySequenceMsg.ToArray(), - false) is null) + if (await _cypherSystemCore.P2PDeviceReq().SendAsync(storePeer.IpAddress, storePeer.TcpPort, + storePeer.PublicKey, msg) is not null) { - _caching.Remove(peer.IpAddress); + if (storePeer.Retries == 0) continue; + storePeer.Retries = 0; + UpdatePeer(storePeer.ClientId, storePeer.IpAddress, storePeer); + } + else + { + if (storePeer.Retries >= 30) + { + SetPeerCooldown(new PeerCooldown + { + IpAddress = peer.IpAddress, + PublicKey = peer.PublicKey, + ClientId = peer.ClientId, + PeerState = PeerState.Unreachable + }); + _caching.Remove(GetKey(peer.ClientId, peer.IpAddress)); + } + else + { + storePeer.Retries++; + UpdatePeer(storePeer.ClientId, storePeer.IpAddress, storePeer); + } } - } catch (Exception ex) { @@ -334,21 +371,28 @@ public async Task ReceivedPeersAsync(ReadOnlyMemory msg) var elementSequence = await reader.ReadAsync(CancellationToken.None); if (elementSequence == null) continue; var peer = MessagePackSerializer.Deserialize(elementSequence.Value); + if (peer.ClientId == _localPeer.ClientId) continue; #if !DEBUG if (!IsAcceptedAddress(peer.IpAddress)) return; #endif - if (!_caching.TryGet(peer.IpAddress, out var cachedPeer)) + var key = GetKey(peer.ClientId, peer.IpAddress); + if (!_caching.TryGet(key, out var cachedPeer)) { - _caching.AddOrUpdate(peer.IpAddress, peer); + if (_peerCooldownCaching[key].IsDefault()) + { + UpdatePeer(peer.ClientId, peer.IpAddress, peer); + } } else if (cachedPeer.BlockCount != peer.BlockCount) { - _caching.AddOrUpdate(peer.IpAddress, peer); + peer.Retries = cachedPeer.Retries; + UpdatePeer(peer.ClientId, peer.IpAddress, peer); } } } + /// /// /// @@ -386,9 +430,9 @@ public void TryBootstrap() /// public void SetPeerCooldown(PeerCooldown peer) { - if (!_peerCooldownCaching.TryGet(peer.IpAddress, out _)) + if (!_peerCooldownCaching.TryGet(GetKey(peer.ClientId, peer.IpAddress), out _)) { - _peerCooldownCaching.AddOrUpdate(peer.IpAddress, peer); + _peerCooldownCaching.AddOrUpdate(StoreDb.Key(peer.ClientId.ToString(), peer.IpAddress), peer); } } @@ -397,7 +441,7 @@ public void SetPeerCooldown(PeerCooldown peer) /// private void HandlePeerCooldown() { - _coolDownDisposable = Observable.Interval(TimeSpan.FromMinutes(30)).Subscribe(_ => + _coolDownDisposable = Observable.Interval(TimeSpan.FromMinutes(10)).Subscribe(_ => { if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; try @@ -405,7 +449,7 @@ private void HandlePeerCooldown() var removePeersCooldown = AsyncHelper.RunSync(async delegate { - var removePeerCooldownBeforeTimestamp = Util.GetUtcNow().AddMinutes(-30).ToUnixTimestamp(); + var removePeerCooldownBeforeTimestamp = Util.GetUtcNow().AddMinutes(-10).ToUnixTimestamp(); return await _peerCooldownCaching.WhereAsync(x => new ValueTask(x.Value.Timestamp < removePeerCooldownBeforeTimestamp)); }); diff --git a/core/Network/PeerState.cs b/core/Network/PeerState.cs index e729206f..5be0a64f 100644 --- a/core/Network/PeerState.cs +++ b/core/Network/PeerState.cs @@ -8,5 +8,8 @@ public enum PeerState : byte Alive = 0x00, Dead = 0x01, Suspicious = 0x02, - Retry = 0x03 + Retry = 0x03, + Unreachable = 0x04, + DupBlocks = 0x05, + Ready = 0x99 } \ No newline at end of file diff --git a/core/Persistence/HashChainRepository.cs b/core/Persistence/HashChainRepository.cs index e65a372c..9376fd00 100644 --- a/core/Persistence/HashChainRepository.cs +++ b/core/Persistence/HashChainRepository.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using CypherNetwork.Extensions; +using CypherNetwork.Helper; using CypherNetwork.Models; using Dawn; using MessagePack; @@ -20,6 +21,8 @@ public interface IHashChainRepository : IRepository { ValueTask> OrderByRangeAsync(Func selector, int skip, int take); new Task PutAsync(byte[] key, Block data); + ulong Height { get; } + ulong Count { get; } } /// @@ -41,8 +44,20 @@ public HashChainRepository(IStoreDb storeDb, ILogger logger) _logger = logger.ForContext("SourceContext", nameof(HashChainRepository)); SetTableName(StoreDb.HashChainTable.ToString()); + Height = (ulong)AsyncHelper.RunSync(GetBlockHeightAsync); + Count = Height + 1; } + /// + /// + /// + public ulong Height { get; private set; } + + /// + /// + /// + public ulong Count { get; private set; } + /// /// /// @@ -58,8 +73,10 @@ public new Task PutAsync(byte[] key, Block data) using (_sync.Write()) { var cf = _storeDb.Rocks.GetColumnFamily(GetTableNameAsString()); - _storeDb.Rocks.Put(StoreDb.Key(StoreDb.HashChainTable.ToString(), key), MessagePackSerializer.Serialize(data), - cf); + _storeDb.Rocks.Put(StoreDb.Key(StoreDb.HashChainTable.ToString(), key), + MessagePackSerializer.Serialize(data), cf); + Height = data.Height; + Count++; return Task.FromResult(true); } } diff --git a/core/Persistence/Repository.cs b/core/Persistence/Repository.cs index e4513050..6beeee84 100644 --- a/core/Persistence/Repository.cs +++ b/core/Persistence/Repository.cs @@ -37,7 +37,7 @@ public interface IRepository ValueTask> SkipAsync(int skip); ValueTask> TakeAsync(int take); bool Delete(byte[] key); - Task> TakeLongAsync(long take); + Task> TakeLongAsync(ulong take); IAsyncEnumerable IterateAsync(); } @@ -444,7 +444,7 @@ public ValueTask> TakeAsync(int take) /// /// /// - public async Task> TakeLongAsync(long take) + public async Task> TakeLongAsync(ulong take) { Guard.Argument(take, nameof(take)).NotNegative(); IList entries = new List(); @@ -453,7 +453,7 @@ public async Task> TakeLongAsync(long take) using (_sync.Read()) { take = take == 0 ? 1 : take; - var iTake = 0; + ulong iTake = 0; var cf = _storeDb.Rocks.GetColumnFamily(_tableName); using var iterator = _storeDb.Rocks.NewIterator(cf, _readOptions); for (iterator.SeekToFirst(); iterator.Valid(); iterator.Next()) diff --git a/core/Wallet/WalletSession.cs b/core/Wallet/WalletSession.cs index 649e4e83..388f628c 100644 --- a/core/Wallet/WalletSession.cs +++ b/core/Wallet/WalletSession.cs @@ -247,10 +247,9 @@ private void HandleSafeguardBlocks() { if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return; - var unitOfWork = _cypherSystemCore.UnitOfWork(); - var height = (await _cypherSystemCore.Graph().GetBlockHeightAsync()).Count - 147; - height = height < 0 ? 0 : height; - var blocks = await unitOfWork.HashChainRepository.OrderByRangeAsync(x => x.Height, (int)height, 147); + var hashChainRepository = _cypherSystemCore.UnitOfWork().HashChainRepository; + var height = hashChainRepository.Height - 147; + var blocks = await hashChainRepository.OrderByRangeAsync(x => x.Height, (int)height, 147); if (!blocks.Any()) return; lock (Locking) diff --git a/core/core.csproj b/core/core.csproj index 87bf42a9..214cfa4e 100644 --- a/core/core.csproj +++ b/core/core.csproj @@ -4,7 +4,7 @@ net6.0 AnyCPU;x64 CypherNetwork - 0.0.71.0 + 0.0.72.0 CypherNetwork core cyphernetworkcore diff --git a/node/Configuration/Utility.cs b/node/Configuration/Utility.cs index 1b6467d5..40d76c7b 100644 --- a/node/Configuration/Utility.cs +++ b/node/Configuration/Utility.cs @@ -57,7 +57,7 @@ public Utility(Config config) HttpPort(); WriteDivider("Auto Sync Time"); AutoSyncTime(); - var jsonWriteOptions = new JsonSerializerOptions { WriteIndented = true }; + var jsonWriteOptions = new JsonSerializerOptions { WriteIndented = true, DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull }; jsonWriteOptions.Converters.Add(new JsonStringEnumConverter()); var newJson = JsonSerializer.Serialize(config, jsonWriteOptions); var appSettingsPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "appsettings.json"); diff --git a/node/Program.cs b/node/Program.cs index acba6920..d593805b 100644 --- a/node/Program.cs +++ b/node/Program.cs @@ -16,6 +16,7 @@ using CypherNetwork.Models; using CypherNetworkNode.Configuration; using Microsoft.AspNetCore.DataProtection.XmlEncryption; +using Log = Serilog.Log; namespace CypherNetworkNode; @@ -71,7 +72,7 @@ public static async Task Main(string[] args) { throw new Exception($"No \"{logSectionName}\" section found in appsettings.json"); } - + Console.ForegroundColor = ConsoleColor.DarkMagenta; Console.WriteLine(@$" ______ __ __ diff --git a/node/appsettings.json b/node/appsettings.json index c26db4df..e8461521 100644 --- a/node/appsettings.json +++ b/node/appsettings.json @@ -17,10 +17,10 @@ "WsPort": "7947" }, "SeedList": [ - "127.0.0.1:7948" + "167.99.81.173:7946" ], "SeedListPublicKeys": [ - "053D7AD576B975BEB29FC52A17B71A33E8D669C111DB3E41EC8AA317EC140E3F20" + "05F4C195C979EDB25C5FF3078AB537E74AED015D5196689E728355005F20DB4E29" ], "Environment": "testnet", "SigningKeyRingName": "DefaultSigning.cyp3.Key", diff --git a/node/node.csproj b/node/node.csproj index a8656d2b..ee5350a3 100644 --- a/node/node.csproj +++ b/node/node.csproj @@ -2,7 +2,7 @@ true - 0.0.71.0 + 0.0.72.0 en node