Skip to content

Commit

Permalink
Reworked MQTT options, fixes #83.
Browse files Browse the repository at this point in the history
  • Loading branch information
SeppPenner committed Mar 27, 2024
1 parent 8b72a43 commit dfc03bb
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 138 deletions.
8 changes: 0 additions & 8 deletions src/SparkplugNet.Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,11 @@ private static async Task RunVersionAApplication()
nameof(RunVersionAApplication),
"user",
"password",
false,
"scada1",
TimeSpan.FromSeconds(30),
SparkplugMqttProtocolVersion.V311,
null,
null,
null,
true,
CancellationTokenSource.Token);
var application = new VersionA.SparkplugApplication(VersionAMetricsApplication, SparkplugSpecificationVersion.Version22);
Expand Down Expand Up @@ -227,13 +225,11 @@ private static async Task RunVersionANode()
"node 1",
"user",
"password",
false,
"scada1B",
TimeSpan.FromSeconds(30),
SparkplugMqttProtocolVersion.V311,
null,
null,
null,
"group1",
"node1",
CancellationTokenSource.Token);
Expand Down Expand Up @@ -299,13 +295,11 @@ private static async Task RunVersionBApplication()
nameof(RunVersionBApplication),
"user",
"password",
false,
"scada1",
TimeSpan.FromSeconds(30),
SparkplugMqttProtocolVersion.V311,
null,
null,
null,
true,
CancellationTokenSource.Token);
var application = new VersionB.SparkplugApplication(VersionBMetricsApplication, SparkplugSpecificationVersion.Version22);
Expand Down Expand Up @@ -365,13 +359,11 @@ private static async Task RunVersionBNode()
"node 1",
"user",
"password",
false,
"scada1B",
TimeSpan.FromSeconds(30),
SparkplugMqttProtocolVersion.V311,
null,
null,
null,
"group1",
"node1",
CancellationTokenSource.Token);
Expand Down
52 changes: 20 additions & 32 deletions src/SparkplugNet/Core/Application/SparkplugApplicationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ private Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArg

if (SparkplugMessageTopic.TryParse(topic, out var topicParsed))
{
return this.OnMessageReceived(topicParsed!, args.ApplicationMessage.Payload);
var data = args.ApplicationMessage.PayloadSegment.Array ?? [];
return this.OnMessageReceived(topicParsed!, data);
}
else if (topic.Contains(SparkplugMessageType.StateMessage.GetDescription()))
{
Expand Down Expand Up @@ -334,7 +335,7 @@ private async Task ConnectInternal()
false);

// Build up the MQTT client and connect.
this.Options.CancellationToken ??= CancellationToken.None;
this.Options.CancellationToken ??= SystemCancellationToken.None;

var builder = new MqttClientOptionsBuilder()
.WithClientId(this.Options.ClientId)
Expand All @@ -352,44 +353,31 @@ private async Task ConnectInternal()
break;
}

if (this.Options.UseTls)
if (this.Options.MqttTlsOptions is not null)
{
if (this.Options.GetTlsParameters is not null)
{
MqttClientOptionsBuilderTlsParameters? tlsParameters = this.Options.GetTlsParameters();

if (tlsParameters is not null)
{
builder.WithTls(tlsParameters);
}
else
{
builder.WithTls();
}
}
else
{
builder.WithTls();
}
builder.WithTlsOptions(this.Options.MqttTlsOptions);
}
else
{
builder.WithTlsOptions(o => o.UseTls());
}

if (this.Options.WebSocketParameters is null)
if (this.Options.MqttWebSocketOptions is null)
{
builder.WithTcpServer(this.Options.BrokerAddress, this.Options.Port);
}
else
{
builder.WithWebSocketServer(this.Options.BrokerAddress, this.Options.WebSocketParameters);
}

if (this.Options.ProxyOptions is not null)
{
builder.WithProxy(
this.Options.ProxyOptions.Address,
this.Options.ProxyOptions.Username,
this.Options.ProxyOptions.Password,
this.Options.ProxyOptions.Domain,
this.Options.ProxyOptions.BypassOnLocal);
builder.WithWebSocketServer(options =>
options.WithCookieContainer(this.Options.MqttWebSocketOptions.CookieContainer)
.WithCookieContainer(this.Options.MqttWebSocketOptions.Credentials)
.WithProxyOptions(this.Options.MqttWebSocketOptions.ProxyOptions)
.WithRequestHeaders(this.Options.MqttWebSocketOptions.RequestHeaders)
.WithSubProtocols(this.Options.MqttWebSocketOptions.SubProtocols)
.WithUri(this.Options.BrokerAddress)
.WithKeepAliveInterval(this.Options.MqttWebSocketOptions.KeepAliveInterval)
.WithUseDefaultCredentials(this.Options.MqttWebSocketOptions.UseDefaultCredentials)
);
}

if (this.Options.IsPrimaryApplication)
Expand Down
18 changes: 6 additions & 12 deletions src/SparkplugNet/Core/Application/SparkplugApplicationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ public sealed class SparkplugApplicationOptions : SparkplugBaseOptions
/// <param name="clientId">The client identifier.</param>
/// <param name="userName">The name of the user.</param>
/// <param name="password">The password.</param>
/// <param name="useTls">A value indicating whether TLS is used or not.</param>
/// <param name="scadaHostIdentifier">The SCADA host identifier.</param>
/// <param name="reconnectInterval">The reconnect interval.</param>
/// <param name="mqttProtocolVersion">The MQTT protocol version.</param>
/// <param name="isPrimaryApplication">A value indicating whether the application is a primary application or not.</param>
/// <param name="getTlsParameters">The delegate to provide TLS parameters.</param>
/// <param name="webSocketParameters">The web socket parameters.</param>
/// <param name="proxyOptions">The proxy options.</param>
/// <param name="mqttTlsOptions">The MQTT TLS options.</param>
/// <param name="mqttWebSocketOptions">The MQTT WebSocket options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <seealso cref="SparkplugBaseOptions"/>
public SparkplugApplicationOptions(
Expand All @@ -47,13 +45,11 @@ public sealed class SparkplugApplicationOptions : SparkplugBaseOptions
string? clientId = null,
string? userName = null,
string? password = null,
bool? useTls = null,
string? scadaHostIdentifier = null,
TimeSpan? reconnectInterval = null,
SparkplugMqttProtocolVersion? mqttProtocolVersion = null,
GetTlsParametersDelegate? getTlsParameters = null,
MqttClientOptionsBuilderWebSocketParameters? webSocketParameters = null,
MqttClientWebSocketProxyOptions? proxyOptions = null,
MqttClientTlsOptions? mqttTlsOptions = null,
MqttClientWebSocketOptions? mqttWebSocketOptions = null,
bool? isPrimaryApplication = null,
SystemCancellationToken? cancellationToken = null)
: base(
Expand All @@ -62,13 +58,11 @@ public sealed class SparkplugApplicationOptions : SparkplugBaseOptions
clientId,
userName,
password,
useTls,
scadaHostIdentifier,
reconnectInterval,
mqttProtocolVersion,
getTlsParameters,
webSocketParameters,
proxyOptions)
mqttTlsOptions,
mqttWebSocketOptions)
{
this.IsPrimaryApplication = isPrimaryApplication ?? DefaultIsPrimaryApplication;
this.CancellationToken = cancellationToken ?? SystemCancellationToken.None;
Expand Down
56 changes: 22 additions & 34 deletions src/SparkplugNet/Core/Node/SparkplugNodeBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,17 @@ private async Task OnApplicationMessageReceived(MqttApplicationMessageReceivedEv

if (SparkplugMessageTopic.TryParse(topic, out var messageTopic))
{
await this.OnMessageReceived(messageTopic!, args.ApplicationMessage.Payload);
var data = args.ApplicationMessage.PayloadSegment.Array ?? [];
await this.OnMessageReceived(messageTopic!, data);
}
else if (topic.Contains(SparkplugMessageType.StateMessage.GetDescription()))
{
// Handle the STATE message before anything else as they're UTF-8 encoded.
await this.FireStatusMessageReceived(Encoding.UTF8.GetString(args.ApplicationMessage.Payload));
await this.FireStatusMessageReceived(Encoding.UTF8.GetString(args.ApplicationMessage.PayloadSegment));
}
else
{
throw new InvalidOperationException($"Received message on unkown topic {topic}: {args.ApplicationMessage.Payload:X2}.");
throw new InvalidOperationException($"Received message on unkown topic {topic}: {args.ApplicationMessage.PayloadSegment:X2}.");
}
}

Expand Down Expand Up @@ -274,44 +275,31 @@ private async Task ConnectInternal()
break;
}

if (this.Options.UseTls)
if (this.Options.MqttTlsOptions is not null)
{
if (this.Options.GetTlsParameters is not null)
{
MqttClientOptionsBuilderTlsParameters? tlsParameter = this.Options.GetTlsParameters();

if (tlsParameter is not null)
{
builder.WithTls(tlsParameter);
}
else
{
builder.WithTls();
}
}
else
{
builder.WithTls();
}
builder.WithTlsOptions(this.Options.MqttTlsOptions);
}
else
{
builder.WithTlsOptions(o => o.UseTls());
}

if (this.Options.WebSocketParameters is null)
if (this.Options.MqttWebSocketOptions is null)
{
builder.WithTcpServer(this.Options.BrokerAddress, this.Options.Port);
}
else
{
builder.WithWebSocketServer(this.Options.BrokerAddress, this.Options.WebSocketParameters);
}

if (this.Options.ProxyOptions is not null)
{
builder.WithProxy(
this.Options.ProxyOptions.Address,
this.Options.ProxyOptions.Username,
this.Options.ProxyOptions.Password,
this.Options.ProxyOptions.Domain,
this.Options.ProxyOptions.BypassOnLocal);
builder.WithWebSocketServer(o =>
o.WithCookieContainer(this.Options.MqttWebSocketOptions.CookieContainer)
.WithCookieContainer(this.Options.MqttWebSocketOptions.Credentials)
.WithProxyOptions(this.Options.MqttWebSocketOptions.ProxyOptions)
.WithRequestHeaders(this.Options.MqttWebSocketOptions.RequestHeaders)
.WithSubProtocols(this.Options.MqttWebSocketOptions.SubProtocols)
.WithUri(this.Options.BrokerAddress)
.WithKeepAliveInterval(this.Options.MqttWebSocketOptions.KeepAliveInterval)
.WithUseDefaultCredentials(this.Options.MqttWebSocketOptions.UseDefaultCredentials)
);
}

// Add the will message data.
Expand Down Expand Up @@ -349,7 +337,7 @@ private async Task PublishInternal()
}

// Get the online message.
var onlineMessage = this.messageGenerator.GetSparkplugNodeBirthMessage<T>(
var onlineMessage = this.messageGenerator.GetSparkplugNodeBirthMessage(
this.NameSpace,
this.Options.GroupIdentifier,
this.Options.EdgeNodeIdentifier,
Expand Down
18 changes: 6 additions & 12 deletions src/SparkplugNet/Core/Node/SparkplugNodeOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ public sealed class SparkplugNodeOptions : SparkplugBaseOptions
/// <param name="clientId">The client identifier.</param>
/// <param name="userName">The name of the user.</param>
/// <param name="password">The password.</param>
/// <param name="useTls">A value indicating whether TLS is used or not.</param>
/// <param name="scadaHostIdentifier">The SCADA host identifier.</param>
/// <param name="groupIdentifier">The group identifier.</param>
/// <param name="edgeNodeIdentifier">The edge node identifier.</param>
/// <param name="reconnectInterval">The reconnect interval.</param>
/// <param name="mqttProtocolVersion">The MQTT protocol version.</param>
/// <param name="getTlsParameters">The delegate to provide TLS parameters.</param>
/// <param name="webSocketParameters">The web socket parameters.</param>
/// <param name="proxyOptions">The proxy options.</param>
/// <param name="mqttTlsOptions">The MQTT TLS options.</param>
/// <param name="mqttWebSocketOptions">The MQTT WebSocket options.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <seealso cref="SparkplugBaseOptions"/>
public SparkplugNodeOptions(
Expand All @@ -59,13 +57,11 @@ public sealed class SparkplugNodeOptions : SparkplugBaseOptions
string? clientId = null,
string? userName = null,
string? password = null,
bool? useTls = null,
string? scadaHostIdentifier = null,
TimeSpan? reconnectInterval = null,
SparkplugMqttProtocolVersion? mqttProtocolVersion = null,
GetTlsParametersDelegate? getTlsParameters = null,
MqttClientOptionsBuilderWebSocketParameters? webSocketParameters = null,
MqttClientWebSocketProxyOptions? proxyOptions = null,
MqttClientTlsOptions? mqttTlsOptions = null,
MqttClientWebSocketOptions? mqttWebSocketOptions = null,
string? groupIdentifier = null,
string? edgeNodeIdentifier = null,
SystemCancellationToken? cancellationToken = null)
Expand All @@ -75,13 +71,11 @@ public sealed class SparkplugNodeOptions : SparkplugBaseOptions
clientId,
userName,
password,
useTls,
scadaHostIdentifier,
reconnectInterval,
mqttProtocolVersion,
getTlsParameters,
webSocketParameters,
proxyOptions)
mqttTlsOptions,
mqttWebSocketOptions)
{
this.GroupIdentifier = string.IsNullOrWhiteSpace(groupIdentifier) ? DefaultGroupIdentifier : groupIdentifier;
this.EdgeNodeIdentifier = string.IsNullOrWhiteSpace(edgeNodeIdentifier) ? DefaultEdgeNodeIdentifier : edgeNodeIdentifier;
Expand Down

0 comments on commit dfc03bb

Please sign in to comment.