Skip to content

Commit

Permalink
Adjusted filtering of messages. Fixes #48.
Browse files Browse the repository at this point in the history
  • Loading branch information
SeppPenner committed Mar 21, 2024
1 parent 9ccbddd commit 9cb2deb
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 31 deletions.
29 changes: 18 additions & 11 deletions src/SparkplugNet/VersionA/SparkplugApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,29 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
/// <exception cref="Exception">Thrown if the metric is unknown.</exception>
private async Task HandleMessagesForVersionA(SparkplugMessageTopic topic, VersionAData.Payload payload)
{
// Filter out settion number metric.
// Filter out session number metric.
var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name != Constants.SessionNumberMetricName);
var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName);
this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType);
var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();

if (sessionNumberMetric is not null)
{
filteredMetrics.Add(sessionNumberMetric);
}

// Handle messages.
switch (topic.MessageType)
{
case SparkplugMessageType.NodeBirth:
await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier,
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online));
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.DeviceBirth:
await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.EdgeNodeIdentifier,
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online));
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.NodeData:
var nodeDataMetrics = this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online);
var nodeDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online);
await this.FireNodeDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, nodeDataMetrics);
break;
case SparkplugMessageType.DeviceData:
Expand All @@ -191,11 +198,11 @@ private async Task HandleMessagesForVersionA(SparkplugMessageTopic topic, Versio
throw new InvalidOperationException($"Topic {topic} is invalid!");
}

var deviceDataMetrics = this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online);
var deviceDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online);
await this.FireDeviceDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, deviceDataMetrics);
break;
case SparkplugMessageType.NodeDeath:
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Offline);
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline);
await this.FireNodeDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier);
break;
case SparkplugMessageType.DeviceDeath:
Expand All @@ -204,7 +211,7 @@ private async Task HandleMessagesForVersionA(SparkplugMessageTopic topic, Versio
throw new InvalidOperationException($"Topic {topic} is invalid!");
}

this.ProcessPayload(topic, payload, SparkplugMetricStatus.Offline);
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline);
await this.FireDeviceDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier);
break;
}
Expand All @@ -214,12 +221,12 @@ private async Task HandleMessagesForVersionA(SparkplugMessageTopic topic, Versio
/// Handles the device message.
/// </summary>
/// <param name="topic">The topic.</param>
/// <param name="payload">The payload.</param>
/// <param name="metrics">The metrics.</param>
/// <param name="metricStatus">The metric status.</param>
/// <exception cref="InvalidCastException">Thrown if the metric cast is invalid.</exception>
private IEnumerable<VersionAData.KuraMetric> ProcessPayload(
SparkplugMessageTopic topic,
VersionAData.Payload payload,
List<VersionAData.KuraMetric> metrics,
SparkplugMetricStatus metricStatus)
{
var metricState = new MetricState<VersionAData.KuraMetric>
Expand All @@ -241,7 +248,7 @@ private async Task HandleMessagesForVersionA(SparkplugMessageTopic topic, Versio
this.NodeStates[topic.EdgeNodeIdentifier] = metricState;
}

foreach (var payloadMetric in payload.Metrics)
foreach (var payloadMetric in metrics)
{
if (payloadMetric is not VersionAData.KuraMetric convertedMetric)
{
Expand Down
13 changes: 10 additions & 3 deletions src/SparkplugNet/VersionA/SparkplugNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,16 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, VersionAData.Payload payload)
{
// Filter out session number metric.
var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name != Constants.SessionNumberMetricName);
var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName);
this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType);
var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();

if (sessionNumberMetric is not null)
{
filteredMetrics.Add(sessionNumberMetric);
}

// Handle messages.
switch (topic.MessageType)
{
case SparkplugMessageType.DeviceCommand:
Expand All @@ -130,11 +137,11 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Versio
throw new InvalidOperationException($"Topic {topic} is invalid!");
}

await this.FireDeviceCommandReceived(topic.DeviceIdentifier, payload.Metrics);
await this.FireDeviceCommandReceived(topic.DeviceIdentifier, filteredMetrics);
break;

case SparkplugMessageType.NodeCommand:
await this.FireNodeCommandReceived(payload.Metrics);
await this.FireNodeCommandReceived(filteredMetrics);
break;
}
}
Expand Down
29 changes: 18 additions & 11 deletions src/SparkplugNet/VersionB/SparkplugApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,22 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
/// <exception cref="Exception">Thrown if the metric is unknown.</exception>
private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payload payload)
{
// Filter out settion number metric.
// Filter out session number metric.
var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name != Constants.SessionNumberMetricName);
var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName);
this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType);
var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();

if (sessionNumberMetric is not null)
{
filteredMetrics.Add(sessionNumberMetric);
}

// Handle messages.
switch (topic.MessageType)
{
case SparkplugMessageType.NodeBirth:
await this.FireNodeBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier,
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online));
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.DeviceBirth:
if (string.IsNullOrWhiteSpace(topic.DeviceIdentifier))
Expand All @@ -166,10 +173,10 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
}

await this.FireDeviceBirthReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier,
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online));
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online));
break;
case SparkplugMessageType.NodeData:
var nodeDataMetrics = this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online);
var nodeDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online);
await this.FireNodeDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, nodeDataMetrics);
break;
case SparkplugMessageType.DeviceData:
Expand All @@ -178,11 +185,11 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
throw new InvalidOperationException($"Topic {topic} is invalid!");
}

var deviceDataMetrics = this.ProcessPayload(topic, payload, SparkplugMetricStatus.Online);
var deviceDataMetrics = this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Online);
await this.FireDeviceDataReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier, deviceDataMetrics);
break;
case SparkplugMessageType.NodeDeath:
this.ProcessPayload(topic, payload, SparkplugMetricStatus.Offline);
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline);
await this.FireNodeDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier);
break;
case SparkplugMessageType.DeviceDeath:
Expand All @@ -191,7 +198,7 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
throw new InvalidOperationException($"Topic {topic} is invalid!");
}

this.ProcessPayload(topic, payload, SparkplugMetricStatus.Offline);
this.ProcessPayload(topic, filteredMetrics, SparkplugMetricStatus.Offline);
await this.FireDeviceDeathReceived(topic.GroupIdentifier, topic.EdgeNodeIdentifier, topic.DeviceIdentifier);
break;
}
Expand All @@ -201,10 +208,10 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
/// Handles the device message.
/// </summary>
/// <param name="topic">The topic.</param>
/// <param name="payload">The payload.</param>
/// <param name="metrics">The metrics.</param>
/// <param name="metricStatus">The metric status.</param>
/// <exception cref="InvalidCastException">Thrown if the metric cast is invalid.</exception>
private IEnumerable<Metric> ProcessPayload(SparkplugMessageTopic topic, Payload payload, SparkplugMetricStatus metricStatus)
private IEnumerable<Metric> ProcessPayload(SparkplugMessageTopic topic, List<Metric> metrics, SparkplugMetricStatus metricStatus)
{
var metricState = new MetricState<Metric>
{
Expand All @@ -225,7 +232,7 @@ private IEnumerable<Metric> ProcessPayload(SparkplugMessageTopic topic, Payload
this.NodeStates[topic.EdgeNodeIdentifier] = metricState;
}

foreach (var payloadMetric in payload.Metrics)
foreach (var payloadMetric in metrics)
{
if (payloadMetric is not Metric convertedMetric)
{
Expand Down
16 changes: 10 additions & 6 deletions src/SparkplugNet/VersionB/SparkplugNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,16 @@ protected override async Task OnMessageReceived(SparkplugMessageTopic topic, byt
private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payload payload)
{
// Filter out session number metric.
var sessionNumberMetric = payload.Metrics.FirstOrDefault(m => m.Name != Constants.SessionNumberMetricName);
var metricsWithoutSequenceMetric = payload.Metrics.Where(m => m.Name != Constants.SessionNumberMetricName);
this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType);
var filteredMetrics = this.KnownMetricsStorage.FilterMetrics(metricsWithoutSequenceMetric, topic.MessageType).ToList();

if (sessionNumberMetric is not null)
{
filteredMetrics.Add(sessionNumberMetric);
}

// Handle messages.
switch (topic.MessageType)
{
case SparkplugMessageType.DeviceCommand:
Expand All @@ -125,15 +132,12 @@ private async Task HandleMessagesForVersionB(SparkplugMessageTopic topic, Payloa
throw new InvalidOperationException($"Topic {topic} is invalid!");
}

await this.FireDeviceCommandReceived(topic.DeviceIdentifier, payload.Metrics);
await this.FireDeviceCommandReceived(topic.DeviceIdentifier, filteredMetrics);
break;

case SparkplugMessageType.NodeCommand:
await this.FireNodeCommandReceived(payload.Metrics);
await this.FireNodeCommandReceived(filteredMetrics);
break;
}
}

// Todo: Check exception description in the method description,
// bei casts überall exceptions werfen, wenn es nicht klappt!
}

0 comments on commit 9cb2deb

Please sign in to comment.