Skip to content

Commit

Permalink
CsvExportAdapters: Added downsampling and time reasonability validation
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchiecarroll committed Oct 28, 2022
1 parent 548cb77 commit c6d7da1
Showing 1 changed file with 141 additions and 30 deletions.
171 changes: 141 additions & 30 deletions Source/Libraries/Adapters/CsvAdapters/CsvExportAdapter.cs
Expand Up @@ -22,9 +22,13 @@
//******************************************************************************************************

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Linq;
using System.Text;
using GSF;
using GSF.Collections;
using GSF.Configuration;
using GSF.Diagnostics;
using GSF.IO;
Expand Down Expand Up @@ -54,12 +58,34 @@ public class CsvExportAdapter : OutputAdapterBase
/// </summary>
public const string DefaultTimestampFormat = "yyyy-MM-dd HH:mm:ss.fffffff";

/// <summary>
/// Default value for the <see cref="DownsampleInterval"/> property.
/// </summary>
public const double DefaultDownsampleInterval = 0.0D;

/// <summary>
/// Default value for the <see cref="EnableTimeReasonabilityValidation"/> property.
/// </summary>
public const bool DefaultEnableTimeReasonabilityValidation = false;

/// <summary>
/// Default value for the <see cref="LagTime"/> property.
/// </summary>
public const double DefaultLagTime = 5.0D;

/// <summary>
/// Default value for the <see cref="LeadTime"/> property.
/// </summary>
public const double DefaultLeadTime = 5.0D;

private const string ScheduleName = nameof(CsvExportAdapter);

// Fields
private string m_activeFileName;
private object m_activeFileLock;
private ScheduleManager m_scheduleManager;
private readonly object m_activeFileLock;
private readonly ScheduleManager m_scheduleManager;
private readonly Dictionary<Guid, long> m_lastTimestamps;
private long m_totalExports;
private bool m_disposed;

#endregion
Expand All @@ -74,6 +100,7 @@ public CsvExportAdapter()
m_activeFileLock = new object();
m_scheduleManager = new ScheduleManager();
m_scheduleManager.ScheduleDue += ScheduleManager_ScheduleDue;
m_lastTimestamps = new Dictionary<Guid, long>();
}

#endregion
Expand Down Expand Up @@ -114,6 +141,38 @@ public CsvExportAdapter()
[Description("Defines the format of timestamps in the CSV exports")]
public string TimestampFormat { get; set; }

/// <summary>
/// Gets or sets the downsampling interval, in seconds, set to zero for no downsampling.
/// </summary>
[ConnectionStringParameter]
[DefaultValue(DefaultDownsampleInterval)]
[Description("Defines the downsampling interval, in seconds, set to 0.0 for no downsampling")]
public double DownsampleInterval { get; set; }

/// <summary>
/// Gets or sets the flag that determines if timestamps should be validated against local clock for reasonability.
/// </summary>
[ConnectionStringParameter]
[DefaultValue(DefaultEnableTimeReasonabilityValidation)]
[Description("Defines the flag that determines if timestamps should be validated against local clock for reasonability")]
public bool EnableTimeReasonabilityValidation { get; set; }

/// <summary>
/// Gets or sets the allowed past time deviation tolerance against local clock for time reasonability validation, in seconds (can be sub-second).
/// </summary>
[ConnectionStringParameter]
[DefaultValue(DefaultLagTime)]
[Description("Defines the allowed past time deviation tolerance against local clock for time reasonability validation, in seconds (can be sub-second)")]
public double LagTime { get; set; }

/// <summary>
/// Gets or sets the allowed future time deviation tolerance against local clock for time reasonability validation, in seconds (can be sub-second).
/// </summary>
[ConnectionStringParameter]
[DefaultValue(DefaultLeadTime)]
[Description("Defines the allowed future time deviation tolerance against local clock for time reasonability validation, in seconds (can be sub-second)")]
public double LeadTime { get; set; }

/// <summary>
/// Gets the flag that determines if measurements sent to this <see cref="CsvExportAdapter"/> are destined for archival.
/// </summary>
Expand Down Expand Up @@ -173,13 +232,35 @@ public void OffloadLingeringFiles()
}

/// <summary>
/// Gets a short one-line status of this <see cref="CsvExportAdapter"/>.
/// Gets the status of this <see cref="CsvExportAdapter"/>.
/// </summary>
public override string GetShortStatus(int maxLength)
public override string Status
{
return $"{ProcessedMeasurements} measurements exported so far...".CenterText(maxLength);
get
{
StringBuilder status = new StringBuilder(base.Status);

status.AppendLine($" Export Path: {FilePath.TrimFileName(ExportPath, 51)}");
status.AppendLine($" Offload Path: {FilePath.TrimFileName(OffloadPath, 51)}");
status.AppendLine($" Rollover Schedule: {RolloverSchedule}");
status.AppendLine($" Timestamp Format: {TimestampFormat}");
status.AppendLine($" Downsample Interval: {(DownsampleInterval > 0.0D ? $"{DownsampleInterval:N4} seconds)" : "Disabled - Full Resolution Export")}");
status.AppendLine($" Time Reasonability Checks: {(EnableTimeReasonabilityValidation ? "Enabled" : "Disabled")}");
status.AppendLine($" Allowed Lag Time: {LagTime:N4} seconds");
status.AppendLine($" Allowed Lead Time: {LeadTime:N4} seconds");
status.AppendLine($" Active CSV Export File: {FilePath.TrimFileName(m_activeFileName, 51)}");
status.AppendLine($" Total CSV Exports: {m_totalExports:N0}");

return status.ToString();
}
}

/// <summary>
/// Gets a short one-line status of this <see cref="CsvExportAdapter"/>.
/// </summary>
public override string GetShortStatus(int maxLength) =>
$"{ProcessedMeasurements} measurements exported so far...".CenterText(maxLength);

/// <summary>
/// Attempts to connect to data output stream.
/// </summary>
Expand All @@ -204,6 +285,41 @@ protected override void AttemptDisconnection()
RollOver();
}

/// <summary>
/// Queues a collection of measurements for processing. Measurements are automatically filtered to the defined <see cref="IAdapter.InputMeasurementKeys"/>.
/// </summary>
/// <param name="measurements">Measurements to queue for processing.</param>
public override void QueueMeasurementsForProcessing(IEnumerable<IMeasurement> measurements)
{
if (EnableTimeReasonabilityValidation)
measurements = measurements.Where(measurement => measurement.Timestamp.UtcTimeIsValid(LagTime, LeadTime));

if (DownsampleInterval > 0.0D)
{
List<IMeasurement> exportMeasurements = new List<IMeasurement>();

foreach (IMeasurement measurement in measurements)
{
// Get last measurement timestamp -- initial timestamp is from top of second
long lastTimestamp = m_lastTimestamps.GetOrDefault(measurement.ID, _ =>
measurement.Timestamp.BaselinedTimestamp(BaselineTimeInterval.Second).Value);

if ((measurement.Timestamp - lastTimestamp).ToSeconds() >= DownsampleInterval)
{
exportMeasurements.Add(measurement);
m_lastTimestamps[measurement.ID] = measurement.Timestamp.Value;
}
}

measurements = exportMeasurements;
}

if (measurements.Count() == 0)
return;

base.QueueMeasurementsForProcessing(measurements);
}

/// <summary>
/// Serializes measurements to data output stream.
/// </summary>
Expand Down Expand Up @@ -235,22 +351,22 @@ protected override void ProcessMeasurements(IMeasurement[] measurements)
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected override void Dispose(bool disposing)
{
if (!m_disposed)
if (m_disposed)
return;

try
{
try
{
if (disposing)
{
m_scheduleManager.Stop();
m_scheduleManager.Dispose();
}
}
finally
if (disposing)
{
m_disposed = true; // Prevent duplicate dispose.
base.Dispose(disposing); // Call base class Dispose().
m_scheduleManager.Stop();
m_scheduleManager.Dispose();
}
}
finally
{
m_disposed = true; // Prevent duplicate dispose.
base.Dispose(disposing); // Call base class Dispose().
}
}

// Converts the measurement to a row in CSV format.
Expand All @@ -259,15 +375,13 @@ private string ToCSV(IMeasurement measurement)
string timestamp = measurement.Timestamp.ToString(TimestampFormat);
string id = measurement.Key.SignalID.ToString();
string value = measurement.AdjustedValue.ToString();

return string.Join(",", timestamp, id, value);
}

// Generates the name of the next active file.
private string GenerateActiveFileName()
{
DateTime now = DateTime.UtcNow;
return $"{now:yyyy-MM-dd HH.mm}.csv";
}
private string GenerateActiveFileName() =>
$"{DateTime.UtcNow:yyyy-MM-dd HH.mm}.csv";

// Rolls over the active file by moving it to the offload directory
// and unsetting the active file name so that a new active file
Expand All @@ -283,6 +397,7 @@ private void RollOver()

activeFileName = m_activeFileName;
m_activeFileName = null;
m_totalExports++;
}

string activeFilePath = Path.Combine(ExportPath, activeFileName);
Expand All @@ -297,25 +412,21 @@ private void RollOver()

// Executes the rollover process to offload the
// currently active file and create a new active file.
private void ScheduleManager_ScheduleDue(object sender, EventArgs<Schedule> e)
{
private void ScheduleManager_ScheduleDue(object sender, EventArgs<Schedule> e) =>
RollOver();
}

// Handles the given exception.
private void HandleException(Exception ex)
{
private void HandleException(Exception ex) =>
OnProcessException(MessageLevel.Error, ex);
}

private StreamWriter AppendToFile(string filePath)
{
// The "Path traversal" security warning is not relevant
// because the path comes from software configuration
// performed by an administrator, not user input
#pragma warning disable SG0018 // Path traversal
#pragma warning disable SG0018 // Path traversal
return File.AppendText(filePath);
#pragma warning restore SG0018 // Path traversal
#pragma warning restore SG0018
}

#endregion
Expand Down

0 comments on commit c6d7da1

Please sign in to comment.