From fcc4add103e003aa5de4b98ed932067fb312613d Mon Sep 17 00:00:00 2001 From: David Haney Date: Fri, 3 Oct 2014 00:48:16 -0400 Subject: [PATCH] More robust communication with keep-alive pings --- SimplSockets/SimplSocketClient.cs | 172 ++++++++++++++++-------------- SimplSockets/SimplSocketServer.cs | 139 ++++++++++++++---------- 2 files changed, 171 insertions(+), 140 deletions(-) diff --git a/SimplSockets/SimplSocketClient.cs b/SimplSockets/SimplSocketClient.cs index a4309bc..59fbce3 100644 --- a/SimplSockets/SimplSocketClient.cs +++ b/SimplSockets/SimplSocketClient.cs @@ -140,14 +140,7 @@ public void Connect(EndPoint endPoint) } catch (SocketException ex) { - lock (_isConnectedLock) - { - if (_isConnected) - { - HandleCommunicationError(_socket, ex); - _isConnected = false; - } - } + HandleCommunicationError(_socket, ex); } catch (ObjectDisposedException) { @@ -179,14 +172,7 @@ public void Send(byte[] message) } catch (SocketException ex) { - lock (_isConnectedLock) - { - if (_isConnected) - { - HandleCommunicationError(_socket, ex); - _isConnected = false; - } - } + HandleCommunicationError(_socket, ex); } catch (ObjectDisposedException) { @@ -222,15 +208,7 @@ public byte[] SendReceive(byte[] message) } catch (SocketException ex) { - lock (_isConnectedLock) - { - if (_isConnected) - { - HandleCommunicationError(_socket, ex); - _isConnected = false; - } - } - + HandleCommunicationError(_socket, ex); return null; } catch (ObjectDisposedException) @@ -273,10 +251,7 @@ public void Close() _socket.Close(); // No longer connected - lock (_isConnectedLock) - { - _isConnected = false; - } + _isConnected = false; } /// @@ -322,15 +297,7 @@ private void ConnectCallback(IAsyncResult asyncResult) } catch (SocketException ex) { - lock (_isConnectedLock) - { - if (_isConnected) - { - HandleCommunicationError(_socket, ex); - _isConnected = false; - } - } - + HandleCommunicationError(_socket, ex); return; } catch (ObjectDisposedException) @@ -355,6 +322,9 @@ private void ConnectCallback(IAsyncResult asyncResult) // Post a receive to the socket as the client will be continuously receiving messages to be pushed to the queue _socket.BeginReceive(messageState.Buffer, 0, messageState.Buffer.Length, 0, ReceiveCallback, messageState); + // Spin up the keep-alive + KeepAlive(_socket); + // Process all incoming messages var processMessageState = _messageStatePool.Pop(); processMessageState.Handler = _socket; @@ -362,6 +332,45 @@ private void ConnectCallback(IAsyncResult asyncResult) ProcessReceivedMessage(processMessageState); } + private void KeepAlive(Socket handler) + { + int availableTest = 0; + + // If the socket is disposed, we're done + try + { + availableTest = handler.Available; + } + catch (ObjectDisposedException) + { + // Peace out! + return; + } + + // Do the keep-alive + try + { + handler.BeginSend(_controlBytesPlaceholder, 0, _controlBytesPlaceholder.Length, 0, KeepAliveCallback, handler); + } + catch (SocketException ex) + { + HandleCommunicationError(handler, ex); + } + catch (ObjectDisposedException) + { + // If disposed, handle communication error was already done and we're just catching up on other threads. Supress it. + } + } + + private void KeepAliveCallback(IAsyncResult asyncResult) + { + SendCallback(asyncResult); + + Thread.Sleep(1000); + + KeepAlive((Socket)asyncResult.AsyncState); + } + private void SendCallback(IAsyncResult asyncResult) { // Get the socket to complete on @@ -374,15 +383,7 @@ private void SendCallback(IAsyncResult asyncResult) } catch (SocketException ex) { - lock (_isConnectedLock) - { - if (_isConnected) - { - HandleCommunicationError(_socket, ex); - _isConnected = false; - } - } - + HandleCommunicationError(_socket, ex); return; } catch (ObjectDisposedException) @@ -405,15 +406,7 @@ private void ReceiveCallback(IAsyncResult asyncResult) } catch (SocketException ex) { - lock (_isConnectedLock) - { - if (_isConnected) - { - HandleCommunicationError(_socket, ex); - _isConnected = false; - } - } - + HandleCommunicationError(_socket, ex); return; } catch (ObjectDisposedException) @@ -491,27 +484,34 @@ private void ProcessReceivedMessage(MessageState messageState) // Have control bytes, get message bytes - // Initialize buffer if needed - if (messageState.Buffer == null) + // SPECIAL CASE: if empty message, skip a bunch of stuff + if (messageState.BytesToRead != 0) { - messageState.Buffer = new byte[messageState.BytesToRead]; - } + // Initialize buffer if needed + if (messageState.Buffer == null) + { + messageState.Buffer = new byte[messageState.BytesToRead]; + } - var bytesAvailable = bytesRead - currentOffset; + var bytesAvailable = bytesRead - currentOffset; - var bytesToCopy = Math.Min(messageState.BytesToRead, bytesAvailable); + var bytesToCopy = Math.Min(messageState.BytesToRead, bytesAvailable); - // Copy bytes to buffer - Buffer.BlockCopy(buffer, currentOffset, messageState.Buffer, messageState.Buffer.Length - messageState.BytesToRead, bytesToCopy); + // Copy bytes to buffer + Buffer.BlockCopy(buffer, currentOffset, messageState.Buffer, messageState.Buffer.Length - messageState.BytesToRead, bytesToCopy); - currentOffset += bytesToCopy; - messageState.BytesToRead -= bytesToCopy; + currentOffset += bytesToCopy; + messageState.BytesToRead -= bytesToCopy; + } // Check if we're done if (messageState.BytesToRead == 0) { - // Done, add to complete received messages - CompleteMessage(messageState.Handler, messageState.ThreadId, messageState.Buffer); + if (messageState.Buffer != null) + { + // Done, add to complete received messages + CompleteMessage(messageState.Handler, messageState.ThreadId, messageState.Buffer); + } // Reset message state messageState.Buffer = null; @@ -566,24 +566,30 @@ private void CompleteMessage(Socket handler, int threadId, byte[] message) /// The exception that the socket raised. private void HandleCommunicationError(Socket socket, Exception ex) { - // Close the socket - try - { - socket.Shutdown(SocketShutdown.Both); - } - catch (SocketException) + lock (socket) { - // Socket was not able to be shutdown, likely because it was never opened - } - catch (ObjectDisposedException) - { - // Socket was already closed/disposed, so return out to prevent raising the Error event multiple times - // This is most likely to happen when an error occurs during heavily multithreaded use - return; + // Close the socket + try + { + socket.Shutdown(SocketShutdown.Both); + } + catch (SocketException) + { + // Socket was not able to be shutdown, likely because it was never opened + } + catch (ObjectDisposedException) + { + // Socket was already closed/disposed, so return out to prevent raising the Error event multiple times + // This is most likely to happen when an error occurs during heavily multithreaded use + return; + } + + // Close / dispose the socket + socket.Close(); } - // Close / dispose the socket - socket.Close(); + // No longer connected + _isConnected = false; // Clear receive queue for this client _receiveBufferQueue.Clear(); diff --git a/SimplSockets/SimplSocketServer.cs b/SimplSockets/SimplSocketServer.cs index 693e07d..e366c60 100644 --- a/SimplSockets/SimplSocketServer.cs +++ b/SimplSockets/SimplSocketServer.cs @@ -26,10 +26,6 @@ public class SimplSocketServer : ISimplSocketServer // Whether or not to use the Nagle algorithm private readonly bool _useNagleAlgorithm = false; - // The receive buffer queue - private readonly Dictionary>> _receiveBufferQueue = null; - private readonly ReaderWriterLockSlim _receiveBufferQueueLock = new ReaderWriterLockSlim(); - // Whether or not the socket is currently listening private volatile bool _isListening = false; private readonly object _isListeningLock = new object(); @@ -79,12 +75,11 @@ public SimplSocketServer(Func socketFunc, int messageBufferSize = 4096, _currentlyConnectedClients = new List(maximumConnections); - _receiveBufferQueue = new Dictionary>>(maximumConnections); - // Create the pools _messageStatePool = new Pool(maximumConnections, () => new MessageState(), messageState => { messageState.Buffer = null; + messageState.ReceiveBufferQueue = null; messageState.Handler = null; messageState.ThreadId = -1; messageState.BytesToRead = -1; @@ -337,6 +332,8 @@ private void AcceptCallback(IAsyncResult asyncResult) var messageState = _messageStatePool.Pop(); messageState.Handler = handler; messageState.Buffer = _bufferPool.Pop(); + // Create receive buffer queue for this client + messageState.ReceiveBufferQueue = new BlockingQueue>(_maximumConnections * 10); try { @@ -353,22 +350,54 @@ private void AcceptCallback(IAsyncResult asyncResult) return; } - // Create receive buffer queue for this client - var receiveBufferQueue = new BlockingQueue>(64); - _receiveBufferQueueLock.EnterWriteLock(); + // Spin up the keep-alive + KeepAlive(handler); + + // Process all incoming messages + var processMessageState = _messageStatePool.Pop(); + processMessageState.ReceiveBufferQueue = messageState.ReceiveBufferQueue; + processMessageState.Handler = handler; + + ProcessReceivedMessage(processMessageState); + } + + private void KeepAlive(Socket handler) + { + int availableTest = 0; + + // If the socket is disposed, we're done try { - _receiveBufferQueue[messageState.Handler.GetHashCode()] = receiveBufferQueue; + availableTest = handler.Available; } - finally + catch (ObjectDisposedException) + { + // Peace out! + return; + } + + // Do the keep-alive + try + { + handler.BeginSend(_controlBytesPlaceholder, 0, _controlBytesPlaceholder.Length, 0, KeepAliveCallback, handler); + } + catch (SocketException ex) + { + HandleCommunicationError(handler, ex); + } + catch (ObjectDisposedException) { - _receiveBufferQueueLock.ExitWriteLock(); + // If disposed, handle communication error was already done and we're just catching up on other threads. Supress it. } + } - // Process all incoming messages - var processMessageState = _messageStatePool.Pop(); - processMessageState.Handler = handler; - ProcessReceivedMessage(processMessageState, receiveBufferQueue); + private void KeepAliveCallback(IAsyncResult asyncResult) + { + SendCallback(asyncResult); + + Thread.Sleep(1000); + + KeepAlive((Socket)asyncResult.AsyncState); } private void SendCallback(IAsyncResult asyncResult) @@ -416,21 +445,7 @@ private void ReceiveCallback(IAsyncResult asyncResult) if (bytesRead > 0) { // Add buffer to queue - BlockingQueue> queue = null; - _receiveBufferQueueLock.EnterReadLock(); - try - { - if (!_receiveBufferQueue.TryGetValue(messageState.Handler.GetHashCode(), out queue)) - { - throw new Exception("FATAL: No receive queue created for current socket"); - } - } - finally - { - _receiveBufferQueueLock.ExitReadLock(); - } - - queue.Enqueue(new KeyValuePair(messageState.Buffer, bytesRead)); + messageState.ReceiveBufferQueue.Enqueue(new KeyValuePair(messageState.Buffer, bytesRead)); // Post receive on the handler socket messageState.Buffer = _bufferPool.Pop(); @@ -449,16 +464,29 @@ private void ReceiveCallback(IAsyncResult asyncResult) } } - private void ProcessReceivedMessage(MessageState messageState, BlockingQueue> receiveBufferQueue) + private void ProcessReceivedMessage(MessageState messageState) { + int availableTest = 0; int controlBytesOffset = 0; byte[] protocolBuffer = new byte[_controlBytesPlaceholder.Length]; // Loop until socket is done while (_isListening) { + // If the socket is disposed, we're done + try + { + availableTest = messageState.Handler.Available; + } + catch (ObjectDisposedException) + { + // Peace out! + _messageStatePool.Push(messageState); + return; + } + // Get the next buffer from the queue - var receiveBufferEntry = receiveBufferQueue.Dequeue(); + var receiveBufferEntry = messageState.ReceiveBufferQueue.Dequeue(); var buffer = receiveBufferEntry.Key; int bytesRead = receiveBufferEntry.Value; @@ -496,27 +524,34 @@ private void ProcessReceivedMessage(MessageState messageState, BlockingQueue /// Handles an error in socket communication. /// - /// The socket that raised the exception. + /// The socket. /// The exception that the socket raised. private void HandleCommunicationError(Socket socket, Exception ex) { @@ -594,17 +629,6 @@ private void HandleCommunicationError(Socket socket, Exception ex) _currentlyConnectedClientsLock.ExitWriteLock(); } - // Remove receive queue for this client - _receiveBufferQueueLock.EnterWriteLock(); - try - { - _receiveBufferQueue.Remove(socket.GetHashCode()); - } - finally - { - _receiveBufferQueueLock.ExitWriteLock(); - } - // Release one from the max connections semaphore _maxConnectionsSemaphore.Release(); @@ -622,6 +646,7 @@ private void HandleCommunicationError(Socket socket, Exception ex) private class MessageState { public byte[] Buffer = null; + public BlockingQueue> ReceiveBufferQueue = null; public Socket Handler = null; public int ThreadId = -1; public int BytesToRead = -1;