Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #67 from Madlord/master

Dispose Threads
  • Loading branch information...
commit f70d32c7c3f2ab61ff898c8ab965bbe7cf388021 2 parents 3b49d0c + adc784a
@ajacksified ajacksified authored
View
4 src/Alchemy/Classes/Context.cs
@@ -134,7 +134,9 @@ public void Dispose()
{
// skip
}
- }
+ }
+ SendReady.Release();
+ ReceiveReady.Release();
}
#endregion
View
30 src/Alchemy/Handlers/Handler.cs
@@ -13,17 +13,22 @@ namespace Alchemy.Handlers
/// When the protocol has not yet been determined the system defaults to this request handler.
/// Singleton, just like the other handlers.
/// </summary>
- public class Handler
+ public class Handler : IDisposable
{
private static Handler _instance;
protected static object createLock = new object();
internal IAuthentication Authentication;
- private Thread[] ProcessSendThreads = new Thread[Environment.ProcessorCount];
+ private Thread[] ProcessSendThreads = new Thread[Environment.ProcessorCount];
private ConcurrentQueue<HandlerMessage> MessageQueue { get; set; }
+ /// <summary>
+ /// Cancellation of threads if disposing
+ /// </summary>
+ private static CancellationTokenSource cancellation = new CancellationTokenSource();
+
protected Handler() {
MessageQueue = new ConcurrentQueue<HandlerMessage>();
@@ -119,11 +124,12 @@ public void ProcessHeader(Context context)
private void ProcessSend()
{
- while (true)
+ while (!cancellation.IsCancellationRequested)
{
while (MessageQueue.IsEmpty)
{
Thread.Sleep(10);
+ if (cancellation.IsCancellationRequested) return;
}
HandlerMessage message;
@@ -140,7 +146,15 @@ private void ProcessSend()
private void Send(HandlerMessage message)
{
message.Context.SendEventArgs.UserToken = message;
- message.Context.SendReady.Wait();
+
+ try
+ {
+ message.Context.SendReady.Wait(cancellation.Token);
+ }
+ catch (OperationCanceledException)
+ {
+ return;
+ }
try
{
@@ -206,5 +220,11 @@ private class HandlerMessage
public Boolean IsRaw { get; set;}
public Boolean DoClose { get; set;}
}
+
+ public void Dispose()
+ {
+ cancellation.Cancel();
+ }
+
}
-}
+}
View
2  src/Alchemy/TCPServer.cs
@@ -5,7 +5,7 @@
namespace Alchemy
{
- public abstract class TcpServer
+ public abstract class TcpServer : IDisposable
{
/// <summary>
/// This Semaphore protects our clients variable on increment/decrement when a user connects/disconnects.
View
15 src/Alchemy/WebSocketClient.cs
@@ -10,7 +10,7 @@
namespace Alchemy
{
- public class WebSocketClient
+ public class WebSocketClient : IDisposable
{
public TimeSpan ConnectTimeout = new TimeSpan(0, 0, 0, 10);
public bool IsAuthenticated;
@@ -35,6 +35,7 @@ public class WebSocketClient
private readonly string _host;
private static Thread[] ClientThreads = new Thread[Environment.ProcessorCount];
+ private static CancellationTokenSource cancellation = new CancellationTokenSource();
private static Queue<Context> NewClients { get; set; }
private static Dictionary<Context, WebSocketClient> ContextMapping { get; set; }
@@ -67,13 +68,14 @@ static WebSocketClient()
private static void HandleClientThread()
{
- while (true)
+ while (!cancellation.IsCancellationRequested)
{
Context context = null;
while (NewClients.Count == 0)
{
Thread.Sleep(10);
+ if (cancellation.IsCancellationRequested) return;
}
lock (NewClients)
@@ -367,5 +369,12 @@ public void Send(byte[] data)
{
_context.UserContext.Send(data);
}
+
+ public void Dispose()
+ {
+ cancellation.Cancel();
+ Handler.Instance.Dispose();
+ }
+
}
-}
+}
View
42 src/Alchemy/WebSocketServer.cs
@@ -20,6 +20,8 @@ public class WebSocketServer : TcpServer, IDisposable
private static Thread[] ClientThreads = new Thread[Environment.ProcessorCount];
private static Thread CleanupThread;
+ private static CancellationTokenSource cancellation = new CancellationTokenSource();
+
private static ConcurrentQueue<Context> ContextQueue { get; set; }
private static Dictionary<Context, WebSocketServer> ContextMapping { get; set; }
@@ -44,13 +46,14 @@ static WebSocketServer()
private static void HandleClientThread()
{
- while (true)
+ while (!cancellation.Token.IsCancellationRequested)
{
Context context;
while (ContextQueue.Count == 0)
{
Thread.Sleep(10);
+ if (cancellation.Token.IsCancellationRequested) return;
}
if (!ContextQueue.TryDequeue(out context))
@@ -72,7 +75,7 @@ private static void HandleClientThread()
private static void HandleContextCleanupThread()
{
- while (true)
+ while (!cancellation.IsCancellationRequested)
{
Thread.Sleep(100);
@@ -85,6 +88,8 @@ private static void HandleContextCleanupThread()
foreach (var connection in currentConnections)
{
+ if (cancellation.IsCancellationRequested) break;
+
if (!connection.Connected)
{
lock (CurrentConnections)
@@ -279,26 +284,30 @@ private void SetupContext(Context _context)
}
private void StartReceive(Context _context)
{
- if (_context.ReceiveReady.Wait(TimeOut))
+ try
{
- try
+ if (_context.ReceiveReady.Wait(TimeOut, cancellation.Token))
{
- if (!_context.Connection.Client.ReceiveAsync(_context.ReceiveEventArgs))
+ try
+ {
+ if (!_context.Connection.Client.ReceiveAsync(_context.ReceiveEventArgs))
+ {
+ ReceiveEventArgs_Completed(_context.Connection.Client, _context.ReceiveEventArgs);
+ }
+ }
+ catch (SocketException ex)
{
- ReceiveEventArgs_Completed(_context.Connection.Client, _context.ReceiveEventArgs);
+ //logger.Error("SocketException in ReceieveAsync", ex);
+ _context.Disconnect();
}
}
- catch (SocketException ex)
+ else
{
- //logger.Error("SocketException in ReceieveAsync", ex);
+ //logger.Error("Timeout waiting for ReceiveReady");
_context.Disconnect();
}
}
- else
- {
- //logger.Error("Timeout waiting for ReceiveReady");
- _context.Disconnect();
- }
+ catch (OperationCanceledException) { }
}
void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
{
@@ -322,5 +331,12 @@ void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
context.ReceiveReady.Release();
}
}
+
+ public void Dispose()
+ {
+ cancellation.Cancel();
+ base.Dispose();
+ Handler.Instance.Dispose();
+ }
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.