Skip to content

Commit

Permalink
Merge pull request #47 from ChrisPulman/SerialPortSubscribe
Browse files Browse the repository at this point in the history
Add Serial port Subscribe methods
  • Loading branch information
ChrisPulman committed Mar 19, 2024
2 parents 7089a5a + 024e32a commit 8a4f7db
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<ItemGroup>
<!--<Compile Update="**\*.cs" DependentUpon="I%(Filename).cs" />-->
<PackageReference Include="stylecop.analyzers" Version="1.2.0-beta.556" PrivateAssets="all" />
<PackageReference Include="Roslynator.Analyzers" Version="4.11.0" PrivateAssets="All" />
<PackageReference Include="Roslynator.Analyzers" Version="4.12.0" PrivateAssets="All" />
<AdditionalFiles Include="$(MSBuildThisFileDirectory)stylecop.json" Link="stylecop.json" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion Version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json",
"version": "1.3.1",
"version": "2.0.1",
"publicReleaseRefSpec": [
"^refs/heads/master$",
"^refs/heads/main$"
Expand Down
133 changes: 133 additions & 0 deletions src/MQTTnet.Rx.SerialPort/Create.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Reactive.Linq;
using CP.IO.Ports;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Rx.Client;

namespace MQTTnet.Rx.SerialPort
Expand Down Expand Up @@ -35,5 +36,137 @@ public static IObservable<MqttClientPublishResult> PublishSerialPort(this IObser

return client.PublishMessage(serialPort?.DataReceived.BufferUntil(startsWith, endsWith, timeOut).Select(payLoad => (topic, payLoad))!);
}

/// <summary>
/// Subscribes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="configurePort">The configure port.</param>
/// <param name="payloadFactory">The payload factory.</param>
public static void SubscribeSerialPortWriteLine(this IObservable<IMqttClient> client, string topic, Action<ISerialPortRx> configurePort, Func<string, string> payloadFactory)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
configurePort.ThrowArgumentNullExceptionIfNull(nameof(configurePort));

var serialPort = default(ISerialPortRx)!;
configurePort?.Invoke(serialPort);
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

client.SubscribeToTopic(topic).Subscribe(message => serialPort.WriteLine(payloadFactory(message.ApplicationMessage.ConvertPayloadToString())));
}

/// <summary>
/// Subscribes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="configurePort">The configure port.</param>
/// <param name="payloadFactory">The payload factory.</param>
public static void SubscribeSerialPortWrite(this IObservable<IMqttClient> client, string topic, Action<ISerialPortRx> configurePort, Func<string, string> payloadFactory)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
configurePort.ThrowArgumentNullExceptionIfNull(nameof(configurePort));

var serialPort = default(ISerialPortRx)!;
configurePort?.Invoke(serialPort);
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

client.SubscribeToTopic(topic).Subscribe(message => serialPort.Write(payloadFactory(message.ApplicationMessage.ConvertPayloadToString())));
}

/// <summary>
/// Subscribes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="configurePort">The configure port.</param>
/// <param name="payloadFactory">The payload factory.</param>
public static void SubscribeSerialPortWrite(this IObservable<IMqttClient> client, string topic, Action<ISerialPortRx> configurePort, Func<string, byte[]> payloadFactory)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
configurePort.ThrowArgumentNullExceptionIfNull(nameof(configurePort));

var serialPort = default(ISerialPortRx)!;
configurePort?.Invoke(serialPort);
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

client.SubscribeToTopic(topic).Subscribe(message => serialPort.Write(payloadFactory(message.ApplicationMessage.ConvertPayloadToString())));
}

/// <summary>
/// Publishes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="serialPort">The serial port.</param>
/// <param name="startsWith">The starts with.</param>
/// <param name="endsWith">The ends with.</param>
/// <param name="timeOut">The time out.</param>
/// <returns>A ApplicationMessageProcessedEventArgs.</returns>
public static IObservable<ApplicationMessageProcessedEventArgs> PublishSerialPort(this IObservable<IManagedMqttClient> client, string topic, ISerialPortRx serialPort, IObservable<char> startsWith, IObservable<char> endsWith, int timeOut)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

return client.PublishMessage(serialPort?.DataReceived.BufferUntil(startsWith, endsWith, timeOut).Select(payLoad => (topic, payLoad))!);
}

/// <summary>
/// Subscribes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="configurePort">The configure port.</param>
/// <param name="payloadFactory">The payload factory.</param>
public static void SubscribeSerialPortWriteLine(this IObservable<IManagedMqttClient> client, string topic, Action<ISerialPortRx> configurePort, Func<string, string> payloadFactory)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
configurePort.ThrowArgumentNullExceptionIfNull(nameof(configurePort));

var serialPort = default(ISerialPortRx)!;
configurePort?.Invoke(serialPort);
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

client.SubscribeToTopic(topic).Subscribe(message => serialPort.WriteLine(payloadFactory(message.ApplicationMessage.ConvertPayloadToString())));
}

/// <summary>
/// Subscribes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="configurePort">The configure port.</param>
/// <param name="payloadFactory">The payload factory.</param>
public static void SubscribeSerialPortWrite(this IObservable<IManagedMqttClient> client, string topic, Action<ISerialPortRx> configurePort, Func<string, string> payloadFactory)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
configurePort.ThrowArgumentNullExceptionIfNull(nameof(configurePort));

var serialPort = default(ISerialPortRx)!;
configurePort?.Invoke(serialPort);
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

client.SubscribeToTopic(topic).Subscribe(message => serialPort.Write(payloadFactory(message.ApplicationMessage.ConvertPayloadToString())));
}

/// <summary>
/// Subscribes the serial port.
/// </summary>
/// <param name="client">The client.</param>
/// <param name="topic">The topic.</param>
/// <param name="configurePort">The configure port.</param>
/// <param name="payloadFactory">The payload factory.</param>
public static void SubscribeSerialPortWrite(this IObservable<IManagedMqttClient> client, string topic, Action<ISerialPortRx> configurePort, Func<string, byte[]> payloadFactory)
{
client.ThrowArgumentNullExceptionIfNull(nameof(client));
configurePort.ThrowArgumentNullExceptionIfNull(nameof(configurePort));

var serialPort = default(ISerialPortRx)!;
configurePort?.Invoke(serialPort);
serialPort.ThrowArgumentNullExceptionIfNull(nameof(serialPort));

client.SubscribeToTopic(topic).Subscribe(message => serialPort.Write(payloadFactory(message.ApplicationMessage.ConvertPayloadToString())));
}
}
}

0 comments on commit 8a4f7db

Please sign in to comment.