Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions src/SparkplugNet/Core/Node/SparkplugNodeBase.Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public partial class SparkplugNodeBase<T>
/// <summary>
/// Gets the known devices.
/// </summary>
public ConcurrentDictionary<string, List<T>> KnownDevices { get; } = new();
public ConcurrentDictionary<string, KnownMetricStorage> KnownDevices { get; } = new();

/// <summary>
/// Publishes a device birth message to the MQTT broker.
Expand Down Expand Up @@ -56,7 +56,7 @@ public async Task<MqttClientPublishResult> PublishDeviceBirthMessage(List<T> kno
this.IncrementLastSequenceNumber();

// Add the known metrics to the known devices.
this.KnownDevices.TryAdd(deviceIdentifier, knownMetrics);
this.KnownDevices.TryAdd(deviceIdentifier, new KnownMetricStorage(knownMetrics));

// Invoke the device birth event.
await this.FireDeviceBirthPublishingAsync(deviceIdentifier, knownMetrics);
Expand Down Expand Up @@ -146,6 +146,42 @@ public async Task<MqttClientPublishResult> PublishDeviceDeathMessage(string devi
/// </summary>
/// <param name="metrics">The metrics.</param>
/// <param name="deviceIdentifier">The device identifier.</param>
/// <exception cref="ArgumentNullException">Thrown if the options are null.</exception>
/// <exception cref="Exception">Thrown if the device is unknown or an invalid metric type was specified.</exception>
/// <returns>A <see cref="MqttClientPublishResult"/>.</returns>
protected abstract Task<MqttClientPublishResult> PublishMessageForDevice(IEnumerable<T> metrics, string deviceIdentifier);
protected virtual async Task<MqttClientPublishResult> PublishMessageForDevice(IEnumerable<T> metrics, string deviceIdentifier)
{
if (this.Options is null)
{
throw new ArgumentNullException(nameof(this.Options), "The options aren't set properly.");
}

if (!this.KnownDevices.TryGetValue(deviceIdentifier, out KnownMetricStorage deviceMetricStorage))
{
throw new Exception("The device is unknown, please publish a device birth message first.");
}

if (deviceMetricStorage is null)
{
throw new ArgumentNullException(deviceIdentifier, $"The KnownMetrics for the device {deviceIdentifier} aren't set properly.");
}

// Get the data message.
var dataMessage = this.MessageGenerator.GetSparkPlugDeviceDataMessage(
this.NameSpace,
this.Options.GroupIdentifier,
this.Options.EdgeNodeIdentifier,
deviceIdentifier,
deviceMetricStorage.FilterOutgoingMetrics(metrics),
this.LastSequenceNumber,
this.LastSessionNumber,
DateTimeOffset.Now,
this.Options.AddSessionNumberToDataMessages);

// Increment the sequence number.
this.IncrementLastSequenceNumber();

// Publish the message.
return await this.Client.PublishAsync(dataMessage);
}
}
2 changes: 1 addition & 1 deletion src/SparkplugNet/Core/Node/SparkplugNodeBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ private async Task PublishInternal()
{
foreach (var device in this.KnownDevices)
{
await this.PublishDeviceBirthMessage(device.Value, device.Key);
await this.PublishDeviceBirthMessage(device.Value.Metrics, device.Key);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/SparkplugNet/Core/SparkplugBase.KnownMetricStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ namespace SparkplugNet.Core;
/// <seealso cref="ConcurrentDictionary{TKey, TValue}"/>
public class KnownMetricStorage : ConcurrentDictionary<string, T>
{
/// <summary>
/// Returns the Metrics as List
/// </summary>
/// <returns></returns>
public List<T> Metrics => this.Values.ToList();

/// <summary>
/// Initializes a new instance of the <see cref="KnownMetricStorage"/> class.
/// </summary>
Expand Down
46 changes: 0 additions & 46 deletions src/SparkplugNet/VersionA/SparkplugNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,50 +127,4 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
}
}
}

/// <summary>
/// Publishes version A metrics for a device.
/// </summary>
/// <param name="metrics">The metrics.</param>
/// <param name="deviceIdentifier">The device identifier.</param>
/// <exception cref="ArgumentNullException">Thrown if the options are null.</exception>
/// <exception cref="Exception">Thrown if the device is unknown or an invalid metric type was specified.</exception>
/// <returns>A <see cref="MqttClientPublishResult"/>.</returns>
protected override async Task<MqttClientPublishResult> PublishMessageForDevice(IEnumerable<VersionAData.KuraMetric> metrics, string deviceIdentifier)
{
if (this.Options is null)
{
throw new ArgumentNullException(nameof(this.Options), "The options aren't set properly.");
}

if (!this.KnownDevices.ContainsKey(deviceIdentifier))
{
throw new Exception("The device is unknown, please publish a device birth message first.");
}

var deviceMetrics = this.KnownDevices[deviceIdentifier];

if (deviceMetrics is null)
{
throw new Exception("Invalid metric type specified for version A metric.");
}

// Get the data message.
var dataMessage = this.MessageGenerator.GetSparkPlugDeviceDataMessage(
this.NameSpace,
this.Options.GroupIdentifier,
this.Options.EdgeNodeIdentifier,
deviceIdentifier,
this.KnownMetricsStorage.FilterOutgoingMetrics(deviceMetrics),
this.LastSequenceNumber,
this.LastSessionNumber,
DateTimeOffset.Now,
this.Options.AddSessionNumberToDataMessages);

// Increment the sequence number.
this.IncrementLastSequenceNumber();

// Publish the message.
return await this.Client.PublishAsync(dataMessage);
}
}
2 changes: 1 addition & 1 deletion src/SparkplugNet/VersionB/SparkplugApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ await this.FireNodeBirthReceivedAsync(topic.GroupIdentifier, topic.EdgeNodeIdent
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.DeviceBirth:
await this.FireDeviceBirthReceivedAsync(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.EdgeNodeIdentifier,
await this.FireDeviceBirthReceivedAsync(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier,
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.NodeData:
Expand Down
46 changes: 0 additions & 46 deletions src/SparkplugNet/VersionB/SparkplugNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,50 +127,4 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
}
}
}

/// <summary>
/// Publishes version B metrics for a device.
/// </summary>
/// <param name="metrics">The metrics.</param>
/// <param name="deviceIdentifier">The device identifier.</param>
/// <exception cref="ArgumentNullException">Thrown if the options are null.</exception>
/// <exception cref="Exception">Thrown if the device is unknown or an invalid metric type was specified.</exception>
/// <returns>A <see cref="MqttClientPublishResult"/>.</returns>
protected override async Task<MqttClientPublishResult> PublishMessageForDevice(IEnumerable<VersionBData.Metric> metrics, string deviceIdentifier)
{
if (this.Options is null)
{
throw new ArgumentNullException(nameof(this.Options), "The options aren't set properly.");
}

if (!this.KnownDevices.ContainsKey(deviceIdentifier))
{
throw new Exception("The device is unknown, please publish a device birth message first.");
}

var deviceMetrics = this.KnownDevices[deviceIdentifier];

if (deviceMetrics is null)
{
throw new ArgumentNullException(deviceIdentifier, $"The KnownMetrics for the device {deviceIdentifier} aren't set properly.");
}

// Get the data message.
var dataMessage = this.MessageGenerator.GetSparkPlugDeviceDataMessage(
this.NameSpace,
this.Options.GroupIdentifier,
this.Options.EdgeNodeIdentifier,
deviceIdentifier,
this.KnownMetricsStorage.FilterOutgoingMetrics(deviceMetrics),
this.LastSequenceNumber,
this.LastSessionNumber,
DateTimeOffset.Now,
this.Options.AddSessionNumberToDataMessages);

// Increment the sequence number.
this.IncrementLastSequenceNumber();

// Publish the message.
return await this.Client.PublishAsync(dataMessage);
}
}