diff --git a/MIN/MINProtocol.cs b/MIN/MINProtocol.cs index 90e6d49..2deca4a 100644 --- a/MIN/MINProtocol.cs +++ b/MIN/MINProtocol.cs @@ -50,7 +50,7 @@ public class MINProtocol : IMINProtocol public TimeSpan FrameRetransmitTimeout { get; set; } = TimeSpan.FromMilliseconds(50); - private readonly IMINTransport transport; + private IMINTransport transport; private readonly ILogger logger; private readonly IMINTimeProvider timeProvider; @@ -61,6 +61,7 @@ public class MINProtocol : IMINProtocol private readonly object statsLock = new object(); private readonly MINStats stats = new MINStats(); + private bool disposeTransport; // Null propagation is fine too, but I like skipping it if the log messages requires extra work to generate @@ -72,7 +73,7 @@ public class MINProtocol : IMINProtocol /// /// Creates a new instance of the MINProtocol /// - /// The transport implementation to use + /// The transport implementation to use. The instance will be disposed when the new MINProtocol instance is disposed. /// A Microsoft.Extensions.Logging compatible implementation which receives protocol logging. If not provided, no logging will occur. /// An implementation of IMINTimeProvider for testing purposes. Defaults to SystemTimeProvider. public MINProtocol(IMINTransport transport, ILogger logger = null, IMINTimeProvider timeProvider = null) @@ -87,6 +88,14 @@ public MINProtocol(IMINTransport transport, ILogger logger = null, IMINTimeProvi /// public void Dispose() { + if (workerThreadCancellation == null) + { + // Already stopped, dispose the transport ourselves + transport?.Dispose(); + return; + } + + disposeTransport = true; Stop(); } @@ -232,158 +241,170 @@ private void QueueTransport(QueuedFrame queuedFrame) private void RunWorker(CancellationTokenSource cancellationTokenSource) { - var waitHandles = new List + try { - transportQueueEvent.WaitHandle, - sendResetEvent.WaitHandle, - cancellationTokenSource.Token.WaitHandle - }; - - - var usePolling = false; - if (transport is IMINAwaitableTransport awaitableTransport) - waitHandles.Add(awaitableTransport.DataAvailable()); - else - usePolling = true; + var waitHandles = new List + { + transportQueueEvent.WaitHandle, + sendResetEvent.WaitHandle, + cancellationTokenSource.Token.WaitHandle + }; - var waitHandlesArray = waitHandles.ToArray(); + var usePolling = false; + if (transport is IMINAwaitableTransport awaitableTransport) + waitHandles.Add(awaitableTransport.DataAvailable()); + else + usePolling = true; - var transportConnected = false; - transport.OnDisconnected += (sender, args) => - { - OnDisconnected?.Invoke(this, EventArgs.Empty); - transportConnected = false; - }; + var waitHandlesArray = waitHandles.ToArray(); - - while (!cancellationTokenSource.Token.IsCancellationRequested) - { - try + + var transportConnected = false; + transport.OnDisconnected += (sender, args) => + { + OnDisconnected?.Invoke(this, EventArgs.Empty); + transportConnected = false; + }; + + + while (!cancellationTokenSource.Token.IsCancellationRequested) { - if (!transportConnected) + try { - transport.Connect(cancellationTokenSource.Token); - if (cancellationTokenSource.Token.IsCancellationRequested) - break; - - OnConnected?.Invoke(this, EventArgs.Empty); - transportConnected = true; - } + if (!transportConnected) + { + transport.Connect(cancellationTokenSource.Token); + if (cancellationTokenSource.Token.IsCancellationRequested) + break; + OnConnected?.Invoke(this, EventArgs.Empty); + transportConnected = true; + } - DateTime now; - bool remoteActive; - var yield = true; - try - { - if (sendResetEvent.IsSet) + DateTime now; + bool remoteActive; + + var yield = true; + try { - logger?.LogDebug("Sending RESET"); - var resetFrame = new MINFrame(MINWire.Reset, Array.Empty(), true); + if (sendResetEvent.IsSet) + { + logger?.LogDebug("Sending RESET"); + var resetFrame = new MINFrame(MINWire.Reset, Array.Empty(), true); + + WriteFrameData(resetFrame, 0, cancellationTokenSource.Token); + WriteFrameData(resetFrame, 0, cancellationTokenSource.Token); + + transport.Reset(); + InternalReset(false); - WriteFrameData(resetFrame, 0, cancellationTokenSource.Token); - WriteFrameData(resetFrame, 0, cancellationTokenSource.Token); + sendResetEvent.Reset(); - transport.Reset(); - InternalReset(false); - - sendResetEvent.Reset(); + lock (resetCompletedLock) + { + resetCompleted?.TrySetResult(true); + resetCompleted = null; + } + } + + if (cancellationTokenSource.Token.IsCancellationRequested) + break; - lock (resetCompletedLock) + var incomingData = transport.ReadAll(); + if (incomingData.Length > 0) { - resetCompleted?.TrySetResult(true); - resetCompleted = null; + ProcessIncomingData(incomingData, cancellationTokenSource.Token); + yield = false; } - } - - if (cancellationTokenSource.Token.IsCancellationRequested) - break; - - var incomingData = transport.ReadAll(); - if (incomingData.Length > 0) - { - ProcessIncomingData(incomingData, cancellationTokenSource.Token); - yield = false; - } - if (cancellationTokenSource.Token.IsCancellationRequested) - break; + if (cancellationTokenSource.Token.IsCancellationRequested) + break; - if (transportConnected) - { - var queuedFrame = GetNextQueuedFrame(); - if (queuedFrame != null) + if (transportConnected) { - if (ProcessQueuedFrame(queuedFrame, cancellationTokenSource.Token)) - FrameSent(queuedFrame); - - yield = false; + var queuedFrame = GetNextQueuedFrame(); + if (queuedFrame != null) + { + if (ProcessQueuedFrame(queuedFrame, cancellationTokenSource.Token)) + FrameSent(queuedFrame); + + yield = false; + } } + else + yield = false; + + if (cancellationTokenSource.Token.IsCancellationRequested) + break; + + now = timeProvider.Now(); + remoteActive = now - lastReceivedFrame < IdleTimeout; + + if (now - lastAck > AckRetransmitTimeout && remoteActive && transportConnected) + SendAck(cancellationTokenSource.Token); + } + catch (OperationCanceledException) + { + break; } - else - yield = false; + + if (cancellationTokenSource.Token.IsCancellationRequested) break; - now = timeProvider.Now(); - remoteActive = now - lastReceivedFrame < IdleTimeout; - if (now - lastAck > AckRetransmitTimeout && remoteActive && transportConnected) - SendAck(cancellationTokenSource.Token); - } - catch (OperationCanceledException) - { - break; - } + // If any frames were processed, spin once more to check again + if (!yield) + continue; - - if (cancellationTokenSource.Token.IsCancellationRequested) - break; + // Wait for the next thing to do to prevent an idle loop + var timeout = Timeout.Infinite; - - // If any frames were processed, spin once more to check again - if (!yield) - continue; + if (remoteActive) + // Frames were received recently, the Ack is expected to be resent regularly. + timeout = (int) Math.Max(0, (AckRetransmitTimeout - (now - lastAck)).TotalMilliseconds); - // Wait for the next thing to do to prevent an idle loop - var timeout = Timeout.Infinite; + var oldestFrame = transportAwaitingAck.FirstOrDefault(); + if (oldestFrame != null) + { + // There are unacknowledged frames, it is expected to be resent regularly. + var timeUntilResend = (int) (FrameRetransmitTimeout - (now - oldestFrame.LastSentTime)) + .TotalMilliseconds; + if (timeout == Timeout.Infinite || timeout > timeUntilResend) + timeout = timeUntilResend; + } - if (remoteActive) - // Frames were received recently, the Ack is expected to be resent regularly. - timeout = (int)Math.Max(0, (AckRetransmitTimeout - (now - lastAck)).TotalMilliseconds); + // Wait for the transport to have data available or fall back to polling, or any of the above timeouts + if (usePolling && timeout == Timeout.Infinite || timeout > 10) + timeout = 10; - var oldestFrame = transportAwaitingAck.FirstOrDefault(); - if (oldestFrame != null) + WaitHandle.WaitAny(waitHandlesArray, timeout); + } + catch (Exception e) { - // There are unacknowledged frames, it is expected to be resent regularly. - var timeUntilResend = (int)(FrameRetransmitTimeout - (now - oldestFrame.LastSentTime)).TotalMilliseconds; - if (timeout == Timeout.Infinite || timeout > timeUntilResend) - timeout = timeUntilResend; + logger?.LogError(e, "Unhandled exception in MIN protocol worker thread: {message}", e.Message); + Thread.Sleep(500); } - - - // Wait for the transport to have data available or fall back to polling, or any of the above timeouts - if (usePolling && timeout == Timeout.Infinite || timeout > 10) - timeout = 10; - - WaitHandle.WaitAny(waitHandlesArray, timeout); } - catch (Exception e) + } + finally + { + cancellationTokenSource.Dispose(); + + if (disposeTransport) { - logger?.LogError(e, "Unhandled exception in MIN protocol worker thread: {message}", e.Message); - Thread.Sleep(500); + transport?.Dispose(); + transport = null; } } - - cancellationTokenSource.Dispose(); }