Skip to content

Commit

Permalink
Adding AgentProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickRitchie committed Nov 7, 2023
1 parent 1fbd4a9 commit af634c5
Show file tree
Hide file tree
Showing 15 changed files with 433 additions and 70 deletions.
6 changes: 1 addition & 5 deletions applications/Agents/MTConnect-Agent/MTConnect-Agent.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\modules\MTConnect.NET-HttpAdapter-Module\MTConnect.NET-HttpAdapter-Module.csproj" />
<ProjectReference Include="..\..\..\modules\MTConnect.NET-HttpServer-Module\MTConnect.NET-HttpServer-Module.csproj" />
<ProjectReference Include="..\..\..\modules\MTConnect.NET-MqttBroker-Module\MTConnect.NET-MqttBroker-Module.csproj" />
<ProjectReference Include="..\..\..\modules\MTConnect.NET-MqttRelay-Module\MTConnect.NET-MqttRelay-Module.csproj" />
<ProjectReference Include="..\..\..\modules\MTConnect.NET-ShdrAdapter-Module\MTConnect.NET-ShdrAdapter-Module.csproj" />
<ProjectReference Include="..\..\..\src\MTConnect.NET-Applications-Agents\MTConnect.NET-Applications-Agents.csproj" />
<ProjectReference Include="..\..\..\src\MTConnect.NET-Python-Processor\MTConnect.NET-Python-Processor.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\modules\MTConnect.NET-HttpAdapter-Module\MTConnect.NET-HttpAdapter-Module.csproj" />
<ProjectReference Include="..\..\modules\MTConnect.NET-HttpServer-Module\MTConnect.NET-HttpServer-Module.csproj" />
<ProjectReference Include="..\..\modules\MTConnect.NET-MqttBroker-Module\MTConnect.NET-MqttBroker-Module.csproj" />
<ProjectReference Include="..\..\modules\MTConnect.NET-MqttRelay-Module\MTConnect.NET-MqttRelay-Module.csproj" />
<ProjectReference Include="..\..\modules\MTConnect.NET-ShdrAdapter-Module\MTConnect.NET-ShdrAdapter-Module.csproj" />
<ProjectReference Include="..\MTConnect.NET-HTTP\MTConnect.NET-HTTP.csproj" />
<ProjectReference Include="..\MTConnect.NET-JSON-cppagent\MTConnect.NET-JSON-cppagent.csproj" />
<ProjectReference Include="..\MTConnect.NET-JSON\MTConnect.NET-JSON.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public class MTConnectAgentApplication : IMTConnectAgentApplication
private IMTConnectObservationBuffer _observationBuffer;
private MTConnectAssetFileBuffer _assetBuffer;
private MTConnectAgentModules _modules;
//private MTConnectControllers _controllers;
//private MTConnectDataSources _dataSources;
private MTConnectAgentProcessors _processors;
protected IAgentConfigurationFileWatcher _agentConfigurationWatcher;
private System.Timers.Timer _metricsTimer;
private bool _started = false;
Expand Down Expand Up @@ -415,6 +414,11 @@ public void StartAgent(IAgentApplicationConfiguration configuration, bool verbos
FileIndex.ToFile(FileIndex.DataItemsFileName, FileIndex.Create(_mtconnectAgent.DataItemIndexes));
}

// Initilialize Processors
_processors = new MTConnectAgentProcessors();
_processors.Load();
_mtconnectAgent.ProcessObservationFunction = _processors.Process;

// Start Agent
_mtconnectAgent.Start();

Expand Down
2 changes: 2 additions & 0 deletions src/MTConnect.NET-Common/Agents/IMTConnectAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public interface IMTConnectAgent

IEnumerable<IObservationOutput> GetCurrentObservations(string deviceKey, Version mtconnectVersion = null);

IEnumerable<IObservationOutput> GetCurrentObservations(string deviceKey, string dataItemKey, Version mtconnectVersion = null);


IEnumerable<IAsset> GetAssets(Version mtconnectVersion = null);

Expand Down
15 changes: 15 additions & 0 deletions src/MTConnect.NET-Common/Agents/IMTConnectAgentProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2023 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using MTConnect.Assets;
using MTConnect.Observations.Input;

namespace MTConnect.Agents
{
public interface IMTConnectAgentProcessor
{
IObservationInput Process(ProcessObservation observation);

IAsset Process(IAsset asset);
}
}
77 changes: 58 additions & 19 deletions src/MTConnect.NET-Common/Agents/MTConnectAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public string Sender
/// </summary>
public DateTime DeviceModelChangeTime => _deviceModelChangeTime.ToDateTime();

public Func<ProcessObservation, IObservationInput> ProcessObservationFunction { get; set; }

public Func<IAsset, IAsset> ProcessAssetFunction { get; set; }

#endregion

#region "Events"
Expand Down Expand Up @@ -534,6 +538,46 @@ public IEnumerable<IObservationOutput> GetCurrentObservations(string deviceKey,
return observations;
}

public IEnumerable<IObservationOutput> GetCurrentObservations(string deviceKey, string dataItemKey, Version mtconnectVersion = null)
{
var observations = new List<IObservationOutput>();

var version = mtconnectVersion != null ? mtconnectVersion : MTConnectVersion;

var device = GetDevice(deviceKey, version);
if (device != null)
{
var dataItem = GetDataItem(device.Uuid, dataItemKey);
if (dataItem != null)
{
var hash = $"{device.Uuid}:{dataItem.Id}";

if (dataItem.Category == DataItemCategory.CONDITION)
{
if (_currentConditions.TryGetValue(hash, out var observationInputs))
{
if (!observationInputs.IsNullOrEmpty())
{
foreach (var observationInput in observationInputs)
{
observations.Add(CreateObservation(dataItem, observationInput));
}
}
}
}
else
{
if (_currentObservations.TryGetValue(hash, out var observationInput))
{
observations.Add(CreateObservation(dataItem, observationInput));
}
}
}
}

return observations;
}

private IObservationOutput CreateObservation(IDataItem dataItem, IObservationInput observationInput)
{
var observation = new ObservationOutput();
Expand Down Expand Up @@ -1517,7 +1561,7 @@ public bool AddObservation(string deviceKey, IObservationInput observationInput,
{
ObservationReceived?.Invoke(this, observationInput);

var input = new ObservationInput();
IObservationInput input = new ObservationInput();
input.DeviceKey = deviceKey;
input.DataItemKey = observationInput.DataItemKey;
input.IsUnavailable = observationInput.IsUnavailable;
Expand Down Expand Up @@ -1551,6 +1595,12 @@ public bool AddObservation(string deviceKey, IObservationInput observationInput,
case DataItemRepresentation.TIME_SERIES: if (input.IsUnavailable) input.AddValue(ValueKeys.SampleCount, 0); break;
}

// Process Observation using Processers
if (ProcessObservationFunction != null)
{
input = ProcessObservationFunction(new ProcessObservation(this, dataItem, input));
}

var success = false;
var validationResult = new ValidationResult(true);

Expand All @@ -1566,7 +1616,8 @@ public bool AddObservation(string deviceKey, IObservationInput observationInput,
// Convert Units (if needed)
if ((!convertUnits.HasValue && _configuration.ConvertUnits) || (convertUnits.HasValue && convertUnits.Value))
{
observationInput = ConvertObservationValue(dataItem, input);
input = ConvertObservationValue(dataItem, input);
if (input == null) return false;
}

bool update;
Expand All @@ -1585,7 +1636,7 @@ public bool AddObservation(string deviceKey, IObservationInput observationInput,
if (update)
{
// Call Update to Observation Buffer - HERE
success = OnAddObservation(deviceUuid, dataItem, observationInput);
success = OnAddObservation(deviceUuid, dataItem, input);
if (success)
{
if (_metrics != null)
Expand All @@ -1601,36 +1652,24 @@ public bool AddObservation(string deviceKey, IObservationInput observationInput,
observation.DeviceUuid = deviceUuid;
observation.DataItem = dataItem;
observation.InstanceId = _instanceId;
observation.Timestamp = observationInput.Timestamp.ToDateTime();
observation.AddValues(observationInput.Values);
observation.Timestamp = input.Timestamp.ToDateTime();
observation.AddValues(input.Values);
OnObservationAdded(observation);

//if (ObservationAdded != null)
//{
// var observation = Observation.Create(dataItem);
// observation.DeviceUuid = deviceUuid;
// observation.DataItem = dataItem;
// observation.InstanceId = _instanceId;
// observation.Timestamp = observationInput.Timestamp.ToDateTime();
// observation.AddValues(observationInput.Values);

// ObservationAdded?.Invoke(this, observation);
//}
}
}
else success = true; // Return true if no update needed
}

if (!validationResult.IsValid && InvalidObservationAdded != null)
{
InvalidObservationAdded.Invoke(deviceUuid, observationInput.DataItemKey, validationResult);
InvalidObservationAdded.Invoke(deviceUuid, input.DataItemKey, validationResult);
}

return success;
}
else if (InvalidObservationAdded != null)
{
InvalidObservationAdded.Invoke(deviceUuid, observationInput.DataItemKey, new ValidationResult(false, $"DataItemKey \"{observationInput.DataItemKey}\" not Found in Device"));
InvalidObservationAdded.Invoke(deviceUuid, input.DataItemKey, new ValidationResult(false, $"DataItemKey \"{input.DataItemKey}\" not Found in Device"));
}
}

Expand Down
120 changes: 120 additions & 0 deletions src/MTConnect.NET-Common/Agents/MTConnectAgentProcessors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2023 TrakHound Inc., All Rights Reserved.
// TrakHound Inc. licenses this file to you under the MIT license.

using MTConnect.Assets;
using MTConnect.Observations.Input;
using System;
using System.Collections.Concurrent;

namespace MTConnect.Agents
{
public class MTConnectAgentProcessors
{
private static readonly ConcurrentBag<Type> _processorTypes = new ConcurrentBag<Type>();
private static readonly ConcurrentDictionary<string, IMTConnectAgentProcessor> _processors = new ConcurrentDictionary<string, IMTConnectAgentProcessor>();


public void Load()
{
InitializeProcessors();

var processorTypes = _processorTypes.ToArray();
if (!processorTypes.IsNullOrEmpty())
{
foreach (var processorType in processorTypes)
{
try
{
// Create new Instance of the Controller and add to cached dictionary
var processor = (IMTConnectAgentProcessor)Activator.CreateInstance(processorType, new object[] { });

var processorId = Guid.NewGuid().ToString();

_processors.TryAdd(processorId, processor);
}
catch { }
}
}
}


public IObservationInput Process(ProcessObservation observation)
{
IObservationInput outputObservation = null;

if (observation != null && !_processors.IsNullOrEmpty())
{
foreach (var processor in _processors)
{
outputObservation = processor.Value.Process(observation);
}
}


//outputObservation = new ObservationInput();
//outputObservation.DeviceKey = observation.DataItem.Device.Uuid;
//outputObservation.DataItemKey = observation.DataItem.Id;
//outputObservation.Values = observation.Values;
//outputObservation.Timestamp = observation.Timestamp.ToUnixTime();

return outputObservation;
}

public IAsset Process(IAsset asset)
{
var outputAsset = asset;

if (asset != null && !_processors.IsNullOrEmpty())
{
foreach (var processor in _processors)
{
outputAsset = processor.Value.Process(outputAsset);
}
}

return outputAsset;
}

public void Dispose()
{
//if (!_processors.IsNullOrEmpty())
//{
// foreach (var processor in _processors)
// {
// processor.Value.Stop();
// }

// _processors.Clear();
//}
}


private static void InitializeProcessors()
{
_processorTypes.Clear();

var assemblies = Assemblies.Get();
if (!assemblies.IsNullOrEmpty())
{
foreach (var assembly in assemblies)
{
try
{
var assemblyTypes = assembly.GetTypes();
if (!assemblyTypes.IsNullOrEmpty())
{
foreach (var type in assemblyTypes)
{
if (typeof(IMTConnectAgentProcessor).IsAssignableFrom(type) && !type.IsInterface && !type.IsAbstract)
{
_processorTypes.Add(type);
}
}
}
}
catch { }
}
}
}
}
}
Loading

0 comments on commit af634c5

Please sign in to comment.