Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 5280 lines (4508 sloc) 266 KB
//******************************************************************************************************
// DataSubscriber.cs - Gbtc
//
// Copyright © 2012, Grid Protection Alliance. All Rights Reserved.
//
// Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
// the NOTICE file distributed with this work for additional information regarding copyright ownership.
// The GPA licenses this file to you under the MIT License (MIT), the "License"; you may
// not use this file except in compliance with the License. You may obtain a copy of the License at:
//
// http://www.opensource.org/licenses/MIT
//
// Unless agreed to in writing, the subject software distributed under the License is distributed on an
// "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
// License for the specific language governing permissions and limitations.
//
// Code Modification History:
// ----------------------------------------------------------------------------------------------------
// 08/20/2010 - J. Ritchie Carroll
// Generated original version of source code.
// 02/07/2012 - Mehulbhai Thakkar
// Modified SynchronizeMetadata to filter devices by original source and modified insert query
// to populate OriginalSource value. Added to flag to optionally avoid meta-data synchronization.
// 12/20/2012 - Starlynn Danyelle Gilliam
// Modified Header.
//
//******************************************************************************************************
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Xml;
using GSF.Collections;
using GSF.Communication;
using GSF.Configuration;
using GSF.Data;
using GSF.Diagnostics;
using GSF.IO;
using GSF.Net.Security;
using GSF.Reflection;
using GSF.Security.Cryptography;
using GSF.Threading;
using GSF.TimeSeries.Adapters;
using GSF.TimeSeries.Data;
using GSF.TimeSeries.Statistics;
using GSF.TimeSeries.Transport.TSSC;
using GSF.Units;
using Random = GSF.Security.Cryptography.Random;
using TcpClient = GSF.Communication.TcpClient;
using UdpClient = GSF.Communication.UdpClient;
using System.Globalization;
#pragma warning disable 672
namespace GSF.TimeSeries.Transport
{
/// <summary>
/// Represents a data subscribing client that will connect to a data publisher for a data subscription.
/// </summary>
[Description("DataSubscriber: client that subscribes to a publishing server for a streaming data.")]
[EditorBrowsable(EditorBrowsableState.Advanced)] // Normally defined as an input device protocol
public class DataSubscriber : InputAdapterBase
{
#region [ Members ]
// Nested Types
// Local measurement concentrator
private class LocalConcentrator : ConcentratorBase
{
#region [ Members ]
// Fields
private DataSubscriber m_parent;
private bool m_disposed;
#endregion
#region [ Constructors ]
/// <summary>
/// Creates a new local concentrator.
/// </summary>
public LocalConcentrator(DataSubscriber parent)
{
m_parent = parent;
}
#endregion
#region [ Methods ]
/// <summary>
/// Releases the unmanaged resources used by the <see cref="LocalConcentrator"/> object and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected override void Dispose(bool disposing)
{
if (!m_disposed)
{
try
{
if (disposing)
m_parent = null;
}
finally
{
m_disposed = true; // Prevent duplicate dispose.
base.Dispose(disposing); // Call base class Dispose().
}
}
}
/// <summary>
/// Publish <see cref="IFrame"/> of time-aligned collection of <see cref="IMeasurement"/> values that arrived within the
/// concentrator's defined <see cref="ConcentratorBase.LagTime"/>.
/// </summary>
/// <param name="frame"><see cref="IFrame"/> of measurements with the same timestamp that arrived within <see cref="ConcentratorBase.LagTime"/> that are ready for processing.</param>
/// <param name="index">Index of <see cref="IFrame"/> within a second ranging from zero to <c><see cref="ConcentratorBase.FramesPerSecond"/> - 1</c>.</param>
protected override void PublishFrame(IFrame frame, int index)
{
// Publish locally sorted measurements
if ((object)m_parent != null)
m_parent.OnNewMeasurements(frame.Measurements.Values);
}
#endregion
}
private class SubscribedDevice : IDevice, IDisposable
{
#region [ Members ]
// Fields
private readonly string m_name;
private Guid m_statusFlagsID;
private Guid m_frequencyID;
private Guid m_deltaFrequencyID;
private long m_dataQualityErrors;
private long m_timeQualityErrors;
private long m_deviceErrors;
private long m_measurementsReceived;
private double m_measurementsExpected;
private long m_measurementsWithError;
private long m_measurementsDefined;
private bool m_disposed;
#endregion
#region [ Constructors ]
public SubscribedDevice(string name)
{
if ((object)name == null)
throw new ArgumentNullException(nameof(name));
m_name = name;
StatisticsEngine.Register(this, name, "Device", "PMU");
}
/// <summary>
/// Releases the unmanaged resources before the <see cref="SubscribedDevice"/> object is reclaimed by <see cref="GC"/>.
/// </summary>
~SubscribedDevice()
{
Unregister();
}
#endregion
#region [ Properties ]
public string Name => m_name;
public Guid StatusFlagsID
{
get
{
return m_statusFlagsID;
}
set
{
m_statusFlagsID = value;
}
}
public Guid FrequencyID
{
get
{
return m_frequencyID;
}
set
{
m_frequencyID = value;
}
}
public Guid DeltaFrequencyID
{
get
{
return m_deltaFrequencyID;
}
set
{
m_deltaFrequencyID = value;
}
}
public long DataQualityErrors
{
get
{
return Interlocked.Read(ref m_dataQualityErrors);
}
set
{
Interlocked.Exchange(ref m_dataQualityErrors, value);
}
}
public long TimeQualityErrors
{
get
{
return Interlocked.Read(ref m_timeQualityErrors);
}
set
{
Interlocked.Exchange(ref m_timeQualityErrors, value);
}
}
public long DeviceErrors
{
get
{
return Interlocked.Read(ref m_deviceErrors);
}
set
{
Interlocked.Exchange(ref m_deviceErrors, value);
}
}
public long MeasurementsReceived
{
get
{
return Interlocked.Read(ref m_measurementsReceived);
}
set
{
Interlocked.Exchange(ref m_measurementsReceived, value);
}
}
public long MeasurementsExpected
{
get
{
return (long)Interlocked.CompareExchange(ref m_measurementsExpected, 0.0D, 0.0D);
}
set
{
Interlocked.Exchange(ref m_measurementsExpected, value);
}
}
public long MeasurementsWithError
{
get
{
return Interlocked.Read(ref m_measurementsWithError);
}
set
{
Interlocked.Exchange(ref m_measurementsWithError, value);
}
}
public long MeasurementsDefined
{
get
{
return Interlocked.Read(ref m_measurementsDefined);
}
set
{
Interlocked.Exchange(ref m_measurementsDefined, value);
}
}
#endregion
#region [ Methods ]
public override bool Equals(object obj)
{
SubscribedDevice subscribedDevice = obj as SubscribedDevice;
return (object)subscribedDevice != null && m_name.Equals(subscribedDevice.m_name);
}
public override int GetHashCode()
{
return m_name.GetHashCode();
}
/// <summary>
/// Releases all the resources used by the <see cref="SubscribedDevice"/> object.
/// </summary>
public void Dispose()
{
Unregister();
GC.SuppressFinalize(this);
}
private void Unregister()
{
if (!m_disposed)
{
try
{
StatisticsEngine.Unregister(this);
}
finally
{
m_disposed = true; // Prevent duplicate dispose.
}
}
}
#endregion
}
/// <summary>
/// EventArgs implementation for handling user commands.
/// </summary>
public class UserCommandArgs : EventArgs
{
/// <summary>
/// Creates a new instance of the <see cref="UserCommandArgs"/> class.
/// </summary>
/// <param name="command">The code for the user command.</param>
/// <param name="response">The code for the server's response.</param>
/// <param name="buffer">Buffer containing the message from the server.</param>
/// <param name="startIndex">Index into the buffer used to skip the header.</param>
/// <param name="length">The length of the message in the buffer, including the header.</param>
public UserCommandArgs(ServerCommand command, ServerResponse response, byte[] buffer, int startIndex, int length)
{
Command = command;
Response = response;
Buffer = buffer;
StartIndex = startIndex;
Length = length;
}
/// <summary>
/// Gets the code for the user command.
/// </summary>
public ServerCommand Command { get; }
/// <summary>
/// Gets the code for the server's response.
/// </summary>
public ServerResponse Response { get; }
/// <summary>
/// Gets the buffer containing the message from the server.
/// </summary>
public byte[] Buffer { get; }
/// <summary>
/// Gets the index into the buffer used to skip the header.
/// </summary>
public int StartIndex { get; }
/// <summary>
/// Gets the length of the message in the buffer, including the header.
/// </summary>
public int Length { get; }
}
// Constants
/// <summary>
/// Defines default value for <see cref="DataSubscriber.OperationalModes"/>.
/// </summary>
public const OperationalModes DefaultOperationalModes = OperationalModes.CompressMetadata | OperationalModes.CompressSignalIndexCache | OperationalModes.ReceiveInternalMetadata | OperationalModes.UseCommonSerializationFormat;
/// <summary>
/// Defines the default value for the <see cref="MetadataSynchronizationTimeout"/> property.
/// </summary>
public const int DefaultMetadataSynchronizationTimeout = 0;
/// <summary>
/// Defines the default value for the <see cref="UseTransactionForMetadata"/> property.
/// </summary>
public const bool DefaultUseTransactionForMetadata = true;
/// <summary>
/// Default value for <see cref="LoggingPath"/>.
/// </summary>
public const string DefaultLoggingPath = "ConfigurationCache";
/// <summary>
/// Specifies the default value for the <see cref="AllowedParsingExceptions"/> property.
/// </summary>
public const int DefaultAllowedParsingExceptions = 10;
/// <summary>
/// Specifies the default value for the <see cref="ParsingExceptionWindow"/> property.
/// </summary>
public const long DefaultParsingExceptionWindow = 50000000L; // 5 seconds
private const int EvenKey = 0; // Even key/IV index
private const int OddKey = 1; // Odd key/IV index
private const int KeyIndex = 0; // Index of cipher key component in keyIV array
private const int IVIndex = 1; // Index of initialization vector component in keyIV array
private const long MissingCacheWarningInterval = 20000000;
// Events
/// <summary>
/// Occurs when client connection to the data publication server is established.
/// </summary>
public event EventHandler ConnectionEstablished;
/// <summary>
/// Occurs when client connection to the data publication server is terminated.
/// </summary>
public event EventHandler ConnectionTerminated;
/// <summary>
/// Occurs when client connection to the data publication server has successfully authenticated.
/// </summary>
public event EventHandler ConnectionAuthenticated;
/// <summary>
/// Occurs when client receives response from the server.
/// </summary>
public event EventHandler<EventArgs<ServerResponse, ServerCommand>> ReceivedServerResponse;
/// <summary>
/// Occurs when client receives message from the server in response to a user command.
/// </summary>
public event EventHandler<UserCommandArgs> ReceivedUserCommandResponse;
/// <summary>
/// Occurs when client receives requested meta-data transmitted by data publication server.
/// </summary>
public event EventHandler<EventArgs<DataSet>> MetaDataReceived;
/// <summary>
/// Occurs when first measurement is transmitted by data publication server.
/// </summary>
public event EventHandler<EventArgs<Ticks>> DataStartTime;
/// <summary>
/// Indicates that processing for an input adapter (via temporal session) has completed.
/// </summary>
/// <remarks>
/// This event is expected to only be raised when an input adapter has been designed to process
/// a finite amount of data, e.g., reading a historical range of data during temporal processing.
/// </remarks>
public new event EventHandler<EventArgs<string>> ProcessingComplete;
/// <summary>
/// Occurs when a notification has been received from the <see cref="DataPublisher"/>.
/// </summary>
public event EventHandler<EventArgs<string>> NotificationReceived;
/// <summary>
/// Occurs when the server has sent a notification that its configuration has changed, this
/// can allow subscriber to request updated meta-data if desired.
/// </summary>
public event EventHandler ServerConfigurationChanged;
/// <summary>
/// Occurs when number of parsing exceptions exceed <see cref="AllowedParsingExceptions"/> during <see cref="ParsingExceptionWindow"/>.
/// </summary>
public event EventHandler ExceededParsingExceptionThreshold;
// Fields
private volatile Dictionary<Guid, DeviceStatisticsHelper<SubscribedDevice>> m_subscribedDevicesLookup;
private volatile List<DeviceStatisticsHelper<SubscribedDevice>> m_statisticsHelpers;
private readonly LongSynchronizedOperation m_registerStatisticsOperation;
private IClient m_commandChannel;
private UdpClient m_dataChannel;
private bool m_useZeroMQChannel;
private LocalConcentrator m_localConcentrator;
private bool m_tsscResetRequested;
private TsscDecoder m_tsscDecoder;
private ushort m_tsscSequenceNumber;
private SharedTimer m_dataStreamMonitor;
private long m_commandChannelConnectionAttempts;
private long m_dataChannelConnectionAttempts;
private volatile SignalIndexCache m_remoteSignalIndexCache;
private volatile SignalIndexCache m_signalIndexCache;
private volatile long[] m_baseTimeOffsets;
private volatile int m_timeIndex;
private volatile byte[][][] m_keyIVs;
private volatile bool m_authenticated;
private volatile bool m_subscribed;
private volatile int m_lastBytesReceived;
private long m_monitoredBytesReceived;
private long m_totalBytesReceived;
private long m_lastMissingCacheWarning;
private Guid m_nodeID;
private int m_gatewayProtocolID;
private SecurityMode m_securityMode;
private bool m_synchronizedSubscription;
private bool m_useMillisecondResolution;
private bool m_requestNaNValueFilter;
private bool m_autoConnect;
private string m_metadataFilters;
private string m_sharedSecret;
private string m_authenticationID;
private string m_localCertificate;
private string m_remoteCertificate;
private SslPolicyErrors m_validPolicyErrors;
private X509ChainStatusFlags m_validChainFlags;
private bool m_checkCertificateRevocation;
private bool m_internal;
private bool m_includeTime;
private bool m_filterOutputMeasurements;
private bool m_autoSynchronizeMetadata;
private bool m_useTransactionForMetadata;
private bool m_useSourcePrefixNames;
private bool m_useLocalClockAsRealTime;
private bool m_metadataRefreshPending;
private int m_metadataSynchronizationTimeout;
private readonly LongSynchronizedOperation m_synchronizeMetadataOperation;
private volatile DataSet m_receivedMetadata;
private DataSet m_synchronizedMetadata;
private DateTime m_lastMetaDataRefreshTime;
private OperationalModes m_operationalModes;
private Encoding m_encoding;
private string m_loggingPath;
private RunTimeLog m_runTimeLog;
private bool m_bypassingStatistics;
private bool m_dataGapRecoveryEnabled;
private DataGapRecoverer m_dataGapRecoverer;
private int m_parsingExceptionCount;
private long m_lastParsingExceptionTime;
private int m_allowedParsingExceptions;
private Ticks m_parsingExceptionWindow;
private bool m_supportsRealTimeProcessing;
private bool m_supportsTemporalProcessing;
//private Ticks m_lastMeasurementCheck;
//private Ticks m_minimumMissingMeasurementThreshold = 5;
//private double m_transmissionDelayTimeAdjustment = 5.0;
private readonly List<BufferBlockMeasurement> m_bufferBlockCache;
private uint m_expectedBufferBlockSequenceNumber;
private Ticks m_realTime;
private Ticks m_lastStatisticsHelperUpdate;
private SharedTimer m_subscribedDevicesTimer;
private long m_lifetimeMeasurements;
private long m_minimumMeasurementsPerSecond;
private long m_maximumMeasurementsPerSecond;
private long m_totalMeasurementsPerSecond;
private long m_measurementsPerSecondCount;
private long m_measurementsInSecond;
private long m_lastSecondsSinceEpoch;
private long m_lifetimeTotalLatency;
private long m_lifetimeMinimumLatency;
private long m_lifetimeMaximumLatency;
private long m_lifetimeLatencyMeasurements;
private long m_syncProgressTotalActions;
private long m_syncProgressActionsCount;
private long m_syncProgressUpdateInterval;
private long m_syncProgressLastMessage;
private bool m_disposed;
#endregion
#region [ Constructors ]
/// <summary>
/// Creates a new <see cref="DataSubscriber"/>.
/// </summary>
public DataSubscriber()
{
m_registerStatisticsOperation = new LongSynchronizedOperation(HandleDeviceStatisticsRegistration)
{
IsBackground = true
};
m_synchronizeMetadataOperation = new LongSynchronizedOperation(SynchronizeMetadata)
{
IsBackground = true
};
m_encoding = Encoding.Unicode;
m_operationalModes = DefaultOperationalModes;
m_metadataSynchronizationTimeout = DefaultMetadataSynchronizationTimeout;
m_allowedParsingExceptions = DefaultAllowedParsingExceptions;
m_parsingExceptionWindow = DefaultParsingExceptionWindow;
string loggingPath = FilePath.GetDirectoryName(FilePath.GetAbsolutePath(DefaultLoggingPath));
if (Directory.Exists(loggingPath))
m_loggingPath = loggingPath;
// Default to not using transactions for meta-data on SQL server (helps avoid deadlocks)
try
{
using (AdoDataConnection database = new AdoDataConnection("systemSettings"))
{
m_useTransactionForMetadata = database.DatabaseType != DatabaseType.SQLServer;
}
}
catch
{
m_useTransactionForMetadata = DefaultUseTransactionForMetadata;
}
DataLossInterval = 10.0D;
m_bufferBlockCache = new List<BufferBlockMeasurement>();
m_useLocalClockAsRealTime = true;
m_useSourcePrefixNames = true;
}
#endregion
#region [ Properties ]
/// <summary>
/// Gets or sets the security mode used for communications over the command channel.
/// </summary>
public SecurityMode SecurityMode
{
get
{
return m_securityMode;
}
set
{
m_securityMode = value;
}
}
/// <summary>
/// Gets or sets flag that determines if <see cref="DataPublisher"/> requires subscribers to authenticate before making data requests.
/// </summary>
public bool RequireAuthentication
{
get
{
return m_securityMode != SecurityMode.None;
}
set
{
m_securityMode = value ? SecurityMode.Gateway : SecurityMode.None;
}
}
/// <summary>
/// Gets or sets flag that determines if ZeroMQ should be used for command channel communications.
/// </summary>
public bool UseZeroMQChannel
{
get
{
return m_useZeroMQChannel;
}
set
{
m_useZeroMQChannel = value;
}
}
/// <summary>
/// Gets or sets logging path to be used to be runtime and outage logs of the subscriber which are required for
/// automated data recovery.
/// </summary>
/// <remarks>
/// Leave value blank for default path, i.e., installation folder. Can be a fully qualified path or a path that
/// is relative to the installation folder, e.g., a value of "ConfigurationCache" might resolve to
/// "C:\Program Files\MyTimeSeriespPp\ConfigurationCache\".
/// </remarks>
public string LoggingPath
{
get
{
return m_loggingPath;
}
set
{
if (!string.IsNullOrWhiteSpace(value))
{
string loggingPath = FilePath.GetDirectoryName(FilePath.GetAbsolutePath(value));
if (Directory.Exists(loggingPath))
value = loggingPath;
}
m_loggingPath = value;
}
}
/// <summary>
/// Gets or sets flag that determines if <see cref="DataSubscriber"/> should attempt to auto-connection to <see cref="DataPublisher"/> using defined connection settings.
/// </summary>
public bool AutoConnect
{
get
{
return m_autoConnect;
}
set
{
m_autoConnect = value;
}
}
/// <summary>
/// Gets or sets flag that determines if <see cref="DataSubscriber"/> should
/// automatically request meta-data synchronization and synchronize publisher
/// meta-data with its own database configuration.
/// </summary>
public bool AutoSynchronizeMetadata
{
get
{
return m_autoSynchronizeMetadata;
}
set
{
m_autoSynchronizeMetadata = value;
}
}
/// <summary>
/// Gets flag that indicates whether the connection will be persisted
/// even while the adapter is offline in order to synchronize metadata.
/// </summary>
public bool PersistConnectionForMetadata =>
!AutoStart && AutoSynchronizeMetadata && !this.TemporalConstraintIsDefined();
/// <summary>
/// Gets or sets flag that determines if child devices associated with a subscription
/// should be prefixed with the subscription name and an exclamation point to ensure
/// device name uniqueness - recommended value is <c>true</c>.
/// </summary>
public bool UseSourcePrefixNames
{
get
{
return m_useSourcePrefixNames;
}
set
{
m_useSourcePrefixNames = value;
}
}
/// <summary>
/// Gets or sets requested meta-data filter expressions to be applied by <see cref="DataPublisher"/> before meta-data is sent.
/// </summary>
/// <remarks>
/// Multiple meta-data filters, such filters for different data tables, should be separated by a semicolon. Specifying fields in the filter
/// expression that do not exist in the data publisher's current meta-data set could cause filter expressions to not be applied and possibly
/// result in no meta-data being received for the specified data table.
/// </remarks>
/// <example>
/// FILTER MeasurementDetail WHERE SignalType &lt;&gt; 'STAT'; FILTER PhasorDetail WHERE Phase = '+'
/// </example>
public string MetadataFilters
{
get
{
return m_metadataFilters;
}
set
{
m_metadataFilters = value;
}
}
/// <summary>
/// Gets or sets flag that informs publisher if base time-offsets can use millisecond resolution to conserve bandwidth.
/// </summary>
[Obsolete("SubscriptionInfo object defines this parameter.", false)]
public bool UseMillisecondResolution
{
get
{
return m_useMillisecondResolution;
}
set
{
m_useMillisecondResolution = value;
}
}
/// <summary>
/// Gets flag that determines whether the command channel is connected.
/// </summary>
public bool CommandChannelConnected =>
(object)m_commandChannel != null &&
m_commandChannel.Enabled;
/// <summary>
/// Gets flag that determines if this <see cref="DataSubscriber"/> has successfully authenticated with the <see cref="DataPublisher"/>.
/// </summary>
public bool Authenticated => m_authenticated;
/// <summary>
/// Gets total data packet bytes received during this session.
/// </summary>
public long TotalBytesReceived => m_totalBytesReceived;
/// <summary>
/// Gets or sets data loss monitoring interval, in seconds. Set to zero to disable monitoring.
/// </summary>
public double DataLossInterval
{
get
{
if ((object)m_dataStreamMonitor != null)
return m_dataStreamMonitor.Interval / 1000.0D;
return 0.0D;
}
set
{
if (value > 0.0D)
{
if ((object)m_dataStreamMonitor == null)
{
// Create data stream monitoring timer
m_dataStreamMonitor = Common.TimerScheduler.CreateTimer();
m_dataStreamMonitor.Elapsed += m_dataStreamMonitor_Elapsed;
m_dataStreamMonitor.AutoReset = true;
m_dataStreamMonitor.Enabled = false;
}
// Set user specified interval
m_dataStreamMonitor.Interval = (int)(value * 1000.0D);
}
else
{
// Disable data monitor
if ((object)m_dataStreamMonitor != null)
{
m_dataStreamMonitor.Elapsed -= m_dataStreamMonitor_Elapsed;
m_dataStreamMonitor.Dispose();
}
m_dataStreamMonitor = null;
}
}
}
/// <summary>
/// Gets or sets a set of flags that define ways in
/// which the subscriber and publisher communicate.
/// </summary>
public OperationalModes OperationalModes
{
get
{
return m_operationalModes;
}
set
{
OperationalEncoding operationalEncoding;
m_operationalModes = value;
operationalEncoding = (OperationalEncoding)(value & OperationalModes.EncodingMask);
m_encoding = GetCharacterEncoding(operationalEncoding);
}
}
/// <summary>
/// Gets or sets the operational mode flag to compress meta-data.
/// </summary>
public bool CompressMetadata
{
get
{
return m_operationalModes.HasFlag(OperationalModes.CompressMetadata);
}
set
{
if (value)
m_operationalModes |= OperationalModes.CompressMetadata;
else
m_operationalModes &= ~OperationalModes.CompressMetadata;
}
}
/// <summary>
/// Gets or sets the operational mode flag to compress the signal index cache.
/// </summary>
public bool CompressSignalIndexCache
{
get
{
return m_operationalModes.HasFlag(OperationalModes.CompressSignalIndexCache);
}
set
{
if (value)
m_operationalModes |= OperationalModes.CompressSignalIndexCache;
else
m_operationalModes &= ~OperationalModes.CompressSignalIndexCache;
}
}
/// <summary>
/// Gets or sets the operational mode flag to compress data payloads.
/// </summary>
public bool CompressPayload
{
get
{
return m_operationalModes.HasFlag(OperationalModes.CompressPayloadData);
}
set
{
if (value)
m_operationalModes |= OperationalModes.CompressPayloadData;
else
m_operationalModes &= ~OperationalModes.CompressPayloadData;
}
}
/// <summary>
/// Gets or sets the operational mode flag to receive internal meta-data.
/// </summary>
public bool ReceiveInternalMetadata
{
get
{
return m_operationalModes.HasFlag(OperationalModes.ReceiveInternalMetadata);
}
set
{
if (value)
m_operationalModes |= OperationalModes.ReceiveInternalMetadata;
else
m_operationalModes &= ~OperationalModes.ReceiveInternalMetadata;
}
}
/// <summary>
/// Gets or sets the operational mode flag to receive external meta-data.
/// </summary>
public bool ReceiveExternalMetadata
{
get
{
return m_operationalModes.HasFlag(OperationalModes.ReceiveExternalMetadata);
}
set
{
if (value)
m_operationalModes |= OperationalModes.ReceiveExternalMetadata;
else
m_operationalModes &= ~OperationalModes.ReceiveExternalMetadata;
}
}
/// <summary>
/// Gets or sets the operational mode flag to use the common serialization format.
/// </summary>
public bool UseCommonSerializationFormat
{
get
{
return m_operationalModes.HasFlag(OperationalModes.UseCommonSerializationFormat);
}
set
{
if (value)
m_operationalModes |= OperationalModes.UseCommonSerializationFormat;
else
m_operationalModes &= ~OperationalModes.UseCommonSerializationFormat;
}
}
/// <summary>
/// Gets or sets the <see cref="OperationalEncoding"/> used by the subscriber and publisher.
/// </summary>
public OperationalEncoding OperationalEncoding
{
get
{
return (OperationalEncoding)(m_operationalModes & OperationalModes.EncodingMask);
}
set
{
m_operationalModes &= ~OperationalModes.EncodingMask;
m_operationalModes |= (OperationalModes)value;
m_encoding = GetCharacterEncoding(value);
}
}
/// <summary>
/// Gets or sets the <see cref="CompressionModes"/> used by the subscriber and publisher.
/// </summary>
public CompressionModes CompressionModes
{
get
{
return (CompressionModes)(m_operationalModes & OperationalModes.CompressionModeMask);
}
set
{
m_operationalModes &= ~OperationalModes.CompressionModeMask;
m_operationalModes |= (OperationalModes)value;
if (value.HasFlag(CompressionModes.TSSC))
CompressPayload = true;
}
}
/// <summary>
/// Gets the version number of the protocol in use by this subscriber.
/// </summary>
public int Version => (int)(m_operationalModes & OperationalModes.VersionMask);
/// <summary>
/// Gets the character encoding defined by the
/// <see cref="OperationalEncoding"/> of the communications stream.
/// </summary>
public Encoding Encoding => m_encoding;
/// <summary>
/// Gets flag indicating if this adapter supports real-time processing.
/// </summary>
/// <remarks>
/// Setting this value to false indicates that the adapter should not be enabled unless it exists within a temporal session.
/// As an example, this flag can be used in a gateway system to set up two separate subscribers: one to the PDC for real-time
/// data streams and one to the historian for historical data streams. In this scenario, the assumption is that the PDC is
/// the data source for the historian, implying that only local data is destined for archival.
/// </remarks>
public bool SupportsRealTimeProcessing => m_supportsRealTimeProcessing;
/// <summary>
/// Gets the flag indicating if this adapter supports temporal processing.
/// </summary>
/// <remarks>
/// <para>
/// Although the data subscriber provisions support for temporal processing by receiving historical data from a remote source,
/// the adapter opens sockets and does not need to be engaged within an actual temporal <see cref="IaonSession"/>, therefore
/// this method normally returns <c>false</c> to make sure the adapter doesn't get instantiated within a temporal session.
/// </para>
/// <para>
/// Setting this to <c>true</c> means that a subscriber will be initialized within a temporal session to provide historical
/// data from a remote source - this should only be enabled in cases where (1) there is no locally defined, e.g., in-process,
/// historian that can already provide historical data for temporal sessions, and (2) a remote subscriber should be allowed
/// to proxy temporal requests, e.g., those requested for data gap recovery, to an up-stream subscription. This is useful in
/// cases where a primary data subscriber that has data gap recovery enabled can also allow a remote subscription to proxy in
/// data gap recovery requests. It is recommended that remote data gap recovery request parameters be (1) either slightly
/// looser than those of local system to reduce the possibility of duplicated recovery sessions for the same data loss, or
/// (2) only enabled in the end-most system that most needs the recovered data, like a historian.
/// </para>
/// </remarks>
public override bool SupportsTemporalProcessing => m_supportsTemporalProcessing;
/// <summary>
/// Gets or sets the desired processing interval, in milliseconds, for the adapter.
/// </summary>
/// <remarks>
/// With the exception of the values of -1 and 0, this value specifies the desired processing interval for data, i.e.,
/// basically a delay, or timer interval, over which to process data. A value of -1 means to use the default processing
/// interval while a value of 0 means to process data as fast as possible.
/// </remarks>
public override int ProcessingInterval
{
get
{
return base.ProcessingInterval;
}
set
{
base.ProcessingInterval = value;
// Request server update the processing interval
SendServerCommand(ServerCommand.UpdateProcessingInterval, BigEndian.GetBytes(value));
}
}
/// <summary>
/// Gets or sets the timeout used when executing database queries during meta-data synchronization.
/// </summary>
public int MetadataSynchronizationTimeout
{
get
{
return m_metadataSynchronizationTimeout;
}
set
{
m_metadataSynchronizationTimeout = value;
}
}
/// <summary>
/// Gets or sets flag that determines if meta-data synchronization should be performed within a transaction.
/// </summary>
public bool UseTransactionForMetadata
{
get
{
return m_useTransactionForMetadata;
}
set
{
m_useTransactionForMetadata = value;
}
}
/// <summary>
/// Gets or sets flag that determines whether to use the local clock when calculating statistics.
/// </summary>
public bool UseLocalClockAsRealTime
{
get
{
return m_useLocalClockAsRealTime;
}
set
{
m_useLocalClockAsRealTime = value;
}
}
/// <summary>
/// Gets or sets number of parsing exceptions allowed during <see cref="ParsingExceptionWindow"/> before connection is reset.
/// </summary>
public int AllowedParsingExceptions
{
get
{
return m_allowedParsingExceptions;
}
set
{
m_allowedParsingExceptions = value;
}
}
/// <summary>
/// Gets or sets time duration, in <see cref="Ticks"/>, to monitor parsing exceptions.
/// </summary>
public Ticks ParsingExceptionWindow
{
get
{
return m_parsingExceptionWindow;
}
set
{
m_parsingExceptionWindow = value;
}
}
/// <summary>
/// Gets or sets <see cref="DataSet"/> based data source available to this <see cref="DataSubscriber"/>.
/// </summary>
public override DataSet DataSource
{
get
{
return base.DataSource;
}
set
{
base.DataSource = value;
m_registerStatisticsOperation.RunOnce();
bool outputMeasurementsUpdated = AutoConnect && UpdateOutputMeasurements();
// For automatic connections, when meta-data refresh is complete, update output measurements to see if any
// points for subscription have changed after re-application of filter expressions and if so, resubscribe
if (outputMeasurementsUpdated && Enabled && CommandChannelConnected)
{
OnStatusMessage(MessageLevel.Info, "Meta-data received from publisher modified measurement availability, adjusting active subscription...");
// Updating subscription will restart data stream monitor upon successful resubscribe
if (AutoStart)
SubscribeToOutputMeasurements(true);
}
if ((object)m_dataGapRecoverer != null)
m_dataGapRecoverer.DataSource = value;
}
}
/// <summary>
/// Gets or sets output measurement keys that are requested by other adapters based on what adapter says it can provide.
/// </summary>
public override MeasurementKey[] RequestedOutputMeasurementKeys
{
get
{
return base.RequestedOutputMeasurementKeys;
}
set
{
MeasurementKey[] oldKeys = base.RequestedOutputMeasurementKeys ?? new MeasurementKey[0];
MeasurementKey[] newKeys = value ?? new MeasurementKey[0];
HashSet<MeasurementKey> oldKeySet = new HashSet<MeasurementKey>(oldKeys);
base.RequestedOutputMeasurementKeys = value;
if (!AutoStart && Enabled && CommandChannelConnected && !oldKeySet.SetEquals(newKeys))
{
OnStatusMessage(MessageLevel.Info, "Requested measurements have changed, adjusting active subscription...");
SubscribeToOutputMeasurements(true);
}
}
}
/// <summary>
/// Gets or sets output measurements that the <see cref="AdapterBase"/> will produce, if any.
/// </summary>
public override IMeasurement[] OutputMeasurements
{
get
{
return base.OutputMeasurements;
}
set
{
base.OutputMeasurements = value;
if ((object)m_dataGapRecoverer != null)
m_dataGapRecoverer.FilterExpression = this.OutputMeasurementKeys().Select(key => key.SignalID.ToString()).ToDelimitedString(';');
}
}
/// <summary>
/// Gets connection info for adapter, if any.
/// </summary>
public override string ConnectionInfo
{
get
{
string commandChannelServerUri = m_commandChannel?.ServerUri;
string dataChannelServerUri = m_dataChannel?.ServerUri;
if (string.IsNullOrWhiteSpace(commandChannelServerUri) && string.IsNullOrWhiteSpace(dataChannelServerUri))
return null;
if (string.IsNullOrWhiteSpace(dataChannelServerUri))
return commandChannelServerUri;
if (string.IsNullOrWhiteSpace(commandChannelServerUri))
return dataChannelServerUri;
return $"{commandChannelServerUri} / {dataChannelServerUri}";
}
}
/// <summary>
/// Gets the status of this <see cref="DataSubscriber"/>.
/// </summary>
/// <remarks>
/// Derived classes should provide current status information about the adapter for display purposes.
/// </remarks>
public override string Status
{
get
{
StringBuilder status = new StringBuilder();
status.AppendFormat(" Subscription mode: {0}", m_synchronizedSubscription ? "Remotely Synchronized" : (object)m_localConcentrator == null ? "Unsynchronized" : "Locally Synchronized");
status.AppendLine();
status.AppendFormat(" Authenticated: {0}", m_authenticated);
status.AppendLine();
status.AppendFormat(" Subscribed: {0}", m_subscribed);
status.AppendLine();
status.AppendFormat(" Security mode: {0}", SecurityMode);
status.AppendLine();
status.AppendFormat(" Compression modes: {0}", CompressionModes);
status.AppendLine();
if ((object)m_dataChannel != null)
{
status.AppendFormat(" UDP Data packet security: {0}", (object)m_keyIVs == null ? "Unencrypted" : "Encrypted");
status.AppendLine();
}
status.AppendFormat(" Data monitor enabled: {0}", (object)m_dataStreamMonitor != null && m_dataStreamMonitor.Enabled);
status.AppendLine();
status.AppendFormat(" Logging path: {0}", FilePath.TrimFileName(m_loggingPath.ToNonNullNorWhiteSpace(FilePath.GetAbsolutePath("")), 51));
status.AppendLine();
if (DataLossInterval > 0.0D)
status.AppendFormat("No data reconnect interval: {0:0.000} seconds", DataLossInterval);
else
status.Append("No data reconnect interval: disabled");
status.AppendLine();
status.AppendFormat(" Data gap recovery mode: {0}", m_dataGapRecoveryEnabled ? "Enabled" : "Disabled");
status.AppendLine();
if (m_dataGapRecoveryEnabled && (object)m_dataGapRecoverer != null)
status.Append(m_dataGapRecoverer.Status);
if ((object)m_runTimeLog != null)
{
status.AppendLine();
status.AppendLine("Run-Time Log Status".CenterText(50));
status.AppendLine("-------------------".CenterText(50));
status.AppendFormat(m_runTimeLog.Status);
}
if ((object)m_dataChannel != null)
{
status.AppendLine();
status.AppendLine("Data Channel Status".CenterText(50));
status.AppendLine("-------------------".CenterText(50));
status.Append(m_dataChannel.Status);
}
if ((object)m_commandChannel != null)
{
status.AppendLine();
status.AppendLine("Command Channel Status".CenterText(50));
status.AppendLine("----------------------".CenterText(50));
status.Append(m_commandChannel.Status);
}
if ((object)m_localConcentrator != null)
{
status.AppendLine();
status.AppendLine("Local Concentrator Status".CenterText(50));
status.AppendLine("-------------------------".CenterText(50));
status.Append(m_localConcentrator.Status);
}
status.Append(base.Status);
return status.ToString();
}
}
/// <summary>
/// Gets a flag that determines if this <see cref="DataSubscriber"/> uses an asynchronous connection.
/// </summary>
protected override bool UseAsyncConnect => true;
/// <summary>
/// Gets or sets reference to <see cref="UdpClient"/> data channel, attaching and/or detaching to events as needed.
/// </summary>
protected UdpClient DataChannel
{
get
{
return m_dataChannel;
}
set
{
if ((object)m_dataChannel != null)
{
// Detach from events on existing data channel reference
m_dataChannel.ConnectionException -= m_dataChannel_ConnectionException;
m_dataChannel.ConnectionAttempt -= m_dataChannel_ConnectionAttempt;
m_dataChannel.ReceiveData -= m_dataChannel_ReceiveData;
m_dataChannel.ReceiveDataException -= m_dataChannel_ReceiveDataException;
if ((object)m_dataChannel != value)
m_dataChannel.Dispose();
}
// Assign new data channel reference
m_dataChannel = value;
if ((object)m_dataChannel != null)
{
// Attach to desired events on new data channel reference
m_dataChannel.ConnectionException += m_dataChannel_ConnectionException;
m_dataChannel.ConnectionAttempt += m_dataChannel_ConnectionAttempt;
m_dataChannel.ReceiveData += m_dataChannel_ReceiveData;
m_dataChannel.ReceiveDataException += m_dataChannel_ReceiveDataException;
}
}
}
/// <summary>
/// Gets or sets reference to <see cref="Communication.TcpClient"/> command channel, attaching and/or detaching to events as needed.
/// </summary>
protected IClient CommandChannel
{
get
{
return m_commandChannel;
}
set
{
if ((object)m_commandChannel != null)
{
// Detach from events on existing command channel reference
m_commandChannel.ConnectionAttempt -= m_commandChannel_ConnectionAttempt;
m_commandChannel.ConnectionEstablished -= m_commandChannel_ConnectionEstablished;
m_commandChannel.ConnectionException -= m_commandChannel_ConnectionException;
m_commandChannel.ConnectionTerminated -= m_commandChannel_ConnectionTerminated;
m_commandChannel.ReceiveData -= m_commandChannel_ReceiveData;
m_commandChannel.ReceiveDataException -= m_commandChannel_ReceiveDataException;
m_commandChannel.SendDataException -= m_commandChannel_SendDataException;
if (m_commandChannel != value)
m_commandChannel.Dispose();
}
// Assign new command channel reference
m_commandChannel = value;
if ((object)m_commandChannel != null)
{
// Attach to desired events on new command channel reference
m_commandChannel.ConnectionAttempt += m_commandChannel_ConnectionAttempt;
m_commandChannel.ConnectionEstablished += m_commandChannel_ConnectionEstablished;
m_commandChannel.ConnectionException += m_commandChannel_ConnectionException;
m_commandChannel.ConnectionTerminated += m_commandChannel_ConnectionTerminated;
m_commandChannel.ReceiveData += m_commandChannel_ReceiveData;
m_commandChannel.ReceiveDataException += m_commandChannel_ReceiveDataException;
m_commandChannel.SendDataException += m_commandChannel_SendDataException;
}
}
}
/// <summary>
/// Gets the total number of measurements processed through this data publisher over the lifetime of the subscriber.
/// </summary>
public long LifetimeMeasurements => m_lifetimeMeasurements;
/// <summary>
/// Gets the minimum value of the measurements per second calculation.
/// </summary>
public long MinimumMeasurementsPerSecond => m_minimumMeasurementsPerSecond;
/// <summary>
/// Gets the maximum value of the measurements per second calculation.
/// </summary>
public long MaximumMeasurementsPerSecond => m_maximumMeasurementsPerSecond;
/// <summary>
/// Gets the average value of the measurements per second calculation.
/// </summary>
public long AverageMeasurementsPerSecond
{
get
{
if (m_measurementsPerSecondCount == 0L)
return 0L;
return m_totalMeasurementsPerSecond / m_measurementsPerSecondCount;
}
}
/// <summary>
/// Gets the minimum latency calculated over the full lifetime of the subscriber.
/// </summary>
public int LifetimeMinimumLatency => (int)Ticks.ToMilliseconds(m_lifetimeMinimumLatency);
/// <summary>
/// Gets the maximum latency calculated over the full lifetime of the subscriber.
/// </summary>
public int LifetimeMaximumLatency => (int)Ticks.ToMilliseconds(m_lifetimeMaximumLatency);
/// <summary>
/// Gets the average latency calculated over the full lifetime of the subscriber.
/// </summary>
public int LifetimeAverageLatency
{
get
{
if (m_lifetimeLatencyMeasurements == 0)
return -1;
return (int)Ticks.ToMilliseconds(m_lifetimeTotalLatency / m_lifetimeLatencyMeasurements);
}
}
/// <summary>
/// Gets real-time as determined by either the local clock or the latest measurement received.
/// </summary>
protected Ticks RealTime => m_useLocalClockAsRealTime ? (Ticks)DateTime.UtcNow.Ticks : m_realTime;
#endregion
#region [ Methods ]
/// <summary>
/// Releases the unmanaged resources used by the <see cref="DataSubscriber"/> object and optionally releases the managed resources.
/// </summary>
/// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
protected override void Dispose(bool disposing)
{
if (!m_disposed)
{
try
{
if (disposing)
{
DataLossInterval = 0.0D;
CommandChannel = null;
DataChannel = null;
DisposeLocalConcentrator();
if ((object)m_dataGapRecoverer != null)
{
m_dataGapRecoverer.RecoveredMeasurements -= m_dataGapRecoverer_RecoveredMeasurements;
m_dataGapRecoverer.StatusMessage -= m_dataGapRecoverer_StatusMessage;
m_dataGapRecoverer.ProcessException -= m_dataGapRecoverer_ProcessException;
m_dataGapRecoverer.Dispose();
m_dataGapRecoverer = null;
}
if ((object)m_runTimeLog != null)
{
m_runTimeLog.ProcessException -= m_runTimeLog_ProcessException;
m_runTimeLog.Dispose();
m_runTimeLog = null;
}
if ((object)m_subscribedDevicesTimer != null)
{
m_subscribedDevicesTimer.Elapsed -= SubscribedDevicesTimer_Elapsed;
m_subscribedDevicesTimer.Dispose();
m_subscribedDevicesTimer = null;
}
}
}
finally
{
m_disposed = true; // Prevent duplicate dispose.
base.Dispose(disposing); // Call base class Dispose().
}
}
}
/// <summary>
/// Initializes <see cref="DataSubscriber"/>.
/// </summary>
public override void Initialize()
{
base.Initialize();
Dictionary<string, string> settings = Settings;
// Setup connection to data publishing server with or without authentication required
if (settings.TryGetValue("requireAuthentication", out string setting))
RequireAuthentication = setting.ParseBoolean();
// See if user has opted for different operational modes
if (settings.TryGetValue("operationalModes", out setting) && Enum.TryParse(setting, true, out OperationalModes operationalModes))
OperationalModes = operationalModes;
// Set the security mode if explicitly defined
if (!settings.TryGetValue("securityMode", out setting) || !Enum.TryParse(setting, true, out m_securityMode))
m_securityMode = SecurityMode.None;
// Apply gateway compression mode to operational mode flags
if (settings.TryGetValue("compressionModes", out setting) && Enum.TryParse(setting, true, out CompressionModes compressionModes))
CompressionModes = compressionModes;
// Check if output measurements should be filtered to only those belonging to the subscriber
m_filterOutputMeasurements = !settings.TryGetValue("filterOutputMeasurements", out setting) || setting.ParseBoolean();
// Check if the subscriber supports real-time and historical processing
m_supportsRealTimeProcessing = !settings.TryGetValue("supportsRealTimeProcessing", out setting) || setting.ParseBoolean();
m_supportsTemporalProcessing = settings.TryGetValue("supportsTemporalProcessing", out setting) && setting.ParseBoolean();
if (settings.TryGetValue("useZeroMQChannel", out setting))
m_useZeroMQChannel = setting.ParseBoolean();
// FUTURE: Remove this exception when CURVE is enabled in GSF ZeroMQ library
if (m_useZeroMQChannel && m_securityMode == SecurityMode.TLS)
throw new ArgumentException("CURVE security settings are not yet available for GSF ZeroMQ client channel.");
// Settings specific to Gateway security
if (m_securityMode == SecurityMode.Gateway)
{
if (!settings.TryGetValue("sharedSecret", out m_sharedSecret) || string.IsNullOrWhiteSpace(m_sharedSecret))
throw new ArgumentException("The \"sharedSecret\" setting must be defined when using Gateway security mode.");
if (!settings.TryGetValue("authenticationID", out m_authenticationID) || string.IsNullOrWhiteSpace(m_authenticationID))
throw new ArgumentException("The \"authenticationID\" setting must be defined when using Gateway security mode.");
}
// Settings specific to Transport Layer Security
if (m_securityMode == SecurityMode.TLS)
{
if (!settings.TryGetValue("localCertificate", out m_localCertificate) || !File.Exists(m_localCertificate))
m_localCertificate = GetLocalCertificate();
if (!settings.TryGetValue("remoteCertificate", out m_remoteCertificate) || !RemoteCertificateExists())
throw new ArgumentException("The \"remoteCertificate\" setting must be defined and certificate file must exist when using TLS security mode.");
if (!settings.TryGetValue("validPolicyErrors", out setting) || !Enum.TryParse(setting, out m_validPolicyErrors))
m_validPolicyErrors = SslPolicyErrors.None;
if (!settings.TryGetValue("validChainFlags", out setting) || !Enum.TryParse(setting, out m_validChainFlags))
m_validChainFlags = X509ChainStatusFlags.NoError;
if (settings.TryGetValue("checkCertificateRevocation", out setting) && !string.IsNullOrWhiteSpace(setting))
m_checkCertificateRevocation = setting.ParseBoolean();
else
m_checkCertificateRevocation = true;
}
// Check if measurements for this connection should be marked as "internal" - i.e., owned and allowed for proxy
if (settings.TryGetValue("internal", out setting))
m_internal = setting.ParseBoolean();
// Check if user has explicitly defined the ReceiveInternalMetadata flag
if (settings.TryGetValue("receiveInternalMetadata", out setting))
ReceiveInternalMetadata = setting.ParseBoolean();
// Check if user has explicitly defined the ReceiveExternalMetadata flag
if (settings.TryGetValue("receiveExternalMetadata", out setting))
ReceiveExternalMetadata = setting.ParseBoolean();
// Check if user has defined a meta-data synchronization timeout
if (settings.TryGetValue("metadataSynchronizationTimeout", out setting) && int.TryParse(setting, out int metadataSynchronizationTimeout))
m_metadataSynchronizationTimeout = metadataSynchronizationTimeout;
// Check if user has defined a flag for using a transaction during meta-data synchronization
if (settings.TryGetValue("useTransactionForMetadata", out setting))
m_useTransactionForMetadata = setting.ParseBoolean();
// Check if user wants to request that publisher use millisecond resolution to conserve bandwidth
if (settings.TryGetValue("useMillisecondResolution", out setting))
m_useMillisecondResolution = setting.ParseBoolean();
// Check if user wants to request that publisher remove NaN from the data stream to conserve bandwidth
if (settings.TryGetValue("requestNaNValueFilter", out setting))
m_requestNaNValueFilter = setting.ParseBoolean();
// Check if user has defined any meta-data filter expressions
if (settings.TryGetValue("metadataFilters", out setting))
m_metadataFilters = setting;
// Define auto connect setting
if (settings.TryGetValue("autoConnect", out setting))
{
m_autoConnect = setting.ParseBoolean();
if (m_autoConnect)
m_autoSynchronizeMetadata = true;
}
// Define the maximum allowed exceptions before resetting the connection
if (settings.TryGetValue("allowedParsingExceptions", out setting))
m_allowedParsingExceptions = int.Parse(setting);
// Define the window of time over which parsing exceptions are tolerated
if (settings.TryGetValue("parsingExceptionWindow", out setting))
m_parsingExceptionWindow = Ticks.FromSeconds(double.Parse(setting));
// Check if synchronize meta-data is explicitly enabled or disabled
if (settings.TryGetValue("synchronizeMetadata", out setting))
m_autoSynchronizeMetadata = setting.ParseBoolean();
// Determine if source name prefixes should be applied during metadata synchronization
if (settings.TryGetValue("useSourcePrefixNames", out setting))
m_useSourcePrefixNames = setting.ParseBoolean();
// Define data loss interval
if (settings.TryGetValue("dataLossInterval", out setting) && double.TryParse(setting, out double interval))
DataLossInterval = interval;
// Define buffer size
if (!settings.TryGetValue("bufferSize", out setting) || !int.TryParse(setting, out int bufferSize))
bufferSize = ClientBase.DefaultReceiveBufferSize;
if (settings.TryGetValue("useLocalClockAsRealTime", out setting))
m_useLocalClockAsRealTime = setting.ParseBoolean();
if (m_autoConnect)
{
// Connect to local events when automatically engaging connection cycle
ConnectionAuthenticated += DataSubscriber_ConnectionAuthenticated;
MetaDataReceived += DataSubscriber_MetaDataReceived;
// Update output measurements to include "subscribed" points
UpdateOutputMeasurements(true);
}
else if (m_autoSynchronizeMetadata)
{
// Output measurements do not include "subscribed" points,
// but should still be filtered if applicable
TryFilterOutputMeasurements();
}
if (m_securityMode != SecurityMode.TLS)
{
if (m_useZeroMQChannel)
{
// Create a new ZeroMQ Dealer
ZeroMQClient commandChannel = new ZeroMQClient();
// Initialize default settings
commandChannel.PersistSettings = false;
commandChannel.MaxConnectionAttempts = 1;
commandChannel.ReceiveBufferSize = bufferSize;
commandChannel.SendBufferSize = bufferSize;
// Assign command channel client reference and attach to needed events
CommandChannel = commandChannel;
}
else
{
// Create a new TCP client
TcpClient commandChannel = new TcpClient();
// Initialize default settings
commandChannel.PayloadAware = true;
commandChannel.PersistSettings = false;
commandChannel.MaxConnectionAttempts = 1;
commandChannel.ReceiveBufferSize = bufferSize;
commandChannel.SendBufferSize = bufferSize;
commandChannel.NoDelay = true;
// Assign command channel client reference and attach to needed events
CommandChannel = commandChannel;
}
}
else
{
if (m_useZeroMQChannel)
{
// Create a new ZeroMQ Dealer with CURVE security enabled
ZeroMQClient commandChannel = new ZeroMQClient();
// Initialize default settings
commandChannel.PersistSettings = false;
commandChannel.MaxConnectionAttempts = 1;
commandChannel.ReceiveBufferSize = bufferSize;
commandChannel.SendBufferSize = bufferSize;
// FUTURE: Parse certificate and pass keys to ZeroMQClient for CURVE security
// Assign command channel client reference and attach to needed events
CommandChannel = commandChannel;
}
else
{
// Create a new TLS client and certificate checker
TlsClient commandChannel = new TlsClient();
SimpleCertificateChecker certificateChecker = new SimpleCertificateChecker();
// Set up certificate checker
certificateChecker.TrustedCertificates.Add(new X509Certificate2(FilePath.GetAbsolutePath(m_remoteCertificate)));
certificateChecker.ValidPolicyErrors = m_validPolicyErrors;
certificateChecker.ValidChainFlags = m_validChainFlags;
// Initialize default settings
commandChannel.PayloadAware = true;
commandChannel.PersistSettings = false;
commandChannel.MaxConnectionAttempts = 1;
commandChannel.CertificateFile = FilePath.GetAbsolutePath(m_localCertificate);
commandChannel.CheckCertificateRevocation = m_checkCertificateRevocation;
commandChannel.CertificateChecker = certificateChecker;
commandChannel.ReceiveBufferSize = bufferSize;
commandChannel.SendBufferSize = bufferSize;
commandChannel.NoDelay = true;
// Assign command channel client reference and attach to needed events
CommandChannel = commandChannel;
}
}
// Get proper connection string - either from specified command channel or from base connection string
if (settings.TryGetValue("commandChannel", out setting))
m_commandChannel.ConnectionString = setting;
else
m_commandChannel.ConnectionString = ConnectionString;
// Check for simplified compression setup flag
if (settings.TryGetValue("compression", out setting) && setting.ParseBoolean())
{
CompressionModes |= CompressionModes.TSSC | CompressionModes.GZip;
OperationalModes |= OperationalModes.CompressPayloadData | OperationalModes.CompressMetadata | OperationalModes.CompressSignalIndexCache | OperationalModes.UseCommonSerializationFormat;
}
// Get logging path, if any has been defined
if (settings.TryGetValue("loggingPath", out setting))
{
setting = FilePath.GetDirectoryName(FilePath.GetAbsolutePath(setting));
if (Directory.Exists(setting))
m_loggingPath = setting;
else
OnStatusMessage(MessageLevel.Info, $"Logging path \"{setting}\" not found, defaulting to \"{FilePath.GetAbsolutePath("")}\"...", flags: MessageFlags.UsageIssue);
}
// Initialize data gap recovery processing, if requested
if (settings.TryGetValue("dataGapRecovery", out setting))
{
// Make sure setting exists to allow user to by-pass phasor data source validation at startup
ConfigurationFile configFile = ConfigurationFile.Current;
CategorizedSettingsElementCollection systemSettings = configFile.Settings["systemSettings"];
CategorizedSettingsElement dataGapRecoveryEnabledSetting = systemSettings["DataGapRecoveryEnabled"];
// See if this node should process phasor source validation
if ((object)dataGapRecoveryEnabledSetting == null || dataGapRecoveryEnabledSetting.ValueAsBoolean())
{
// Example connection string for data gap recovery:
// dataGapRecovery={enabled=true; recoveryStartDelay=10.0; minimumRecoverySpan=0.0; maximumRecoverySpan=3600.0}
Dictionary<string, string> dataGapSettings = setting.ParseKeyValuePairs();
if (dataGapSettings.TryGetValue("enabled", out setting) && setting.ParseBoolean())
{
// Remove dataGapRecovery connection setting from command channel connection string, if defined there.
// This will prevent any recursive data gap recovery operations from being established:
Dictionary<string, string> connectionSettings = m_commandChannel.ConnectionString.ParseKeyValuePairs();
connectionSettings.Remove("dataGapRecovery");
connectionSettings.Remove("autoConnect");
connectionSettings.Remove("synchronizeMetadata");
connectionSettings.Remove("outputMeasurements");
// Note that the data gap recoverer will connect on the same command channel port as
// the real-time subscriber (TCP only)
m_dataGapRecoveryEnabled = true;
m_dataGapRecoverer = new DataGapRecoverer();
m_dataGapRecoverer.SourceConnectionName = Name;
m_dataGapRecoverer.DataSource = DataSource;
m_dataGapRecoverer.ConnectionString = string.Join("; ", $"autoConnect=false; synchronizeMetadata=false{(string.IsNullOrWhiteSpace(m_loggingPath) ? "" : "; loggingPath=" + m_loggingPath)}", dataGapSettings.JoinKeyValuePairs(), connectionSettings.JoinKeyValuePairs());
m_dataGapRecoverer.FilterExpression = this.OutputMeasurementKeys().Select(key => key.SignalID.ToString()).ToDelimitedString(';');
m_dataGapRecoverer.RecoveredMeasurements += m_dataGapRecoverer_RecoveredMeasurements;
m_dataGapRecoverer.StatusMessage += m_dataGapRecoverer_StatusMessage;
m_dataGapRecoverer.ProcessException += m_dataGapRecoverer_ProcessException;
m_dataGapRecoverer.Initialize();
}
else
{
m_dataGapRecoveryEnabled = false;
}
}
}
else
{
m_dataGapRecoveryEnabled = false;
}
if (!settings.TryGetValue("BypassStatistics", out setting) || !setting.ParseBoolean())
{
void statisticsCalculated(object sender, EventArgs args) => ResetMeasurementsPerSecondCounters();
StatisticsEngine.Register(this, "Subscriber", "SUB");
StatisticsEngine.Calculated += statisticsCalculated;
Disposed += (sender, args) => StatisticsEngine.Calculated -= statisticsCalculated;
}
else
{
m_bypassingStatistics = true;
}
if (PersistConnectionForMetadata)
m_commandChannel.ConnectAsync();
Initialized = true;
}
// Gets the path to the local certificate from the configuration file
private string GetLocalCertificate()
{
CategorizedSettingsElement localCertificateElement = ConfigurationFile.Current.Settings["systemSettings"]["LocalCertificate"];
string localCertificate = null;
if ((object)localCertificateElement != null)
localCertificate = localCertificateElement.Value;
if ((object)localCertificate == null || !File.Exists(FilePath.GetAbsolutePath(localCertificate)))
throw new InvalidOperationException("Unable to find local certificate. Local certificate file must exist when using TLS security mode.");
return localCertificate;
}
// Checks if the specified certificate exists
private bool RemoteCertificateExists()
{
string fullPath = FilePath.GetAbsolutePath(m_remoteCertificate);
CategorizedSettingsElement remoteCertificateElement;
if (!File.Exists(fullPath))
{
remoteCertificateElement = ConfigurationFile.Current.Settings["systemSettings"]["RemoteCertificatesPath"];
if ((object)remoteCertificateElement != null)
{
m_remoteCertificate = Path.Combine(remoteCertificateElement.Value, m_remoteCertificate);
fullPath = FilePath.GetAbsolutePath(m_remoteCertificate);
}
}
return File.Exists(fullPath);
}
// Initialize (or reinitialize) the output measurements associated with the data subscriber.
// Returns true if output measurements were updated, otherwise false if they remain the same.
private bool UpdateOutputMeasurements(bool initialCall = false)
{
IMeasurement[] originalOutputMeasurements = OutputMeasurements;
// Reapply output measurements if reinitializing - this way filter expressions and/or sourceIDs
// will be reapplied. This can be important after a meta-data refresh which may have added new
// measurements that could now be applicable as desired output measurements.
if (!initialCall)
{
if (Settings.TryGetValue("outputMeasurements", out string setting))
OutputMeasurements = ParseOutputMeasurements(DataSource, true, setting);
OutputSourceIDs = OutputSourceIDs;
}
// If active measurements are defined, attempt to defined desired subscription points from there
if (m_filterOutputMeasurements && (object)DataSource != null && DataSource.Tables.Contains("ActiveMeasurements"))
{
try
{
// Filter to points associated with this subscriber that have been requested for subscription, are enabled and not owned locally
DataRow[] filteredRows = DataSource.Tables["ActiveMeasurements"].Select("Subscribed <> 0");
List<IMeasurement> subscribedMeasurements = new List<IMeasurement>();
Guid signalID;
foreach (DataRow row in filteredRows)
{
// Create a new measurement for the provided field level information
Measurement measurement = new Measurement();
// Parse primary measurement identifier
signalID = row["SignalID"].ToNonNullString(Guid.Empty.ToString()).ConvertToType<Guid>();
// Set measurement key if defined
MeasurementKey key = MeasurementKey.LookUpOrCreate(signalID, row["ID"].ToString());
measurement.Metadata = key.Metadata;
subscribedMeasurements.Add(measurement);
}
if (subscribedMeasurements.Count > 0)
{
// Combine subscribed output measurement with any existing output measurement and return unique set
if ((object)OutputMeasurements == null)
OutputMeasurements = subscribedMeasurements.ToArray();
else
OutputMeasurements = subscribedMeasurements.Concat(OutputMeasurements).Distinct().ToArray();
}
}
catch (Exception ex)
{
// Errors here may not be catastrophic, this simply limits the auto-assignment of input measurement keys desired for subscription
OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Failed to apply subscribed measurements to subscription filter: {ex.Message}", ex));
}
}
// Ensure that we are not attempting to subscribe to
// measurements that we know cannot be published
TryFilterOutputMeasurements();
// Determine if output measurements have changed
return originalOutputMeasurements.CompareTo(OutputMeasurements, false) != 0;
}
// When synchronizing meta-data, the publisher sends meta-data for all possible signals we can subscribe to.
// Here we check each signal defined in OutputMeasurements to determine whether that signal was defined in
// the published meta-data rather than blindly attempting to subscribe to all signals.
private void TryFilterOutputMeasurements()
{
if (!m_filterOutputMeasurements)
return;
IEnumerable<Guid> measurementIDs;
ISet<Guid> measurementIDSet;
Guid signalID = Guid.Empty;
try
{
if ((object)OutputMeasurements != null && (object)DataSource != null && DataSource.Tables.Contains("ActiveMeasurements"))
{
// Have to use a Convert expression for DeviceID column in Select function
// here since SQLite doesn't report data types for COALESCE based columns
measurementIDs = DataSource.Tables["ActiveMeasurements"]
.Select($"Convert(DeviceID, 'System.String') = '{ID}'")
.Where(row => Guid.TryParse(row["SignalID"].ToNonNullString(), out signalID))
.Select(row => signalID);
measurementIDSet = new HashSet<Guid>(measurementIDs);
OutputMeasurements = OutputMeasurements.Where(measurement => measurementIDSet.Contains(measurement.ID)).ToArray();
}
}
catch (Exception ex)
{
OnProcessException(MessageLevel.Warning, new InvalidOperationException($"Error when filtering output measurements by device ID: {ex.Message}", ex));
}
}
/// <summary>
/// Authenticates subscriber to a data publisher.
/// </summary>
/// <param name="sharedSecret">Shared secret used to look up private crypto key and initialization vector.</param>
/// <param name="authenticationID">Authentication ID that publisher will use to validate subscriber identity.</param>
/// <returns><c>true</c> if authentication transmission was successful; otherwise <c>false</c>.</returns>
public virtual bool Authenticate(string sharedSecret, string authenticationID)
{
if (!string.IsNullOrWhiteSpace(authenticationID))
{
try
{
using (BlockAllocatedMemoryStream buffer = new BlockAllocatedMemoryStream())
{
byte[] salt = new byte[DataPublisher.CipherSaltLength];
byte[] bytes;
// Generate some random prefix data to make sure auth key transmission is always unique
Random.GetBytes(salt);
// Get encoded bytes of authentication key
bytes = salt.Combine(m_encoding.GetBytes(authenticationID));
// Encrypt authentication key
bytes = bytes.Encrypt(sharedSecret, CipherStrength.Aes256);
// Write encoded authentication key length into buffer
buffer.Write(BigEndian.GetBytes(bytes.Length), 0, 4);
// Encode encrypted authentication key into buffer
buffer.Write(bytes, 0, bytes.Length);
// Send authentication command to server with associated command buffer
return SendServerCommand(ServerCommand.Authenticate, buffer.ToArray());
}
}
catch (Exception ex)
{
OnProcessException(MessageLevel.Error, new InvalidOperationException("Exception occurred while trying to authenticate publisher subscription: " + ex.Message, ex));
}
}
else
{
OnProcessException(MessageLevel.Error, new InvalidOperationException("Cannot authenticate subscription without a connection string."));
}
return false;
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for a set of data points.
/// </summary>
/// <param name="info">Configuration object that defines the subscription.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
public bool Subscribe(SubscriptionInfo info)
{
SynchronizedSubscriptionInfo synchronizedSubscriptionInfo = info as SynchronizedSubscriptionInfo;
if ((object)synchronizedSubscriptionInfo != null)
return SynchronizedSubscribe(synchronizedSubscriptionInfo);
UnsynchronizedSubscriptionInfo unsynchronizedSubscriptionInfo = info as UnsynchronizedSubscriptionInfo;
if ((object)unsynchronizedSubscriptionInfo != null)
return UnsynchronizedSubscribe(unsynchronizedSubscriptionInfo);
throw new NotSupportedException("Type of subscription used is not supported");
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for a synchronized set of data points.
/// </summary>
/// <param name="info">Configuration object that defines the subscription.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
public bool SynchronizedSubscribe(SynchronizedSubscriptionInfo info)
{
StringBuilder connectionString = new StringBuilder();
AssemblyInfo assemblyInfo = new AssemblyInfo(typeof(DataSubscriber).Assembly);
// Dispose of any previously established local concentrator
DisposeLocalConcentrator();
if (info.RemotelySynchronized)
{
connectionString.AppendFormat("framesPerSecond={0};", info.FramesPerSecond);
connectionString.AppendFormat("lagTime={0};", info.LagTime);
connectionString.AppendFormat("leadTime={0};", info.LeadTime);
connectionString.AppendFormat("includeTime=false;");
connectionString.AppendFormat("useLocalClockAsRealTime={0};", info.UseLocalClockAsRealTime);
connectionString.AppendFormat("ignoreBadTimestamps={0};", info.IgnoreBadTimestamps);
connectionString.AppendFormat("allowSortsByArrival={0};", info.AllowSortsByArrival);
connectionString.AppendFormat("timeResolution={0};", info.TimeResolution);
connectionString.AppendFormat("allowPreemptivePublishing={0};", info.AllowPreemptivePublishing);
connectionString.AppendFormat("requestNaNValueFilter={0};", info.RequestNaNValueFilter);
connectionString.AppendFormat("downsamplingMethod={0};", info.DownsamplingMethod);
connectionString.AppendFormat("processingInterval={0};", info.ProcessingInterval);
connectionString.AppendFormat("assemblyInfo={{source={0};version={1}.{2}.{3};buildDate={4:yyyy-MM-dd HH:mm:ss}}};", assemblyInfo.Name, assemblyInfo.Version.Major, assemblyInfo.Version.Minor, assemblyInfo.Version.Build, assemblyInfo.BuildDate);
if (!string.IsNullOrWhiteSpace(info.FilterExpression))
connectionString.AppendFormat("inputMeasurementKeys={{{0}}};", info.FilterExpression);
if (info.UdpDataChannel)
connectionString.AppendFormat("dataChannel={{localport={0}}};", info.DataChannelLocalPort);
if (!string.IsNullOrWhiteSpace(info.StartTime))
connectionString.AppendFormat("startTimeConstraint={0};", info.StartTime);
if (!string.IsNullOrWhiteSpace(info.StopTime))
connectionString.AppendFormat("stopTimeConstraint={0};", info.StopTime);
if (!string.IsNullOrWhiteSpace(info.ConstraintParameters))
connectionString.AppendFormat("timeConstraintParameters={0};", info.ConstraintParameters);
if (!string.IsNullOrWhiteSpace(info.ExtraConnectionStringParameters))
connectionString.AppendFormat("{0};", info.ExtraConnectionStringParameters);
return Subscribe(true, info.UseCompactMeasurementFormat, connectionString.ToString());
}
// Locally concentrated subscription simply uses an unsynchronized subscription and concentrates the
// measurements on the subscriber side
if (Subscribe(FromLocallySynchronizedInfo(info)))
{
// Establish a local concentrator to synchronize received measurements
LocalConcentrator localConcentrator = new LocalConcentrator(this);
localConcentrator.ProcessException += m_localConcentrator_ProcessException;
localConcentrator.FramesPerSecond = info.FramesPerSecond;
localConcentrator.LagTime = info.LagTime;
localConcentrator.LeadTime = info.LeadTime;
localConcentrator.UseLocalClockAsRealTime = info.UseLocalClockAsRealTime;
localConcentrator.IgnoreBadTimestamps = info.IgnoreBadTimestamps;
localConcentrator.AllowSortsByArrival = info.AllowSortsByArrival;
localConcentrator.TimeResolution = info.TimeResolution;
localConcentrator.AllowPreemptivePublishing = info.AllowPreemptivePublishing;
localConcentrator.DownsamplingMethod = info.DownsamplingMethod;
localConcentrator.UsePrecisionTimer = false;
// Parse time constraints, if defined
DateTime startTimeConstraint = !string.IsNullOrWhiteSpace(info.StartTime) ? ParseTimeTag(info.StartTime) : DateTime.MinValue;
DateTime stopTimeConstraint = !string.IsNullOrWhiteSpace(info.StopTime) ? ParseTimeTag(info.StopTime) : DateTime.MaxValue;
// When processing historical data, timestamps should not be evaluated for reasonability
if (startTimeConstraint != DateTime.MinValue || stopTimeConstraint != DateTime.MaxValue)
{
localConcentrator.PerformTimestampReasonabilityCheck = false;
localConcentrator.LeadTime = double.MaxValue;
}
// Assign alternate processing interval, if defined
if (info.ProcessingInterval != -1)
localConcentrator.ProcessingInterval = info.ProcessingInterval;
// Start local concentrator
localConcentrator.Start();
// Move concentrator to member variable
Interlocked.Exchange(ref m_localConcentrator, localConcentrator);
return true;
}
return false;
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for an unsynchronized set of data points.
/// </summary>
/// <param name="info">Configuration object that defines the subscription.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
public bool UnsynchronizedSubscribe(UnsynchronizedSubscriptionInfo info)
{
// Dispose of any previously established local concentrator
DisposeLocalConcentrator();
StringBuilder connectionString = new StringBuilder();
AssemblyInfo assemblyInfo = new AssemblyInfo(typeof(DataSubscriber).Assembly);
connectionString.AppendFormat("trackLatestMeasurements={0};", info.Throttled);
connectionString.AppendFormat("publishInterval={0};", info.PublishInterval);
connectionString.AppendFormat("includeTime={0};", info.IncludeTime);
connectionString.AppendFormat("lagTime={0};", info.LagTime);
connectionString.AppendFormat("leadTime={0};", info.LeadTime);
connectionString.AppendFormat("useLocalClockAsRealTime={0};", info.UseLocalClockAsRealTime);
connectionString.AppendFormat("processingInterval={0};", info.ProcessingInterval);
connectionString.AppendFormat("useMillisecondResolution={0};", info.UseMillisecondResolution);
connectionString.AppendFormat("requestNaNValueFilter={0};", info.RequestNaNValueFilter);
connectionString.AppendFormat("assemblyInfo={{source={0};version={1}.{2}.{3};buildDate={4:yyyy-MM-dd HH:mm:ss}}};", assemblyInfo.Name, assemblyInfo.Version.Major, assemblyInfo.Version.Minor, assemblyInfo.Version.Build, assemblyInfo.BuildDate);
if (!string.IsNullOrWhiteSpace(info.FilterExpression))
connectionString.AppendFormat("inputMeasurementKeys={{{0}}};", info.FilterExpression);
if (info.UdpDataChannel)
connectionString.AppendFormat("dataChannel={{localport={0}}};", info.DataChannelLocalPort);
if (!string.IsNullOrWhiteSpace(info.StartTime))
connectionString.AppendFormat("startTimeConstraint={0};", info.StartTime);
if (!string.IsNullOrWhiteSpace(info.StopTime))
connectionString.AppendFormat("stopTimeConstraint={0};", info.StopTime);
if (!string.IsNullOrWhiteSpace(info.ConstraintParameters))
connectionString.AppendFormat("timeConstraintParameters={0};", info.ConstraintParameters);
if (!string.IsNullOrWhiteSpace(info.ExtraConnectionStringParameters))
connectionString.AppendFormat("{0};", info.ExtraConnectionStringParameters);
// Make sure not to monitor for data loss any faster than down-sample time on throttled connections - additionally
// you will want to make sure data stream monitor is twice lag-time to allow time for initial points to arrive.
if (info.Throttled && (object)m_dataStreamMonitor != null && m_dataStreamMonitor.Interval / 1000.0D < info.LagTime)
m_dataStreamMonitor.Interval = (int)(2.0D * info.LagTime * 1000.0D);
// Set millisecond resolution member variable for compact measurement parsing
m_useMillisecondResolution = info.UseMillisecondResolution;
return Subscribe(false, info.UseCompactMeasurementFormat, connectionString.ToString());
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for a remotely synchronized set of data points.
/// </summary>
/// <param name="compactFormat">Boolean value that determines if the compact measurement format should be used. Set to <c>false</c> for full fidelity measurement serialization; otherwise set to <c>true</c> for bandwidth conservation.</param>
/// <param name="framesPerSecond">The desired number of data frames per second.</param>
/// <param name="lagTime">Allowed past time deviation tolerance, in seconds (can be sub-second).</param>
/// <param name="leadTime">Allowed future time deviation tolerance, in seconds (can be sub-second).</param>
/// <param name="filterExpression">Filtering expression that defines the measurements that are being subscribed.</param>
/// <param name="dataChannel">Desired UDP return data channel connection string to use for data packet transmission. Set to <c>null</c> to use TCP channel for data transmission.</param>
/// <param name="useLocalClockAsRealTime">Boolean value that determines whether or not to use the local clock time as real-time.</param>
/// <param name="ignoreBadTimestamps">Boolean value that determines if bad timestamps (as determined by measurement's timestamp quality) should be ignored when sorting measurements.</param>
/// <param name="allowSortsByArrival"> Gets or sets flag that determines whether or not to allow incoming measurements with bad timestamps to be sorted by arrival time.</param>
/// <param name="timeResolution">Gets or sets the maximum time resolution, in ticks, to use when sorting measurements by timestamps into their proper destination frame.</param>
/// <param name="allowPreemptivePublishing">Gets or sets flag that allows system to preemptively publish frames assuming all expected measurements have arrived.</param>
/// <param name="downsamplingMethod">Gets the total number of down-sampled measurements processed by the concentrator.</param>
/// <param name="startTime">Defines a relative or exact start time for the temporal constraint to use for historical playback.</param>
/// <param name="stopTime">Defines a relative or exact stop time for the temporal constraint to use for historical playback.</param>
/// <param name="constraintParameters">Defines any temporal parameters related to the constraint to use for historical playback.</param>
/// <param name="processingInterval">Defines the desired processing interval milliseconds, i.e., historical play back speed, to use when temporal constraints are defined.</param>
/// <param name="waitHandleNames">Comma separated list of wait handle names used to establish external event wait handles needed for inter-adapter synchronization.</param>
/// <param name="waitHandleTimeout">Maximum wait time for external events, in milliseconds, before proceeding.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
/// <remarks>
/// <para>
/// When the <paramref name="startTime"/> or <paramref name="stopTime"/> temporal processing constraints are defined (i.e., not <c>null</c>), this
/// specifies the start and stop time over which the subscriber session will process data. Passing in <c>null</c> for the <paramref name="startTime"/>
/// and <paramref name="stopTime"/> specifies the subscriber session will process data in standard, i.e., real-time, operation.
/// </para>
/// <para>
/// With the exception of the values of -1 and 0, the <paramref name="processingInterval"/> value specifies the desired historical playback data
/// processing interval in milliseconds. This is basically a delay, or timer interval, over which to process data. Setting this value to -1 means
/// to use the default processing interval while setting the value to 0 means to process data as fast as possible.
/// </para>
/// <para>
/// The <paramref name="startTime"/> and <paramref name="stopTime"/> parameters can be specified in one of the
/// following formats:
/// <list type="table">
/// <listheader>
/// <term>Time Format</term>
/// <description>Format Description</description>
/// </listheader>
/// <item>
/// <term>12-30-2000 23:59:59.033</term>
/// <description>Absolute date and time.</description>
/// </item>
/// <item>
/// <term>*</term>
/// <description>Evaluates to <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-20s</term>
/// <description>Evaluates to 20 seconds before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-10m</term>
/// <description>Evaluates to 10 minutes before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-1h</term>
/// <description>Evaluates to 1 hour before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-1d</term>
/// <description>Evaluates to 1 day before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// </list>
/// </para>
/// </remarks>
[Obsolete("Preferred method uses SubscriptionInfo object to subscribe.", false)]
public virtual bool RemotelySynchronizedSubscribe(bool compactFormat, int framesPerSecond, double lagTime, double leadTime, string filterExpression, string dataChannel = null, bool useLocalClockAsRealTime = false, bool ignoreBadTimestamps = false, bool allowSortsByArrival = true, long timeResolution = Ticks.PerMillisecond, bool allowPreemptivePublishing = true, DownsamplingMethod downsamplingMethod = DownsamplingMethod.LastReceived, string startTime = null, string stopTime = null, string constraintParameters = null, int processingInterval = -1, string waitHandleNames = null, int waitHandleTimeout = 0)
{
// Dispose of any previously established local concentrator
DisposeLocalConcentrator();
StringBuilder connectionString = new StringBuilder();
AssemblyInfo assemblyInfo = new AssemblyInfo(typeof(DataSubscriber).Assembly);
connectionString.AppendFormat("framesPerSecond={0}; ", framesPerSecond);
connectionString.AppendFormat("lagTime={0}; ", lagTime);
connectionString.AppendFormat("leadTime={0}; ", leadTime);
connectionString.AppendFormat("inputMeasurementKeys={{{0}}}; ", filterExpression.ToNonNullString());
connectionString.AppendFormat("dataChannel={{{0}}}; ", dataChannel.ToNonNullString());
connectionString.AppendFormat("includeTime=false; ");
connectionString.AppendFormat("useLocalClockAsRealTime={0}; ", useLocalClockAsRealTime);
connectionString.AppendFormat("ignoreBadTimestamps={0}; ", ignoreBadTimestamps);
connectionString.AppendFormat("allowSortsByArrival={0}; ", allowSortsByArrival);
connectionString.AppendFormat("timeResolution={0}; ", (long)timeResolution);
connectionString.AppendFormat("allowPreemptivePublishing={0}; ", allowPreemptivePublishing);
connectionString.AppendFormat("downsamplingMethod={0}; ", downsamplingMethod.ToString());
connectionString.AppendFormat("startTimeConstraint={0}; ", startTime.ToNonNullString());
connectionString.AppendFormat("stopTimeConstraint={0}; ", stopTime.ToNonNullString());
connectionString.AppendFormat("timeConstraintParameters={0}; ", constraintParameters.ToNonNullString());
connectionString.AppendFormat("processingInterval={0}; ", processingInterval);
connectionString.AppendFormat("assemblyInfo={{source={0}; version={1}.{2}.{3}; buildDate={4:yyyy-MM-dd HH:mm:ss}}}", assemblyInfo.Name, assemblyInfo.Version.Major, assemblyInfo.Version.Minor, assemblyInfo.Version.Build, assemblyInfo.BuildDate);
if (!string.IsNullOrWhiteSpace(waitHandleNames))
{
connectionString.AppendFormat("; waitHandleNames={0}", waitHandleNames);
connectionString.AppendFormat("; waitHandleTimeout={0}", waitHandleTimeout);
}
return Subscribe(true, compactFormat, connectionString.ToString());
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for a locally synchronized set of data points.
/// </summary>
/// <param name="compactFormat">Boolean value that determines if the compact measurement format should be used. Set to <c>false</c> for full fidelity measurement serialization; otherwise set to <c>true</c> for bandwidth conservation.</param>
/// <param name="framesPerSecond">The desired number of data frames per second.</param>
/// <param name="lagTime">Allowed past time deviation tolerance, in seconds (can be sub-second).</param>
/// <param name="leadTime">Allowed future time deviation tolerance, in seconds (can be sub-second).</param>
/// <param name="filterExpression">Filtering expression that defines the measurements that are being subscribed.</param>
/// <param name="dataChannel">Desired UDP return data channel connection string to use for data packet transmission. Set to <c>null</c> to use TCP channel for data transmission.</param>
/// <param name="useLocalClockAsRealTime">Boolean value that determines whether or not to use the local clock time as real-time.</param>
/// <param name="ignoreBadTimestamps">Boolean value that determines if bad timestamps (as determined by measurement's timestamp quality) should be ignored when sorting measurements.</param>
/// <param name="allowSortsByArrival"> Gets or sets flag that determines whether or not to allow incoming measurements with bad timestamps to be sorted by arrival time.</param>
/// <param name="timeResolution">Gets or sets the maximum time resolution, in ticks, to use when sorting measurements by timestamps into their proper destination frame.</param>
/// <param name="allowPreemptivePublishing">Gets or sets flag that allows system to preemptively publish frames assuming all expected measurements have arrived.</param>
/// <param name="downsamplingMethod">Gets the total number of down-sampled measurements processed by the concentrator.</param>
/// <param name="startTime">Defines a relative or exact start time for the temporal constraint to use for historical playback.</param>
/// <param name="stopTime">Defines a relative or exact stop time for the temporal constraint to use for historical playback.</param>
/// <param name="constraintParameters">Defines any temporal parameters related to the constraint to use for historical playback.</param>
/// <param name="processingInterval">Defines the desired processing interval milliseconds, i.e., historical play back speed, to use when temporal constraints are defined.</param>
/// <param name="waitHandleNames">Comma separated list of wait handle names used to establish external event wait handles needed for inter-adapter synchronization.</param>
/// <param name="waitHandleTimeout">Maximum wait time for external events, in milliseconds, before proceeding.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
/// <remarks>
/// <para>
/// When the <paramref name="startTime"/> or <paramref name="stopTime"/> temporal processing constraints are defined (i.e., not <c>null</c>), this
/// specifies the start and stop time over which the subscriber session will process data. Passing in <c>null</c> for the <paramref name="startTime"/>
/// and <paramref name="stopTime"/> specifies the subscriber session will process data in standard, i.e., real-time, operation.
/// </para>
/// <para>
/// With the exception of the values of -1 and 0, the <paramref name="processingInterval"/> value specifies the desired historical playback data
/// processing interval in milliseconds. This is basically a delay, or timer interval, over which to process data. Setting this value to -1 means
/// to use the default processing interval while setting the value to 0 means to process data as fast as possible.
/// </para>
/// <para>
/// The <paramref name="startTime"/> and <paramref name="stopTime"/> parameters can be specified in one of the
/// following formats:
/// <list type="table">
/// <listheader>
/// <term>Time Format</term>
/// <description>Format Description</description>
/// </listheader>
/// <item>
/// <term>12-30-2000 23:59:59.033</term>
/// <description>Absolute date and time.</description>
/// </item>
/// <item>
/// <term>*</term>
/// <description>Evaluates to <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-20s</term>
/// <description>Evaluates to 20 seconds before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-10m</term>
/// <description>Evaluates to 10 minutes before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-1h</term>
/// <description>Evaluates to 1 hour before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-1d</term>
/// <description>Evaluates to 1 day before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// </list>
/// </para>
/// </remarks>
[Obsolete("Preferred method uses SubscriptionInfo object to subscribe.", false)]
public virtual bool LocallySynchronizedSubscribe(bool compactFormat, int framesPerSecond, double lagTime, double leadTime, string filterExpression, string dataChannel = null, bool useLocalClockAsRealTime = false, bool ignoreBadTimestamps = false, bool allowSortsByArrival = true, long timeResolution = Ticks.PerMillisecond, bool allowPreemptivePublishing = true, DownsamplingMethod downsamplingMethod = DownsamplingMethod.LastReceived, string startTime = null, string stopTime = null, string constraintParameters = null, int processingInterval = -1, string waitHandleNames = null, int waitHandleTimeout = 0)
{
// Dispose of any previously established local concentrator
DisposeLocalConcentrator();
// Establish a local concentrator to synchronize received measurements
m_localConcentrator = new LocalConcentrator(this);
m_localConcentrator.ProcessException += m_localConcentrator_ProcessException;
m_localConcentrator.FramesPerSecond = framesPerSecond;
m_localConcentrator.LagTime = lagTime;
m_localConcentrator.LeadTime = leadTime;
m_localConcentrator.UseLocalClockAsRealTime = useLocalClockAsRealTime;
m_localConcentrator.IgnoreBadTimestamps = ignoreBadTimestamps;
m_localConcentrator.AllowSortsByArrival = allowSortsByArrival;
m_localConcentrator.TimeResolution = timeResolution;
m_localConcentrator.AllowPreemptivePublishing = allowPreemptivePublishing;
m_localConcentrator.DownsamplingMethod = downsamplingMethod;
m_localConcentrator.UsePrecisionTimer = false;
// Parse time constraints, if defined
DateTime startTimeConstraint = !string.IsNullOrWhiteSpace(startTime) ? ParseTimeTag(startTime) : DateTime.MinValue;
DateTime stopTimeConstraint = !string.IsNullOrWhiteSpace(stopTime) ? ParseTimeTag(stopTime) : DateTime.MaxValue;
// When processing historical data, timestamps should not be evaluated for reasonability
if (startTimeConstraint != DateTime.MinValue || stopTimeConstraint != DateTime.MaxValue)
{
m_localConcentrator.PerformTimestampReasonabilityCheck = false;
m_localConcentrator.LeadTime = double.MaxValue;
}
// Assign alternate processing interval, if defined
if (processingInterval != -1)
m_localConcentrator.ProcessingInterval = processingInterval;
// Initiate unsynchronized subscribe
StringBuilder connectionString = new StringBuilder();
AssemblyInfo assemblyInfo = new AssemblyInfo(typeof(DataSubscriber).Assembly);
connectionString.AppendFormat("trackLatestMeasurements={0}; ", false);
connectionString.AppendFormat("inputMeasurementKeys={{{0}}}; ", filterExpression.ToNonNullString());
connectionString.AppendFormat("dataChannel={{{0}}}; ", dataChannel.ToNonNullString());
connectionString.AppendFormat("includeTime={0}; ", true);
connectionString.AppendFormat("lagTime={0}; ", 10.0D);
connectionString.AppendFormat("leadTime={0}; ", 5.0D);
connectionString.AppendFormat("useLocalClockAsRealTime={0}; ", false);
connectionString.AppendFormat("startTimeConstraint={0}; ", startTime.ToNonNullString());
connectionString.AppendFormat("stopTimeConstraint={0}; ", stopTime.ToNonNullString());
connectionString.AppendFormat("timeConstraintParameters={0}; ", constraintParameters.ToNonNullString());
connectionString.AppendFormat("processingInterval={0}; ", processingInterval);
connectionString.AppendFormat("useMillisecondResolution={0}; ", m_useMillisecondResolution);
connectionString.AppendFormat("assemblyInfo={{source={0}; version={1}.{2}.{3}; buildDate={4:yyyy-MM-dd HH:mm:ss}}}", assemblyInfo.Name, assemblyInfo.Version.Major, assemblyInfo.Version.Minor, assemblyInfo.Version.Build, assemblyInfo.BuildDate);
if (!string.IsNullOrWhiteSpace(waitHandleNames))
{
connectionString.AppendFormat("; waitHandleNames={0}", waitHandleNames);
connectionString.AppendFormat("; waitHandleTimeout={0}", waitHandleTimeout);
}
// Start subscription process
if (Subscribe(false, compactFormat, connectionString.ToString()))
{
// If subscription succeeds, start local concentrator
m_localConcentrator.Start();
return true;
}
return false;
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for an unsynchronized set of data points.
/// </summary>
/// <param name="compactFormat">Boolean value that determines if the compact measurement format should be used. Set to <c>false</c> for full fidelity measurement serialization; otherwise set to <c>true</c> for bandwidth conservation.</param>
/// <param name="throttled">Boolean value that determines if data should be throttled at a set transmission interval or sent on change.</param>
/// <param name="filterExpression">Filtering expression that defines the measurements that are being subscribed.</param>
/// <param name="dataChannel">Desired UDP return data channel connection string to use for data packet transmission. Set to <c>null</c> to use TCP channel for data transmission.</param>
/// <param name="includeTime">Boolean value that determines if time is a necessary component in streaming data.</param>
/// <param name="lagTime">When <paramref name="throttled"/> is <c>true</c>, defines the data transmission speed in seconds (can be sub-second).</param>
/// <param name="leadTime">When <paramref name="throttled"/> is <c>true</c>, defines the allowed time deviation tolerance to real-time in seconds (can be sub-second).</param>
/// <param name="useLocalClockAsRealTime">When <paramref name="throttled"/> is <c>true</c>, defines boolean value that determines whether or not to use the local clock time as real-time. Set to <c>false</c> to use latest received measurement timestamp as real-time.</param>
/// <param name="startTime">Defines a relative or exact start time for the temporal constraint to use for historical playback.</param>
/// <param name="stopTime">Defines a relative or exact stop time for the temporal constraint to use for historical playback.</param>
/// <param name="constraintParameters">Defines any temporal parameters related to the constraint to use for historical playback.</param>
/// <param name="processingInterval">Defines the desired processing interval milliseconds, i.e., historical play back speed, to use when temporal constraints are defined.</param>
/// <param name="waitHandleNames">Comma separated list of wait handle names used to establish external event wait handles needed for inter-adapter synchronization.</param>
/// <param name="waitHandleTimeout">Maximum wait time for external events, in milliseconds, before proceeding.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
/// <remarks>
/// <para>
/// When the <paramref name="startTime"/> or <paramref name="stopTime"/> temporal processing constraints are defined (i.e., not <c>null</c>), this
/// specifies the start and stop time over which the subscriber session will process data. Passing in <c>null</c> for the <paramref name="startTime"/>
/// and <paramref name="stopTime"/> specifies the subscriber session will process data in standard, i.e., real-time, operation.
/// </para>
/// <para>
/// With the exception of the values of -1 and 0, the <paramref name="processingInterval"/> value specifies the desired historical playback data
/// processing interval in milliseconds. This is basically a delay, or timer interval, over which to process data. Setting this value to -1 means
/// to use the default processing interval while setting the value to 0 means to process data as fast as possible.
/// </para>
/// <para>
/// The <paramref name="startTime"/> and <paramref name="stopTime"/> parameters can be specified in one of the
/// following formats:
/// <list type="table">
/// <listheader>
/// <term>Time Format</term>
/// <description>Format Description</description>
/// </listheader>
/// <item>
/// <term>12-30-2000 23:59:59.033</term>
/// <description>Absolute date and time.</description>
/// </item>
/// <item>
/// <term>*</term>
/// <description>Evaluates to <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-20s</term>
/// <description>Evaluates to 20 seconds before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-10m</term>
/// <description>Evaluates to 10 minutes before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-1h</term>
/// <description>Evaluates to 1 hour before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// <item>
/// <term>*-1d</term>
/// <description>Evaluates to 1 day before <see cref="DateTime.UtcNow"/>.</description>
/// </item>
/// </list>
/// </para>
/// </remarks>
[Obsolete("Preferred method uses SubscriptionInfo object to subscribe.", false)]
public virtual bool UnsynchronizedSubscribe(bool compactFormat, bool throttled, string filterExpression, string dataChannel = null, bool includeTime = true, double lagTime = 10.0D, double leadTime = 5.0D, bool useLocalClockAsRealTime = false, string startTime = null, string stopTime = null, string constraintParameters = null, int processingInterval = -1, string waitHandleNames = null, int waitHandleTimeout = 0)
{
// Dispose of any previously established local concentrator
DisposeLocalConcentrator();
StringBuilder connectionString = new StringBuilder();
AssemblyInfo assemblyInfo = new AssemblyInfo(typeof(DataSubscriber).Assembly);
connectionString.AppendFormat("trackLatestMeasurements={0}; ", throttled);
connectionString.AppendFormat("inputMeasurementKeys={{{0}}}; ", filterExpression.ToNonNullString());
connectionString.AppendFormat("dataChannel={{{0}}}; ", dataChannel.ToNonNullString());
connectionString.AppendFormat("includeTime={0}; ", includeTime);
connectionString.AppendFormat("lagTime={0}; ", lagTime);
connectionString.AppendFormat("leadTime={0}; ", leadTime);
connectionString.AppendFormat("useLocalClockAsRealTime={0}; ", useLocalClockAsRealTime);
connectionString.AppendFormat("startTimeConstraint={0}; ", startTime.ToNonNullString());
connectionString.AppendFormat("stopTimeConstraint={0}; ", stopTime.ToNonNullString());
connectionString.AppendFormat("timeConstraintParameters={0}; ", constraintParameters.ToNonNullString());
connectionString.AppendFormat("processingInterval={0}; ", processingInterval);
connectionString.AppendFormat("useMillisecondResolution={0}; ", m_useMillisecondResolution);
connectionString.AppendFormat("requestNaNValueFilter={0}; ", m_requestNaNValueFilter);
connectionString.AppendFormat("assemblyInfo={{source={0}; version={1}.{2}.{3}; buildDate={4:yyyy-MM-dd HH:mm:ss}}}", assemblyInfo.Name, assemblyInfo.Version.Major, assemblyInfo.Version.Minor, assemblyInfo.Version.Build, assemblyInfo.BuildDate);
if (!string.IsNullOrWhiteSpace(waitHandleNames))
{
connectionString.AppendFormat("; waitHandleNames={0}", waitHandleNames);
connectionString.AppendFormat("; waitHandleTimeout={0}", waitHandleTimeout);
}
// Make sure not to monitor for data loss any faster than down-sample time on throttled connections - additionally
// you will want to make sure data stream monitor is twice lag-time to allow time for initial points to arrive.
if (throttled && (object)m_dataStreamMonitor != null && m_dataStreamMonitor.Interval / 1000.0D < lagTime)
m_dataStreamMonitor.Interval = (int)(2.0D * lagTime * 1000.0D);
return Subscribe(false, compactFormat, connectionString.ToString());
}
/// <summary>
/// Subscribes (or re-subscribes) to a data publisher for a set of data points.
/// </summary>
/// <param name="remotelySynchronized">Boolean value that determines if subscription should be remotely synchronized - note that data publisher may not allow remote synchronization.</param>
/// <param name="compactFormat">Boolean value that determines if the compact measurement format should be used. Set to <c>false</c> for full fidelity measurement serialization; otherwise set to <c>true</c> for bandwidth conservation.</param>
/// <param name="connectionString">Connection string that defines required and optional parameters for the subscription.</param>
/// <returns><c>true</c> if subscribe transmission was successful; otherwise <c>false</c>.</returns>
public virtual bool Subscribe(bool remotelySynchronized, bool compactFormat, string connectionString)
{
bool success = false;
if (!string.IsNullOrWhiteSpace(connectionString))
{
try
{
// Parse connection string to see if it contains a data channel definition
Dictionary<string, string> settings = connectionString.ParseKeyValuePairs();
UdpClient dataChannel = null;
// Track specified time inclusion for later deserialization
if (settings.TryGetValue("includeTime", out string setting))
m_includeTime = setting.ParseBoolean();
else
m_includeTime = true;
settings.TryGetValue("dataChannel", out setting);
if (!string.IsNullOrWhiteSpace(setting))
{
if ((CompressionModes & CompressionModes.TSSC) > 0)
{
// TSSC is a stateful compression algorithm which will not reliably support UDP
OnStatusMessage(MessageLevel.Warning, "Cannot use TSSC compression mode with UDP - special compression mode disabled");
// Disable TSSC compression processing
CompressionModes &= ~CompressionModes.TSSC;
}
dataChannel = new UdpClient(setting);
dataChannel.ReceiveBufferSize = ushort.MaxValue;
dataChannel.MaxConnectionAttempts = -1;
dataChannel.ConnectAsync();
}
// Assign data channel client reference and attach to needed events
DataChannel = dataChannel;
// Setup subscription packet
using (BlockAllocatedMemoryStream buffer = new BlockAllocatedMemoryStream())
{
DataPacketFlags flags = DataPacketFlags.NoFlags;
byte[] bytes;
if (remotelySynchronized)
flags |= DataPacketFlags.Synchronized;
if (compactFormat)
flags |= DataPacketFlags.Compact;
// Write data packet flags into buffer
buffer.WriteByte((byte)flags);
// Get encoded bytes of connection string
bytes = m_encoding.GetBytes(connectionString);
// Write encoded connection string length into buffer
buffer.Write(BigEndian.GetBytes(bytes.Length), 0, 4);
// Encode connection string into buffer
buffer.Write(bytes, 0, bytes.Length);
// Cache subscribed synchronization state
m_synchronizedSubscription = remotelySynchronized;
// Send subscribe server command with associated command buffer
success = SendServerCommand(ServerCommand.Subscribe, buffer.ToArray());
}
}
catch (Exception ex)
{
OnProcessException(MessageLevel.Error, new InvalidOperationException("Exception occurred while trying to make publisher subscription: " + ex.Message, ex));
}
}
else
{
OnProcessException(MessageLevel.Error, new InvalidOperationException("Cannot make publisher subscription without a connection string."));
}
// Reset decompressor on successful resubscription
if (success)
{
m_tsscResetRequested = true;
}
return success;
}
/// <summary>
/// Unsubscribes from a data publisher.
/// </summary>
/// <returns><c>true</c> if unsubscribe transmission was successful; otherwise <c>false</c>.</returns>
public virtual bool Unsubscribe()
{
// Send unsubscribe server command
return SendServerCommand(ServerCommand.Unsubscribe);
}
/// <summary>
/// Returns the measurements signal IDs that were authorized after the last successful subscription request.
/// </summary>
[AdapterCommand("Gets authorized signal IDs from last subscription request.", "Administrator", "Editor", "Viewer")]
public virtual Guid[] GetAuthorizedSignalIDs()
{
if ((object)m_signalIndexCache != null)
return m_signalIndexCache.AuthorizedSignalIDs;
return new Guid[0];
}
/// <summary>
/// Returns the measurements signal IDs that were unauthorized after the last successful subscription request.
/// </summary>
[AdapterCommand("Gets unauthorized signal IDs from last subscription request.", "Administrator", "Editor", "Viewer")]
public virtual Guid[] GetUnauthorizedSignalIDs()
{
if ((object)m_signalIndexCache != null)
return m_signalIndexCache.UnauthorizedSignalIDs;
return new Guid[0];
}
/// <summary>
/// Resets the counters for the lifetime statistics without interrupting the adapter's operations.
/// </summary>
[AdapterCommand("Resets the counters for the lifetime statistics without interrupting the adapter's operations.", "Administrator", "Editor")]
public virtual void ResetLifetimeCounters()
{
m_lifetimeMeasurements = 0L;
m_totalBytesReceived = 0L;
m_lifetimeTotalLatency = 0L;
m_lifetimeMinimumLatency = 0L;
m_lifetimeMaximumLatency = 0L;
m_lifetimeLatencyMeasurements = 0L;
}
/// <summary>
/// Initiate a meta-data refresh.
/// </summary>
[AdapterCommand("Initiates a meta-data refresh.", "Administrator", "Editor")]
public virtual void RefreshMetadata()
{
SendServerCommand(ServerCommand.MetaDataRefresh, m_metadataFilters);
}
/// <summary>
/// Log a data gap for data gap recovery.
/// </summary>
/// <param name="timeString">The string representing the data gap.</param>
[AdapterCommand("Logs a data gap for data gap recovery.", "Administrator", "Editor")]
public virtual void LogDataGap(string timeString)
{
DateTimeOffset end = default(DateTimeOffset);
string[] split = timeString.Split(';');
if (!m_dataGapRecoveryEnabled)
throw new InvalidOperationException("Data gap recovery is not enabled.");
if (split.Length != 2)
throw new FormatException("Invalid format for time string - ex: 2014-03-27 02:10:47.566;2014-03-27 02:10:59.733");
string startTime = split[0];
string endTime = split[1];
bool parserSuccessful =
DateTimeOffset.TryParse(startTime, CultureInfo.CurrentCulture, DateTimeStyles.AssumeUniversal | DateTimeStyles.AllowInnerWhite, out DateTimeOffset start) &&
DateTimeOffset.TryParse(endTime, CultureInfo.CurrentCulture, DateTimeStyles.AssumeUniversal | DateTimeStyles.AllowInnerWhite, out end);
if (!parserSuccessful)
throw new FormatException("Invalid format for time string - ex: 2014-03-27 02:10:47.566;2014-03-27 02:10:59.733");
m_dataGapRecoverer?.LogDataGap(start, end, true);
}
/// <summary>
/// Remove a data gap from data gap recovery.
/// </summary>
/// <param name="timeString">The string representing the data gap.</param>
[AdapterCommand("Removes a data gap from data gap recovery.", "Administrator", "Editor")]
public virtual string RemoveDataGap(string timeString)
{
DateTimeOffset end = default(DateTimeOffset);
string[] split = timeString.Split(';');
if (!m_dataGapRecoveryEnabled)
throw new InvalidOperationException("Data gap recovery is not enabled.");
if (split.Length != 2)
throw new FormatException("Invalid format for time string - ex: 2014-03-27 02:10:47.566;2014-03-27 02:10:59.733");
string startTime = split[0];
string endTime = split[1];
bool parserSuccessful =
DateTimeOffset.TryParse(startTime, CultureInfo.CurrentCulture, DateTimeStyles.AssumeUniversal | DateTimeStyles.AllowInnerWhite, out DateTimeOffset start) &&
DateTimeOffset.TryParse(endTime, CultureInfo.CurrentCulture, DateTimeStyles.AssumeUniversal | DateTimeStyles.AllowInnerWhite, out end);
if (!parserSuccessful)
throw new FormatException("Invalid format for time string - ex: 2014-03-27 02:10:47.566;2014-03-27 02:10:59.733");
if (m_dataGapRecoverer?.RemoveDataGap(start, end) ?? false)
return "Data gap successfully removed.";
else
return "Data gap not found.";
}
/// <summary>
/// Displays the contents of the outage log.
/// </summary>
/// <returns>The contents of the outage log.</returns>
[AdapterCommand("Displays data gaps queued for data gap recovery.", "Administrator", "Editor", "Viewer")]
public virtual string DumpOutageLog()
{
if (m_dataGapRecoveryEnabled && (object)m_dataGapRecoverer != null)
return Environment.NewLine + m_dataGapRecoverer.DumpOutageLog();
throw new InvalidOperationException("Data gap recovery not enabled");
}
/// <summary>
/// Gets the status of the temporal <see cref="DataSubscriber"/> used by the data gap recovery module.
/// </summary>
/// <returns>Status of the temporal <see cref="DataSubscriber"/> used by the data gap recovery module.</returns>
[AdapterCommand("Gets the status of the temporal subscription used by the data gap recovery module.", "Administrator", "Editor", "Viewer")]
public virtual string GetDataGapRecoverySubscriptionStatus()
{
if (m_dataGapRecoveryEnabled && (object)m_dataGapRecoverer != null)
return m_dataGapRecoverer.TemporalSubscriptionStatus;
return "Data gap recovery not enabled";
}
/// <summary>
/// Spawn meta-data synchronization.
/// </summary>
/// <param name="metadata"><see cref="DataSet"/> to use for synchronization.</param>
/// <remarks>
/// This method makes sure only one meta-data synchronization happens at a time.
/// </remarks>
public void SynchronizeMetadata(DataSet metadata)
{
try
{
m_receivedMetadata = metadata;
m_synchronizeMetadataOperation.RunOnceAsync();
}
catch (Exception ex)
{
// Process exception for logging
OnProcessException(MessageLevel.Warning, new InvalidOperationException("Failed to queue meta-data synchronization: " + ex.Message, ex));
}
}
/// <summary>
/// Sends a server command to the publisher connection with associated <paramref name="message"/> data.
/// </summary>
/// <param name="commandCode"><see cref="ServerCommand"/> to send.</param>
/// <param name="message">String based command data to send to server.</param>
/// <returns><c>true</c> if <paramref name="commandCode"/> transmission was successful; otherwise <c>false</c>.</returns>
public virtual bool SendServerCommand(ServerCommand commandCode, string message)
{
if (!string.IsNullOrWhiteSpace(message))
{
using (BlockAllocatedMemoryStream buffer = new BlockAllocatedMemoryStream())
{
byte[] bytes = m_encoding.GetBytes(message);
buffer.Write(BigEndian.GetBytes(bytes.Length), 0, 4);
buffer.Write(bytes, 0, bytes.Length);
return SendServerCommand(commandCode, buffer.ToArray());
}
}
return SendServerCommand(commandCode);
}
/// <summary>
/// Sends a server command to the publisher connection.
/// </summary>
/// <param name="commandCode"><see cref="ServerCommand"/> to send.</param>
/// <param name="data">Optional command data to send.</param>
/// <returns><c>true</c> if <paramref name="commandCode"/> transmission was successful; otherwise <c>false</c>.</returns>
public virtual bool SendServerCommand(ServerCommand commandCode, byte[] data = null)
{
if ((object)m_commandChannel != null && m_commandChannel.CurrentState == ClientState.Connected)
{
try
{
using (BlockAllocatedMemoryStream commandPacket = new BlockAllocatedMemoryStream())
{
// Write command code into command packet
commandPacket.WriteByte((byte)commandCode);
// Write command buffer into command packet
if ((object)data != null && data.Length > 0)
commandPacket.Write(data, 0, data.Length);
// Send command packet to publisher
m_commandChannel.SendAsync(commandPacket.ToArray(), 0, (int)commandPacket.Length);
m_metadataRefreshPending = commandCode == ServerCommand.MetaDataRefresh;
}
return true;
}
catch (Exception ex)
{
OnProcessException(MessageLevel.Error, new InvalidOperationException($"Exception occurred while trying to send server command \"{commandCode}\" to publisher: {ex.Message}", ex));
}
}
else
OnProcessException(MessageLevel.Error, new InvalidOperationException($"Subscriber is currently unconnected. Cannot send server command \"{commandCode}\" to publisher."));
return false;
}
/// <summary>
/// Attempts to connect to this <see cref="DataSubscriber"/>.
/// </summary>
protected override void AttemptConnection()
{
if (!this.TemporalConstraintIsDefined() && !m_supportsRealTimeProcessing)
return;
long now = m_useLocalClockAsRealTime ? DateTime.UtcNow.Ticks : 0L;
List<DeviceStatisticsHelper<SubscribedDevice>> statisticsHelpers = m_statisticsHelpers;
m_registerStatisticsOperation.RunOnceAsync();
m_expectedBufferBlockSequenceNumber = 0u;
m_commandChannelConnectionAttempts = 0;
m_dataChannelConnectionAttempts = 0;
m_authenticated = m_securityMode == SecurityMode.TLS;
m_subscribed = false;
m_keyIVs = null;
m_totalBytesReceived = 0L;
m_monitoredBytesReceived = 0L;
m_lastBytesReceived = 0;
if (!PersistConnectionForMetadata)
m_commandChannel.ConnectAsync();
else
OnConnected();
if (PersistConnectionForMetadata && CommandChannelConnected)
{
// Attempt authentication if required, remaining steps will happen on successful authentication
if (m_securityMode == SecurityMode.Gateway)
Authenticate(m_sharedSecret, m_authenticationID);
else
SubscribeToOutputMeasurements(true);
}
if (m_useLocalClockAsRealTime && (object)m_subscribedDevicesTimer == null)
{
m_subscribedDevicesTimer = Common.TimerScheduler.CreateTimer(1000);
m_subscribedDevicesTimer.Elapsed += SubscribedDevicesTimer_Elapsed;
}
if ((object)statisticsHelpers != null)
{
m_realTime = 0L;
m_lastStatisticsHelperUpdate = 0L;
foreach (DeviceStatisticsHelper<SubscribedDevice> statisticsHelper in statisticsHelpers)
statisticsHelper.Reset(now);
}
if (m_useLocalClockAsRealTime)
m_subscribedDevicesTimer.Start();
}
/// <summary>
/// Attempts to disconnect from this <see cref="DataSubscriber"/>.
/// </summary>
protected override void AttemptDisconnection()
{
// Unregister device statistics
m_registerStatisticsOperation.RunOnceAsync();
// Stop data stream monitor
if ((object)m_dataStreamMonitor != null)
m_dataStreamMonitor.Enabled = false;
// Disconnect command channel
if (!PersistConnectionForMetadata && (object)m_commandChannel != null)
m_commandChannel.Disconnect();
if ((object)m_subscribedDevicesTimer != null)
m_subscribedDevicesTimer.Stop();
m_metadataRefreshPending = false;
}
/// <summary>
/// Gets a short one-line status of this <see cref="DataSubscriber"/>.
/// </summary>
/// <param name="maxLength">Maximum length of the status message.</param>
/// <returns>Text of the status message.</returns>
public override string GetShortStatus(int maxLength)
{
if ((object)m_commandChannel != null && m_commandChannel.CurrentState == ClientState.Connected)
return $"Subscriber is connected and receiving {(m_synchronizedSubscription ? "synchronized" : "unsynchronized")} data points".CenterText(maxLength);
return "Subscriber is not connected.".CenterText(maxLength);
}
/// <summary>
/// Get message from string based response.
/// </summary>
/// <param name="buffer">Response buffer.</param>
/// <param name="startIndex">Start index of response message.</param>
/// <param name="length">Length of response message.</param>
/// <returns>Decoded response string.</returns>
protected string InterpretResponseMessage(byte[] buffer, int startIndex, int length)
{
return m_encoding.GetString(buffer, startIndex, length);
}
// Restarts the subscriber.
private void Restart()
{
try
{
base.Start();
}
catch (Exception ex)
{
OnProcessException(MessageLevel.Warning, ex);
}
}
private void ProcessServerResponse(byte[] buffer, int length)
{
// Currently this work is done on the async socket completion thread, make sure work to be done is timely and if the response processing
// is coming in via the command channel and needs to send a command back to the server, it should be done on a separate thread...
if (buffer != null && length > 0)
{
try
{
Dictionary<Guid, DeviceStatisticsHelper<SubscribedDevice>> subscribedDevicesLookup;
DeviceStatisticsHelper<SubscribedDevice> statisticsHelper;
ServerResponse responseCode = (ServerResponse)buffer[0];
ServerCommand commandCode = (ServerCommand)buffer[1];
int responseLength = BigEndian.ToInt32(buffer, 2);
int responseIndex = DataPublisher.ClientResponseHeaderSize;
byte[][][] keyIVs;
// Disconnect any established UDP data channel upon successful unsubscribe
if (commandCode == ServerCommand.Unsubscribe && responseCode == ServerResponse.Succeeded)
DataChannel = null;
if (!IsUserCommand(commandCode))
OnReceivedServerResponse(responseCode, commandCode);
else
OnReceivedUserCommandResponse(commandCode, responseCode, buffer, responseIndex, length);
switch (responseCode)
{
case ServerResponse.Succeeded:
switch (commandCode)
{
case ServerCommand.Authenticate:
OnStatusMessage(MessageLevel.Info, $"Su