Skip to content

Commit

Permalink
Fixed issue with transport not being disposed, locking the serial port
Browse files Browse the repository at this point in the history
  • Loading branch information
MvRens committed Mar 8, 2021
1 parent 65c76b3 commit 35b664d
Showing 1 changed file with 133 additions and 112 deletions.
245 changes: 133 additions & 112 deletions MIN/MINProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class MINProtocol : IMINProtocol
public TimeSpan FrameRetransmitTimeout { get; set; } = TimeSpan.FromMilliseconds(50);


private readonly IMINTransport transport;
private IMINTransport transport;
private readonly ILogger logger;
private readonly IMINTimeProvider timeProvider;

Expand All @@ -61,6 +61,7 @@ public class MINProtocol : IMINProtocol

private readonly object statsLock = new object();
private readonly MINStats stats = new MINStats();
private bool disposeTransport;


// Null propagation is fine too, but I like skipping it if the log messages requires extra work to generate
Expand All @@ -72,7 +73,7 @@ public class MINProtocol : IMINProtocol
/// <summary>
/// Creates a new instance of the MINProtocol
/// </summary>
/// <param name="transport">The transport implementation to use</param>
/// <param name="transport">The transport implementation to use. The instance will be disposed when the new MINProtocol instance is disposed.</param>
/// <param name="logger">A Microsoft.Extensions.Logging compatible implementation which receives protocol logging. If not provided, no logging will occur.</param>
/// <param name="timeProvider">An implementation of IMINTimeProvider for testing purposes. Defaults to SystemTimeProvider.</param>
public MINProtocol(IMINTransport transport, ILogger logger = null, IMINTimeProvider timeProvider = null)
Expand All @@ -87,6 +88,14 @@ public MINProtocol(IMINTransport transport, ILogger logger = null, IMINTimeProvi
/// <inheritdoc />
public void Dispose()
{
if (workerThreadCancellation == null)
{
// Already stopped, dispose the transport ourselves
transport?.Dispose();
return;
}

disposeTransport = true;
Stop();
}

Expand Down Expand Up @@ -232,158 +241,170 @@ private void QueueTransport(QueuedFrame queuedFrame)

private void RunWorker(CancellationTokenSource cancellationTokenSource)
{
var waitHandles = new List<WaitHandle>
try
{
transportQueueEvent.WaitHandle,
sendResetEvent.WaitHandle,
cancellationTokenSource.Token.WaitHandle
};


var usePolling = false;
if (transport is IMINAwaitableTransport awaitableTransport)
waitHandles.Add(awaitableTransport.DataAvailable());
else
usePolling = true;
var waitHandles = new List<WaitHandle>
{
transportQueueEvent.WaitHandle,
sendResetEvent.WaitHandle,
cancellationTokenSource.Token.WaitHandle
};

var waitHandlesArray = waitHandles.ToArray();

var usePolling = false;
if (transport is IMINAwaitableTransport awaitableTransport)
waitHandles.Add(awaitableTransport.DataAvailable());
else
usePolling = true;

var transportConnected = false;
transport.OnDisconnected += (sender, args) =>
{
OnDisconnected?.Invoke(this, EventArgs.Empty);
transportConnected = false;
};
var waitHandlesArray = waitHandles.ToArray();


while (!cancellationTokenSource.Token.IsCancellationRequested)
{
try

var transportConnected = false;
transport.OnDisconnected += (sender, args) =>
{
OnDisconnected?.Invoke(this, EventArgs.Empty);
transportConnected = false;
};


while (!cancellationTokenSource.Token.IsCancellationRequested)
{
if (!transportConnected)
try
{
transport.Connect(cancellationTokenSource.Token);
if (cancellationTokenSource.Token.IsCancellationRequested)
break;

OnConnected?.Invoke(this, EventArgs.Empty);
transportConnected = true;
}
if (!transportConnected)
{
transport.Connect(cancellationTokenSource.Token);
if (cancellationTokenSource.Token.IsCancellationRequested)
break;

OnConnected?.Invoke(this, EventArgs.Empty);
transportConnected = true;
}


DateTime now;
bool remoteActive;

var yield = true;
try
{
if (sendResetEvent.IsSet)
DateTime now;
bool remoteActive;

var yield = true;
try
{
logger?.LogDebug("Sending RESET");
var resetFrame = new MINFrame(MINWire.Reset, Array.Empty<byte>(), true);
if (sendResetEvent.IsSet)
{
logger?.LogDebug("Sending RESET");
var resetFrame = new MINFrame(MINWire.Reset, Array.Empty<byte>(), true);

WriteFrameData(resetFrame, 0, cancellationTokenSource.Token);
WriteFrameData(resetFrame, 0, cancellationTokenSource.Token);

transport.Reset();
InternalReset(false);

WriteFrameData(resetFrame, 0, cancellationTokenSource.Token);
WriteFrameData(resetFrame, 0, cancellationTokenSource.Token);
sendResetEvent.Reset();

transport.Reset();
InternalReset(false);

sendResetEvent.Reset();
lock (resetCompletedLock)
{
resetCompleted?.TrySetResult(true);
resetCompleted = null;
}
}

if (cancellationTokenSource.Token.IsCancellationRequested)
break;

lock (resetCompletedLock)
var incomingData = transport.ReadAll();
if (incomingData.Length > 0)
{
resetCompleted?.TrySetResult(true);
resetCompleted = null;
ProcessIncomingData(incomingData, cancellationTokenSource.Token);
yield = false;
}
}

if (cancellationTokenSource.Token.IsCancellationRequested)
break;

var incomingData = transport.ReadAll();
if (incomingData.Length > 0)
{
ProcessIncomingData(incomingData, cancellationTokenSource.Token);
yield = false;
}


if (cancellationTokenSource.Token.IsCancellationRequested)
break;
if (cancellationTokenSource.Token.IsCancellationRequested)
break;

if (transportConnected)
{
var queuedFrame = GetNextQueuedFrame();
if (queuedFrame != null)
if (transportConnected)
{
if (ProcessQueuedFrame(queuedFrame, cancellationTokenSource.Token))
FrameSent(queuedFrame);

yield = false;
var queuedFrame = GetNextQueuedFrame();
if (queuedFrame != null)
{
if (ProcessQueuedFrame(queuedFrame, cancellationTokenSource.Token))
FrameSent(queuedFrame);

yield = false;
}
}
else
yield = false;

if (cancellationTokenSource.Token.IsCancellationRequested)
break;

now = timeProvider.Now();
remoteActive = now - lastReceivedFrame < IdleTimeout;

if (now - lastAck > AckRetransmitTimeout && remoteActive && transportConnected)
SendAck(cancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
break;
}
else
yield = false;



if (cancellationTokenSource.Token.IsCancellationRequested)
break;

now = timeProvider.Now();
remoteActive = now - lastReceivedFrame < IdleTimeout;

if (now - lastAck > AckRetransmitTimeout && remoteActive && transportConnected)
SendAck(cancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
break;
}
// If any frames were processed, spin once more to check again
if (!yield)
continue;



if (cancellationTokenSource.Token.IsCancellationRequested)
break;
// Wait for the next thing to do to prevent an idle loop
var timeout = Timeout.Infinite;


// If any frames were processed, spin once more to check again
if (!yield)
continue;
if (remoteActive)
// Frames were received recently, the Ack is expected to be resent regularly.
timeout = (int) Math.Max(0, (AckRetransmitTimeout - (now - lastAck)).TotalMilliseconds);


// Wait for the next thing to do to prevent an idle loop
var timeout = Timeout.Infinite;
var oldestFrame = transportAwaitingAck.FirstOrDefault();
if (oldestFrame != null)
{
// There are unacknowledged frames, it is expected to be resent regularly.
var timeUntilResend = (int) (FrameRetransmitTimeout - (now - oldestFrame.LastSentTime))
.TotalMilliseconds;
if (timeout == Timeout.Infinite || timeout > timeUntilResend)
timeout = timeUntilResend;
}

if (remoteActive)
// Frames were received recently, the Ack is expected to be resent regularly.
timeout = (int)Math.Max(0, (AckRetransmitTimeout - (now - lastAck)).TotalMilliseconds);

// Wait for the transport to have data available or fall back to polling, or any of the above timeouts
if (usePolling && timeout == Timeout.Infinite || timeout > 10)
timeout = 10;

var oldestFrame = transportAwaitingAck.FirstOrDefault();
if (oldestFrame != null)
WaitHandle.WaitAny(waitHandlesArray, timeout);
}
catch (Exception e)
{
// There are unacknowledged frames, it is expected to be resent regularly.
var timeUntilResend = (int)(FrameRetransmitTimeout - (now - oldestFrame.LastSentTime)).TotalMilliseconds;
if (timeout == Timeout.Infinite || timeout > timeUntilResend)
timeout = timeUntilResend;
logger?.LogError(e, "Unhandled exception in MIN protocol worker thread: {message}", e.Message);
Thread.Sleep(500);
}


// Wait for the transport to have data available or fall back to polling, or any of the above timeouts
if (usePolling && timeout == Timeout.Infinite || timeout > 10)
timeout = 10;

WaitHandle.WaitAny(waitHandlesArray, timeout);
}
catch (Exception e)
}
finally
{
cancellationTokenSource.Dispose();

if (disposeTransport)
{
logger?.LogError(e, "Unhandled exception in MIN protocol worker thread: {message}", e.Message);
Thread.Sleep(500);
transport?.Dispose();
transport = null;
}
}

cancellationTokenSource.Dispose();
}


Expand Down

0 comments on commit 35b664d

Please sign in to comment.