-
Notifications
You must be signed in to change notification settings - Fork 180
/
AsyncTcpSocketServer.cs
315 lines (269 loc) · 10.7 KB
/
AsyncTcpSocketServer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Logrila.Logging;
namespace Cowboy.Sockets
{
public class AsyncTcpSocketServer
{
#region Fields
// Logger shared by all instances of this server type.
private static readonly ILog _log = Logger.Get<AsyncTcpSocketServer>();
// Underlying listener; assigned in Listen() and reset to null in Shutdown().
private TcpListener _listener;
// Sessions currently tracked by the server, keyed by the session's SessionKey.
private readonly ConcurrentDictionary<string, AsyncTcpSocketSession> _sessions = new ConcurrentDictionary<string, AsyncTcpSocketSession>();
// Event dispatcher supplied at construction; handed to every session created in Process().
private readonly IAsyncTcpSocketServerEventDispatcher _dispatcher;
// Server configuration; a default instance is created when the caller passes null.
private readonly AsyncTcpSocketServerConfiguration _configuration;
// Lifecycle state, holding one of the constants below; written only through Interlocked operations.
private int _state;
// Initial state: Listen() has not been called.
private const int _none = 0;
// Set by Listen() when it claims the server.
private const int _listening = 1;
// Set by Shutdown(); no code path in this class leaves this state.
private const int _disposed = 5;
#endregion
#region Constructors
/// <summary>
/// Creates a server that listens on <see cref="IPAddress.Any"/> at the given port.
/// Delegates to the (IPAddress, int, dispatcher, configuration) overload.
/// </summary>
/// <param name="listenedPort">Port number to listen on.</param>
/// <param name="dispatcher">Event dispatcher, forwarded unchanged to the delegated constructor.</param>
/// <param name="configuration">Optional server configuration; may be null.</param>
public AsyncTcpSocketServer(int listenedPort, IAsyncTcpSocketServerEventDispatcher dispatcher, AsyncTcpSocketServerConfiguration configuration = null)
: this(IPAddress.Any, listenedPort, dispatcher, configuration)
{
}
/// <summary>
/// Creates a server that listens on the given address and port.
/// Builds an <see cref="IPEndPoint"/> from the two values and delegates to the
/// (IPEndPoint, dispatcher, configuration) overload.
/// </summary>
/// <param name="listenedAddress">Local address to listen on.</param>
/// <param name="listenedPort">Port number to listen on.</param>
/// <param name="dispatcher">Event dispatcher, forwarded unchanged to the delegated constructor.</param>
/// <param name="configuration">Optional server configuration; may be null.</param>
public AsyncTcpSocketServer(IPAddress listenedAddress, int listenedPort, IAsyncTcpSocketServerEventDispatcher dispatcher, AsyncTcpSocketServerConfiguration configuration = null)
: this(new IPEndPoint(listenedAddress, listenedPort), dispatcher, configuration)
{
}
/// <summary>
/// Creates a server for the given endpoint and event dispatcher. This is the constructor
/// every other overload ends up calling; it validates the arguments and stores them.
/// </summary>
/// <param name="listenedEndPoint">Local endpoint to listen on.</param>
/// <param name="dispatcher">Receiver of session events.</param>
/// <param name="configuration">Server configuration; when null a default-constructed configuration is used.</param>
/// <exception cref="ArgumentNullException"><paramref name="listenedEndPoint"/> or <paramref name="dispatcher"/> is null.</exception>
/// <exception cref="InvalidProgramException">The effective configuration has no buffer manager or no frame builder.</exception>
public AsyncTcpSocketServer(IPEndPoint listenedEndPoint, IAsyncTcpSocketServerEventDispatcher dispatcher, AsyncTcpSocketServerConfiguration configuration = null)
{
    if (listenedEndPoint == null)
    {
        throw new ArgumentNullException(nameof(listenedEndPoint));
    }

    if (dispatcher == null)
    {
        throw new ArgumentNullException(nameof(dispatcher));
    }

    this.ListenedEndPoint = listenedEndPoint;
    _dispatcher = dispatcher;

    // Fall back to the default configuration when the caller did not supply one.
    _configuration = configuration ?? new AsyncTcpSocketServerConfiguration();

    if (_configuration.BufferManager == null)
    {
        throw new InvalidProgramException("The buffer manager in configuration cannot be null.");
    }

    if (_configuration.FrameBuilder == null)
    {
        throw new InvalidProgramException("The frame handler in configuration cannot be null.");
    }
}
/// <summary>
/// Creates a callback-driven server that listens on <see cref="IPAddress.Any"/> at the given port.
/// Delegates to the (IPAddress, int, callbacks, configuration) overload.
/// </summary>
/// <param name="listenedPort">Port number to listen on.</param>
/// <param name="onSessionDataReceived">Optional callback taking the session, a byte buffer and two integers (presumably offset and count — confirm against the dispatcher).</param>
/// <param name="onSessionStarted">Optional callback taking the session.</param>
/// <param name="onSessionClosed">Optional callback taking the session.</param>
/// <param name="configuration">Optional server configuration; may be null.</param>
public AsyncTcpSocketServer(
int listenedPort,
Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
AsyncTcpSocketServerConfiguration configuration = null)
: this(IPAddress.Any, listenedPort, onSessionDataReceived, onSessionStarted, onSessionClosed, configuration)
{
}
/// <summary>
/// Creates a callback-driven server that listens on the given address and port.
/// Builds an <see cref="IPEndPoint"/> from the two values and delegates to the
/// (IPEndPoint, callbacks, configuration) overload.
/// </summary>
/// <param name="listenedAddress">Local address to listen on.</param>
/// <param name="listenedPort">Port number to listen on.</param>
/// <param name="onSessionDataReceived">Optional callback taking the session, a byte buffer and two integers (presumably offset and count — confirm against the dispatcher).</param>
/// <param name="onSessionStarted">Optional callback taking the session.</param>
/// <param name="onSessionClosed">Optional callback taking the session.</param>
/// <param name="configuration">Optional server configuration; may be null.</param>
public AsyncTcpSocketServer(
IPAddress listenedAddress, int listenedPort,
Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
AsyncTcpSocketServerConfiguration configuration = null)
: this(new IPEndPoint(listenedAddress, listenedPort), onSessionDataReceived, onSessionStarted, onSessionClosed, configuration)
{
}
/// <summary>
/// Creates a callback-driven server for the given endpoint. The three callbacks are wrapped in a
/// <c>DefaultAsyncTcpSocketServerEventDispatcher</c>, which is then passed to the
/// (IPEndPoint, dispatcher, configuration) overload.
/// </summary>
/// <param name="listenedEndPoint">Local endpoint to listen on.</param>
/// <param name="onSessionDataReceived">Optional callback taking the session, a byte buffer and two integers (presumably offset and count — confirm against the dispatcher).</param>
/// <param name="onSessionStarted">Optional callback taking the session.</param>
/// <param name="onSessionClosed">Optional callback taking the session.</param>
/// <param name="configuration">Optional server configuration; may be null.</param>
public AsyncTcpSocketServer(
IPEndPoint listenedEndPoint,
Func<AsyncTcpSocketSession, byte[], int, int, Task> onSessionDataReceived = null,
Func<AsyncTcpSocketSession, Task> onSessionStarted = null,
Func<AsyncTcpSocketSession, Task> onSessionClosed = null,
AsyncTcpSocketServerConfiguration configuration = null)
: this(listenedEndPoint,
new DefaultAsyncTcpSocketServerEventDispatcher(onSessionDataReceived, onSessionStarted, onSessionClosed),
configuration)
{
}
#endregion
#region Properties
/// <summary>Endpoint this server was constructed with.</summary>
public IPEndPoint ListenedEndPoint { get; private set; }

/// <summary>Whether the server's lifecycle state is currently "listening".</summary>
public bool IsListening => _state == _listening;

/// <summary>Number of sessions currently tracked by the server.</summary>
public int SessionCount => _sessions.Count;
#endregion
#region Server
/// <summary>
/// Starts listening on <see cref="ListenedEndPoint"/> and launches the background accept loop.
/// </summary>
/// <remarks>
/// Start-up failures of the kinds for which <c>ShouldThrow</c> returns false (for example a
/// <see cref="SocketException"/> raised while starting the listener) are not rethrown. They are
/// logged, and the state is rolled back to "not started" so that <see cref="IsListening"/> does
/// not report true for a server that never started and so that this method can be called again.
/// </remarks>
/// <exception cref="ObjectDisposedException">The server has already been shut down.</exception>
/// <exception cref="InvalidOperationException">The server has already been started.</exception>
public void Listen()
{
    // Atomically claim the none -> listening transition so that only one caller starts the listener.
    int origin = Interlocked.CompareExchange(ref _state, _listening, _none);
    if (origin == _disposed)
    {
        throw new ObjectDisposedException(GetType().FullName);
    }
    else if (origin != _none)
    {
        throw new InvalidOperationException("This tcp server has already started.");
    }

    try
    {
        _listener = new TcpListener(this.ListenedEndPoint);
        SetSocketOptions();
        _listener.Start(_configuration.PendingConnectionBacklog);

        Task.Factory.StartNew(async () =>
        {
            await Accept();
        },
        TaskCreationOptions.LongRunning)
        .Forget();
    }
    catch (Exception ex) when (!ShouldThrow(ex))
    {
        // Undo the claim made above, but only if nobody (e.g. Shutdown) changed the state meanwhile.
        Interlocked.CompareExchange(ref _state, _none, _listening);
        _log.Error(ex.Message, ex);
    }
}
/// <summary>
/// Stops the listener and closes every tracked session, blocking until the sessions have
/// been closed. The state becomes "disposed"; a second call returns immediately.
/// </summary>
/// <remarks>
/// Exceptions for which <c>ShouldThrow</c> returns false are swallowed (shutdown is best-effort).
/// Any other exception raised while closing sessions surfaces from this method wrapped in an
/// <see cref="AggregateException"/>.
/// </remarks>
public void Shutdown()
{
    if (Interlocked.Exchange(ref _state, _disposed) == _disposed)
    {
        return;
    }

    try
    {
        // _listener is only assigned by Listen(); it is still null when a server that was
        // never started is shut down, so guard against dereferencing it.
        var listener = _listener;
        _listener = null;
        if (listener != null)
        {
            listener.Stop();
        }

        Task.Factory.StartNew(async () =>
        {
            try
            {
                foreach (var session in _sessions.Values)
                {
                    await session.Close(); // parent server close session when shutdown
                }
            }
            catch (Exception ex) when (!ShouldThrow(ex)) { }
        },
        TaskCreationOptions.PreferFairness)
        // StartNew with an async delegate yields Task<Task>; unwrap it so that Wait() blocks
        // until the sessions are actually closed, not just until the first incomplete await.
        .Unwrap()
        .Wait();
    }
    catch (Exception ex) when (!ShouldThrow(ex)) { }
}
/// <summary>
/// Applies the configured NAT-traversal and address-reuse settings to the listener.
/// Called by Listen() after the listener is created and before it is started.
/// </summary>
private void SetSocketOptions()
{
    var listener = _listener;

    listener.AllowNatTraversal(_configuration.AllowNatTraversal);

    listener.Server.SetSocketOption(
        SocketOptionLevel.Socket,
        SocketOptionName.ReuseAddress,
        _configuration.ReuseAddress);
}
/// <summary>
/// Reports whether the listener has pending connection requests.
/// </summary>
/// <returns>The result of the listener's own pending check.</returns>
/// <exception cref="InvalidOperationException">The server is not currently listening.</exception>
public bool Pending()
{
    if (IsListening)
    {
        // determine if there are pending connection requests.
        return _listener.Pending();
    }

    throw new InvalidOperationException("The tcp server is not active.");
}
/// <summary>
/// Accept loop: while the server is listening, waits for the next incoming client and
/// hands it to <c>Process</c> on a separate, un-awaited task.
/// </summary>
/// <remarks>
/// Any exception ends the loop. Exceptions for which <c>ShouldThrow</c> returns false are
/// ignored silently; all others are logged as errors. Nothing is rethrown.
/// </remarks>
private async Task Accept()
{
    try
    {
        while (IsListening)
        {
            var incoming = await _listener.AcceptTcpClientAsync();

            // Fire and forget so that the loop can go straight back to accepting.
            Task.Factory
                .StartNew(async () => await Process(incoming), TaskCreationOptions.None)
                .Forget();
        }
    }
    catch (Exception ex)
    {
        if (ShouldThrow(ex))
        {
            _log.Error(ex.Message, ex);
        }
    }
}
/// <summary>
/// Wraps an accepted client in a session, registers it under its session key, runs it,
/// and unregisters it once <c>Start</c> has returned or thrown.
/// </summary>
/// <param name="acceptedTcpClient">Client returned by the accept loop.</param>
/// <remarks>
/// If a session with the same key is already registered, nothing further is done.
/// A <see cref="TimeoutException"/> from <c>Start</c> is logged; other exceptions propagate
/// after the session has been unregistered.
/// </remarks>
private async Task Process(TcpClient acceptedTcpClient)
{
    var newSession = new AsyncTcpSocketSession(acceptedTcpClient, _configuration, _configuration.BufferManager, _dispatcher, this);

    if (!_sessions.TryAdd(newSession.SessionKey, newSession))
    {
        return;
    }

    _log.DebugFormat("New session [{0}].", newSession);

    try
    {
        await newSession.Start();
    }
    catch (TimeoutException timeout)
    {
        _log.Error(timeout.Message, timeout);
    }
    finally
    {
        AsyncTcpSocketSession removed;
        if (_sessions.TryRemove(newSession.SessionKey, out removed))
        {
            _log.DebugFormat("Close session [{0}].", removed);
        }
    }
}
/// <summary>
/// Decides whether an exception should be allowed to propagate.
/// </summary>
/// <param name="ex">Exception under inspection.</param>
/// <returns>
/// False for <see cref="ObjectDisposedException"/>, <see cref="InvalidOperationException"/>,
/// <see cref="SocketException"/> and <see cref="IOException"/> (including derived types);
/// true for everything else.
/// </returns>
private bool ShouldThrow(Exception ex)
{
    bool isExpected = ex is ObjectDisposedException
        || ex is InvalidOperationException
        || ex is SocketException
        || ex is IOException;

    return !isExpected;
}
#endregion
#region Send
/// <summary>
/// Sends the whole buffer to the session registered under the given key.
/// Forwards to the (sessionKey, data, offset, count) overload with offset 0 and the full length.
/// </summary>
/// <param name="sessionKey">Key of the target session.</param>
/// <param name="data">Buffer to send in full.</param>
public async Task SendToAsync(string sessionKey, byte[] data)
    => await SendToAsync(sessionKey, data, 0, data.Length);
/// <summary>
/// Sends part of a buffer to the session registered under the given key.
/// When no such session is registered, a warning is logged and nothing is sent.
/// </summary>
/// <param name="sessionKey">Key of the target session.</param>
/// <param name="data">Buffer holding the bytes to send.</param>
/// <param name="offset">Offset passed through to the session's send call.</param>
/// <param name="count">Count passed through to the session's send call.</param>
public async Task SendToAsync(string sessionKey, byte[] data, int offset, int count)
{
    AsyncTcpSocketSession target;
    if (!_sessions.TryGetValue(sessionKey, out target))
    {
        _log.WarnFormat("Cannot find session [{0}].", sessionKey);
        return;
    }

    await target.SendAsync(data, offset, count);
}
/// <summary>
/// Sends the whole buffer to the given session.
/// Forwards to the (session, data, offset, count) overload with offset 0 and the full length.
/// </summary>
/// <param name="session">Target session.</param>
/// <param name="data">Buffer to send in full.</param>
public async Task SendToAsync(AsyncTcpSocketSession session, byte[] data)
    => await SendToAsync(session, data, 0, data.Length);
/// <summary>
/// Sends part of a buffer to the session that is registered under the given session's key.
/// The send goes through the registered instance found by key; when the key is not
/// registered, a warning is logged and nothing is sent.
/// </summary>
/// <param name="session">Session whose key identifies the target.</param>
/// <param name="data">Buffer holding the bytes to send.</param>
/// <param name="offset">Offset passed through to the session's send call.</param>
/// <param name="count">Count passed through to the session's send call.</param>
public async Task SendToAsync(AsyncTcpSocketSession session, byte[] data, int offset, int count)
{
    AsyncTcpSocketSession registered;
    if (!_sessions.TryGetValue(session.SessionKey, out registered))
    {
        _log.WarnFormat("Cannot find session [{0}].", session);
        return;
    }

    await registered.SendAsync(data, offset, count);
}
/// <summary>
/// Sends the whole buffer to every tracked session.
/// Forwards to the (data, offset, count) overload with offset 0 and the full length.
/// </summary>
/// <param name="data">Buffer to send in full.</param>
public async Task BroadcastAsync(byte[] data)
    => await BroadcastAsync(data, 0, data.Length);
/// <summary>
/// Sends part of a buffer to every tracked session, one session after another.
/// Each send is awaited before the next begins, so an exception from one send
/// propagates to the caller and the remaining sessions are not reached.
/// </summary>
/// <param name="data">Buffer holding the bytes to send.</param>
/// <param name="offset">Offset passed through to each session's send call.</param>
/// <param name="count">Count passed through to each session's send call.</param>
public async Task BroadcastAsync(byte[] data, int offset, int count)
{
    var recipients = _sessions.Values;
    foreach (var recipient in recipients)
    {
        await recipient.SendAsync(data, offset, count);
    }
}
#endregion
#region Session
/// <summary>Reports whether a session is registered under the given key.</summary>
/// <param name="sessionKey">Key to look up.</param>
/// <returns>True when the key is present in the session table.</returns>
public bool HasSession(string sessionKey) => _sessions.ContainsKey(sessionKey);
/// <summary>Looks up the session registered under the given key.</summary>
/// <param name="sessionKey">Key to look up.</param>
/// <returns>The registered session, or null when the key is not present.</returns>
public AsyncTcpSocketSession GetSession(string sessionKey)
{
    AsyncTcpSocketSession found;
    return _sessions.TryGetValue(sessionKey, out found) ? found : null;
}
/// <summary>
/// Closes the session registered under the given key; does nothing when the key is not present.
/// </summary>
/// <param name="sessionKey">Key of the session to close.</param>
public async Task CloseSession(string sessionKey)
{
    AsyncTcpSocketSession target;
    if (!_sessions.TryGetValue(sessionKey, out target))
    {
        return;
    }

    await target.Close(); // parent server close session by session-key
}
#endregion
}
}