/
SocketNetworkClientFactory.cs
141 lines (127 loc) · 6.25 KB
/
SocketNetworkClientFactory.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Test.It.With.Amqp.Protocol;
using Test.It.With.Amqp.System;
namespace Test.It.With.Amqp.NetworkClient
{
internal class SocketNetworkClientFactory
{
private readonly IProtocolResolver _protocolResolver;
private readonly IConfiguration _configuration;
private readonly Func<AmqpConnectionSession, IDisposable> _subscribe;
private readonly CancellationTokenSource _cancellationTokenSource =
new CancellationTokenSource();
public SocketNetworkClientFactory(
IProtocolResolver protocolResolver,
IConfiguration configuration,
Func<AmqpConnectionSession, IDisposable> subscribe)
{
_protocolResolver = protocolResolver;
_configuration = configuration;
_subscribe = subscribe;
}
public ClientSessions StartReceivingClients(INetworkClientServer networkClientServer)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token);
var token = cts.Token;
var activeSessions = new ConcurrentDictionary<ConnectionId, DisconnectSessionAsync>();
var disconnectedSessions = new ConcurrentQueue<ConnectionId>();
var disconnectedSessionSignaler = new SemaphoreSlim(0);
// ReSharper disable once MethodSupportsCancellation
// Will be handled by the disposable returned
var clientReceivingTask = Task.Run(
async () =>
{
while (cts.IsCancellationRequested == false)
{
try
{
var client = await networkClientServer
.WaitForConnectedClientAsync(token)
.ConfigureAwait(false);
var session = new AmqpConnectionSession(_protocolResolver, _configuration, client);
var unsubscribe = _subscribe(session);
var signalDisconnect = new SemaphoreSlim(0);
client.Disconnected += SignalDisconnectOnStart;
var receiver = client.StartReceiving();
activeSessions.TryAdd(session.ConnectionId, Disconnect);
client.Disconnected += OnClientDisconnected;
// Disconnection happened before we could start accept disconnections
if (signalDisconnect.CurrentCount > 0)
{
OnClientDisconnected(this, EventArgs.Empty);
}
client.Disconnected -= SignalDisconnectOnStart;
void SignalDisconnectOnStart(object sender, EventArgs args)
{
signalDisconnect.Release();
}
async ValueTask Disconnect(CancellationToken _)
{
// Dispose in reverse dependency order
await receiver.DisposeAsync()
.ConfigureAwait(false);
unsubscribe.Dispose();
session.Dispose();
client.Dispose();
}
void OnClientDisconnected(object sender, EventArgs args)
{
disconnectedSessions.Enqueue(session.ConnectionId);
disconnectedSessionSignaler.Release();
}
}
catch when (cts.IsCancellationRequested)
{
return;
}
}
});
// ReSharper disable once MethodSupportsCancellation
// Will be handled by the disposable returned
var clientsDisconnectingTask = Task.Run(
async () =>
{
while (cts.IsCancellationRequested == false)
{
try
{
await disconnectedSessionSignaler.WaitAsync(token)
.ConfigureAwait(false);
if (!disconnectedSessions.TryDequeue(out var disconnectedSessionId))
{
throw new InvalidOperationException(
"Got signal about disconnect session but no disconnected session found");
}
if (activeSessions.TryRemove(disconnectedSessionId, out var disconnectSessionAsync))
{
await disconnectSessionAsync(token)
.ConfigureAwait(false);
}
}
catch when (cts.IsCancellationRequested)
{
return;
}
}
}
);
return new ClientSessions(activeSessions, new AsyncDisposableAction(async () =>
{
cts.Cancel();
await Task.WhenAll(clientReceivingTask, clientsDisconnectingTask)
.ConfigureAwait(false);
await activeSessions.Keys.Select(id =>
activeSessions.TryRemove(id, out var disconnectSessionAsync)
? disconnectSessionAsync(CancellationToken.None)
: new ValueTask())
.WhenAllAsync()
.ConfigureAwait(false);
cts.Dispose();
}));
}
}
}