Skip to content

Commit

Permalink
Implemented retry mechanism for serial connection
Browse files Browse the repository at this point in the history
Implemented AwaitableTransport interface for serial transport
  • Loading branch information
MvRens committed Mar 5, 2021
1 parent 6db7da6 commit 65c76b3
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 31 deletions.
108 changes: 91 additions & 17 deletions MIN.SerialPort/MINSerialTransport.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.IO;
using System.IO.Ports;
using System.Threading;
Expand All @@ -10,10 +11,16 @@ namespace MIN.SerialPort
/// Implements the MIN transport for a SerialPort connection.
/// </summary>
// ReSharper disable once UnusedMember.Global - public API
public class MINSerialTransport : IMINTransport
public class MINSerialTransport : IMINAwaitableTransport
{
private readonly System.IO.Ports.SerialPort serialPort;
private System.IO.Ports.SerialPort serialPort;
private readonly Func<System.IO.Ports.SerialPort> serialPortFactory;
private readonly ManualResetEventSlim dataAvailableEvent = new ManualResetEventSlim();

/// <inheritdoc />
public event EventHandler OnDisconnected;


/// <summary>
/// Initializes a new instance of the MIN serial transport.
/// </summary>
Expand All @@ -27,7 +34,7 @@ public class MINSerialTransport : IMINTransport
public MINSerialTransport(string portName, int baudRate, Parity parity = Parity.None, int dataBits = 8, StopBits stopBits = StopBits.One,
Handshake handshake = Handshake.None, bool dtrEnable = false)
{
serialPort = new System.IO.Ports.SerialPort(portName, baudRate, parity, dataBits, stopBits)
serialPortFactory = () => new System.IO.Ports.SerialPort(portName, baudRate, parity, dataBits, stopBits)
{
Handshake = handshake,
DtrEnable = dtrEnable
Expand All @@ -40,7 +47,7 @@ public void Dispose()
{
try
{
serialPort.Dispose();
serialPort?.Dispose();
}
catch
{
Expand All @@ -52,46 +59,113 @@ public void Dispose()
/// <inheritdoc />
public void Connect(CancellationToken cancellationToken)
{
// TODO (must have - port from old source) retry
serialPort.Open();
dataAvailableEvent.Reset();

// TODO (must have - port from old source) detect disconnects and report back

while ((serialPort == null || !serialPort.IsOpen) && !cancellationToken.IsCancellationRequested)
{
try
{
if (serialPort == null)
serialPort = serialPortFactory();

Debug.Assert(serialPort != null);
serialPort.Open();

serialPort.DataReceived += (sender, args) =>
{
dataAvailableEvent.Set();
};

serialPort.ErrorReceived += (sender, args) =>
{
Disconnect();
};
}
catch
{
Thread.Sleep(500);
}
}
}


private void Disconnect()
{
try
{
serialPort?.Dispose();
}
catch
{
// Ignored
}

serialPort = null;
dataAvailableEvent.Reset();

OnDisconnected?.Invoke(this, EventArgs.Empty);
}


/// <inheritdoc />
public void Reset()
{
if (serialPort.IsOpen)
if (serialPort != null && serialPort.IsOpen)
serialPort.DiscardInBuffer();
}


/// <inheritdoc />
public void Write(byte[] data, CancellationToken cancellationToken)
{
if (!serialPort.IsOpen)
if (serialPort == null || !serialPort.IsOpen)
return;

serialPort.Write(data, 0, data.Length);

try
{
serialPort.Write(data, 0, data.Length);
}
catch
{
Disconnect();
}
}


/// <inheritdoc />
public byte[] ReadAll()
{
if (!serialPort.IsOpen)
if (serialPort == null || !serialPort.IsOpen)
return Array.Empty<byte>();

var bytesToRead = serialPort.BytesToRead;
if (bytesToRead == 0)
return Array.Empty<byte>();

var data = new byte[bytesToRead];
if (serialPort.Read(data, 0, bytesToRead) < bytesToRead)
throw new IOException("SerialPort lied about the available BytesToRead");
try
{
var data = new byte[bytesToRead];
if (serialPort.Read(data, 0, bytesToRead) < bytesToRead)
throw new IOException("SerialPort lied about the available BytesToRead");

if (serialPort.BytesToRead == 0)
dataAvailableEvent.Reset();

return data;
}
catch
{
Disconnect();
return Array.Empty<byte>();
}
}


return data;
/// <inheritdoc />
public WaitHandle DataAvailable()
{
return dataAvailableEvent.WaitHandle;
}
}
}
9 changes: 7 additions & 2 deletions MIN/Abstractions/IMINProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,14 @@ public interface IMINProtocol : IDisposable
/// An event which is called when an incoming frame arrives.
/// </summary>
event MINFrameEventHandler OnFrame;

/// <summary>
/// An event which is called when a reset has been requested by the target.
/// </summary>
event MINConnectionStateEventHandler OnReset;
}


/// <summary>
/// Statistics on the MIN protocol since the last start.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions MIN/Abstractions/IMINTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public interface IMINTransport : IDisposable
/// </summary>
/// <returns>The raw data available</returns>
byte[] ReadAll();

/// <summary>
/// Event which is raised when the transport disconnects.
/// </summary>
event EventHandler OnDisconnected;
}


Expand Down
58 changes: 46 additions & 12 deletions MIN/MINProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ public Task Reset()
/// <inheritdoc />
public event MINFrameEventHandler OnFrame;


/// <inheritdoc />
public event MINConnectionStateEventHandler OnReset;


// ReSharper disable once SuggestBaseTypeForParameter - I like the explicitness
private static void ValidateFrame(byte id, byte[] payload)
{
Expand Down Expand Up @@ -246,15 +249,30 @@ private void RunWorker(CancellationTokenSource cancellationTokenSource)
var waitHandlesArray = waitHandles.ToArray();


// TODO (must have) connect transport, handle OnDisconnect
transport.Connect(cancellationTokenSource.Token);
OnConnected?.Invoke(this, EventArgs.Empty);
var transportConnected = false;
transport.OnDisconnected += (sender, args) =>
{
OnDisconnected?.Invoke(this, EventArgs.Empty);
transportConnected = false;
};


while (!cancellationTokenSource.Token.IsCancellationRequested)
{
try
{
if (!transportConnected)
{
transport.Connect(cancellationTokenSource.Token);
if (cancellationTokenSource.Token.IsCancellationRequested)
break;

OnConnected?.Invoke(this, EventArgs.Empty);
transportConnected = true;
}



DateTime now;
bool remoteActive;

Expand All @@ -281,6 +299,9 @@ private void RunWorker(CancellationTokenSource cancellationTokenSource)
}
}

if (cancellationTokenSource.Token.IsCancellationRequested)
break;

var incomingData = transport.ReadAll();
if (incomingData.Length > 0)
{
Expand All @@ -289,20 +310,30 @@ private void RunWorker(CancellationTokenSource cancellationTokenSource)
}


var queuedFrame = GetNextQueuedFrame();
if (queuedFrame != null)
if (cancellationTokenSource.Token.IsCancellationRequested)
break;

if (transportConnected)
{
if (ProcessQueuedFrame(queuedFrame, cancellationTokenSource.Token))
FrameSent(queuedFrame);
var queuedFrame = GetNextQueuedFrame();
if (queuedFrame != null)
{
if (ProcessQueuedFrame(queuedFrame, cancellationTokenSource.Token))
FrameSent(queuedFrame);

yield = false;
yield = false;
}
}
else
yield = false;

if (cancellationTokenSource.Token.IsCancellationRequested)
break;

now = timeProvider.Now();
remoteActive = now - lastReceivedFrame < IdleTimeout;

if (now - lastAck > AckRetransmitTimeout && remoteActive)
if (now - lastAck > AckRetransmitTimeout && remoteActive && transportConnected)
SendAck(cancellationTokenSource.Token);
}
catch (OperationCanceledException)
Expand All @@ -311,7 +342,11 @@ private void RunWorker(CancellationTokenSource cancellationTokenSource)
}



if (cancellationTokenSource.Token.IsCancellationRequested)
break;


// If any frames were processed, spin once more to check again
if (!yield)
continue;
Expand Down Expand Up @@ -622,8 +657,7 @@ private void ProcessReceivedResetFrame()
}

InternalReset(true);

// TODO (nice to have) raise OnReset event?
OnReset?.Invoke(this, EventArgs.Empty);
}


Expand Down

0 comments on commit 65c76b3

Please sign in to comment.