-
Notifications
You must be signed in to change notification settings - Fork 425
/
Subscription.cs
79 lines (70 loc) · 2.53 KB
/
Subscription.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Logging;
namespace Nethermind.JsonRpc.Modules.Subscribe
{
/// <summary>
/// Base class for JSON-RPC subscriptions. Every instance receives a unique <see cref="Id"/>
/// and owns a private queue of actions that a background loop executes one at a time.
/// </summary>
public abstract class Subscription : IDisposable
{
    /// <summary>
    /// Logger used by the background loop to report failures. It is not assigned anywhere
    /// in this class, so derived types are expected to set it.
    /// </summary>
    protected ILogger _logger;

    /// <summary>
    /// Generates the subscription identifier, stores the client and starts the background
    /// loop that drains the action queue.
    /// </summary>
    /// <param name="jsonRpcDuplexClient">Client exposed through <see cref="JsonRpcDuplexClient"/>.</param>
    protected Subscription(IJsonRpcDuplexClient jsonRpcDuplexClient)
    {
        // "0x" followed by the 32 hex digits of a fresh GUID (format "N" = no dashes or braces).
        Id = string.Concat("0x", Guid.NewGuid().ToString("N"));
        JsonRpcDuplexClient = jsonRpcDuplexClient;
        ProcessMessages();
    }

    /// <summary>Unique identifier of this subscription: "0x" followed by 32 hexadecimal digits.</summary>
    public string Id { get; }

    /// <summary>Name of the subscription kind; used in the error messages built by <see cref="GetErrorMsg"/>.</summary>
    public abstract string Type { get; }

    /// <summary>The duplex client passed to the constructor.</summary>
    public IJsonRpcDuplexClient JsonRpcDuplexClient { get; }

    /// <summary>
    /// Queue of pending actions. Declared single-reader because only the loop started by
    /// <see cref="ProcessMessages"/> reads from it.
    /// </summary>
    private Channel<Action> SendChannel { get; } = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });

    /// <summary>
    /// Completes the action queue so that the background loop exits once the already queued
    /// actions have been drained. Safe to call more than once.
    /// </summary>
    public virtual void Dispose()
    {
        // TryComplete rather than Complete: Complete throws when the channel has already been
        // completed, which would make a repeated Dispose call throw.
        SendChannel.Writer.TryComplete();
    }

    /// <summary>
    /// Wraps <paramref name="result"/> in a <see cref="JsonRpcSubscriptionResponse"/> whose
    /// parameters carry the result together with this subscription's <see cref="Id"/>.
    /// </summary>
    /// <param name="result">Payload to place in the response.</param>
    /// <returns>The value produced by <c>JsonRpcResult.Single</c> for that response.</returns>
    protected JsonRpcResult CreateSubscriptionMessage(object result)
    {
        return JsonRpcResult.Single(
            new JsonRpcSubscriptionResponse()
            {
                Params = new JsonRpcSubscriptionResult()
                {
                    Result = result,
                    Subscription = Id
                }
            }, default);
    }

    /// <summary>
    /// Queues <paramref name="action"/> for execution by the background loop.
    /// </summary>
    /// <param name="action">Work item to run.</param>
    /// <remarks>
    /// The result of <c>TryWrite</c> is intentionally ignored: an action offered after
    /// <see cref="Dispose"/> has completed the queue is silently dropped.
    /// </remarks>
    protected void ScheduleAction(Action action)
    {
        SendChannel.Writer.TryWrite(action);
    }

    /// <summary>Builds the common prefix for failure log messages of this subscription.</summary>
    protected string GetErrorMsg() => $"{Type} subscription with ID {Id} failed.";

    /// <summary>
    /// Starts the background loop that reads actions from <see cref="SendChannel"/> and invokes
    /// them one by one until the channel is completed and empty.
    /// </summary>
    private void ProcessMessages()
    {
        Task.Factory.StartNew(async () =>
        {
            while (await SendChannel.Reader.WaitToReadAsync())
            {
                while (SendChannel.Reader.TryRead(out Action action))
                {
                    try
                    {
                        action();
                    }
                    catch (Exception e)
                    {
                        // A failing action must not stop the loop; report it and keep draining.
                        if (_logger.IsDebug) _logger.Debug($"{GetErrorMsg()} With exception {e}");
                    }
                }
            }
        }, TaskCreationOptions.LongRunning)
        // StartNew with an async lambda returns Task<Task>; the outer task completes as soon as
        // the lambda reaches its first await. Unwrap so the continuation below observes the
        // completion (and any fault) of the message loop itself.
        .Unwrap()
        .ContinueWith(t =>
        {
            if (t.IsFaulted)
            {
                if (_logger.IsError) _logger.Error($"{GetErrorMsg()} {nameof(ProcessMessages)} encountered an exception.", t.Exception);
            }
        });
    }
}
}