Skip to content

Commit

Permalink
fix: task executes before its completed
Browse files Browse the repository at this point in the history
  • Loading branch information
pingpongsneak committed Oct 11, 2022
1 parent d0102d1 commit f46e02a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 140 deletions.
102 changes: 94 additions & 8 deletions core/Network/P2PDevice.cs
Expand Up @@ -2,6 +2,7 @@
// To view a copy of this license, visit https://creativecommons.org/licenses/by-nc-nd/4.0

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Net;
using System.Reactive.Linq;
Expand Down Expand Up @@ -140,7 +141,7 @@ private Task ListeningAsync(IPEndPoint ipEndPoint, Transport transport, int work
try
{
_repSocket = NngFactorySingleton.Instance.Factory.ReplierOpen()
.ThenListen($"{GetTransportType(transport)}://{ipEndPoint.Address.ToString()}:{ipEndPoint.Port}").Unwrap();
.ThenListen($"{GetTransportType(transport)}://{ipEndPoint.Address.ToString()}:{ipEndPoint.Port}", Defines.NngFlag.NNG_FLAG_NONBLOCK).Unwrap();
_repSocket.SetOpt(Defines.NNG_OPT_RECVMAXSZ, 20000000);
for (var i = 0; i < workerCount; i++)
{
Expand All @@ -150,8 +151,7 @@ private Task ListeningAsync(IPEndPoint ipEndPoint, Transport transport, int work
if (_cypherSystemCore.ApplicationLifetime.ApplicationStopping.IsCancellationRequested) return;
try
{
var p2PDeviceWorker = new P2PDeviceWorker(_cypherSystemCore, ctx, _logger);
p2PDeviceWorker.WorkerAsync().Wait();
WorkerAsync(ctx).Wait();
}
catch (AggregateException)
{
Expand All @@ -168,12 +168,98 @@ private Task ListeningAsync(IPEndPoint ipEndPoint, Transport transport, int work
return Task.CompletedTask;
}

/// <summary>
///
/// </summary>
/// <param name="ctx"></param>
private async Task WorkerAsync(IRepReqAsyncContext<INngMsg> ctx)
{
var nngResult = (await ctx.Receive()).Unwrap();
try
{
var message = await _cypherSystemCore.P2PDevice().DecryptAsync(nngResult);
if (message.Memory.Length == 0)
{
await EmptyReplyAsync(ctx);
return;
}

var unwrapMessage = await UnWrapAsync(message.Memory);
if (unwrapMessage.ProtocolCommand != ProtocolCommand.NotFound)
{
var newMsg = NngFactorySingleton.Instance.Factory.CreateMessage();
try
{
var response =
await _cypherSystemCore.P2PDeviceApi().Commands[(int)unwrapMessage.ProtocolCommand](
unwrapMessage.Parameters);
if (unwrapMessage.ProtocolCommand == ProtocolCommand.UpdatePeers)
{
await EmptyReplyAsync(ctx);
return;
}

var cipher = _cypherSystemCore.Crypto().BoxSeal(
response.IsSingleSegment ? response.First.Span : response.ToArray(), message.PublicKey);
if (cipher.Length != 0)
{
await using var packetStream = Util.Manager.GetStream() as RecyclableMemoryStream;
packetStream.Write(_cypherSystemCore.KeyPair.PublicKey[1..33].WrapLengthPrefix());
packetStream.Write(cipher.WrapLengthPrefix());
foreach (var memory in packetStream.GetReadOnlySequence()) newMsg.Append(memory.Span);
(await ctx.Reply(newMsg)).Unwrap();
return;
}
}
catch (MessagePackSerializationException)
{
// Ignore
}
catch (AccessViolationException ex)
{
_logger.Here().Fatal("{@Message}", ex.Message);
}
catch (Exception ex)
{
_logger.Here().Fatal("{@Message}", ex.Message);
}
finally
{
newMsg.Dispose();
}
}

await EmptyReplyAsync(ctx);
}
finally
{
nngResult.Dispose();
}
}

/// <summary>
///
/// </summary>
/// <param name="ctx"></param>
private static async Task EmptyReplyAsync(IRepReqAsyncContext<INngMsg> ctx)
{
try
{
var newMsg = NngFactorySingleton.Instance.Factory.CreateMessage();
(await ctx.Reply(newMsg)).Unwrap();
}
catch (Exception)
{
// Ignore
}
}

/// <summary>
///
/// </summary>
/// <param name="msg"></param>
/// <returns></returns>
public static async Task<UnwrapMessage> UnWrapAsync(ReadOnlyMemory<byte> msg)
private static async Task<UnwrapMessage> UnWrapAsync(ReadOnlyMemory<byte> msg)
{
try
{
Expand All @@ -184,13 +270,13 @@ public static async Task<UnwrapMessage> UnWrapAsync(ReadOnlyMemory<byte> msg)
return new UnwrapMessage(parameters, command);
}
}
catch (ArgumentOutOfRangeException)
catch (ArgumentOutOfRangeException ex)
{
// Ignore
Console.WriteLine("ArgumentOutOfRangeException: " + ex.Message);
}
catch (Exception)
catch (Exception ex)
{
// Ignore
Console.WriteLine("Exception: " + ex);
}

return default;
Expand Down
8 changes: 3 additions & 5 deletions core/Network/P2PDeviceApi.cs
Expand Up @@ -96,7 +96,7 @@ private async Task<ReadOnlySequence<byte>> OnGetPeerAsync(Parameter[] none = def
/// <returns></returns>
private Task<ReadOnlySequence<byte>> OnGetPeersAsync(Parameter[] none = default)
{
var nodePeersResponse = _cypherSystemCore.PeerDiscovery().Reply();
var nodePeersResponse = _cypherSystemCore.PeerDiscovery().GetPeers();
return Task.FromResult(nodePeersResponse);
}

Expand Down Expand Up @@ -141,16 +141,14 @@ private async Task<ReadOnlySequence<byte>> OnSaveBlockAsync(Parameter[] paramete
/// </summary>
private async Task<ReadOnlySequence<byte>> OnGetBlockHeightAsync(Parameter[] none = default)
{
var blockHeightResponse = await _cypherSystemCore.Graph().GetBlockHeightAsync();
return await SerializeAsync(blockHeightResponse);
return await SerializeAsync(new BlockHeightResponse(_cypherSystemCore.UnitOfWork().HashChainRepository.Height));
}

/// <summary>
/// </summary>
private async Task<ReadOnlySequence<byte>> OnGetBlockCountAsync(Parameter[] none = default)
{
var blockCountResponse = await _cypherSystemCore.Graph().GetBlockCountAsync();
return await SerializeAsync(blockCountResponse);
return await SerializeAsync(new BlockCountResponse(_cypherSystemCore.UnitOfWork().HashChainRepository.Count));
}

/// <summary>
Expand Down
127 changes: 0 additions & 127 deletions core/Network/P2PDeviceWorker.cs

This file was deleted.

0 comments on commit f46e02a

Please sign in to comment.