Skip to content

Commit

Permalink
Change UaPublisher trigger from ManualResetEvent to Timer (#1399)
Browse files Browse the repository at this point in the history
* Change UaPublisher trigger from ManualResetEvent to Timer
  • Loading branch information
AlinMoldovean committed May 12, 2021
1 parent 923597e commit f332d87
Showing 1 changed file with 73 additions and 48 deletions.
121 changes: 73 additions & 48 deletions Libraries/Opc.Ua.PubSub/UaPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

namespace Opc.Ua.PubSub
{
Expand All @@ -43,7 +43,9 @@ internal class UaPublisher : IUaPublisher
private const int kMinPublishingInterval = 10;
private object m_lock = new object();
// event used to trigger publish
private ManualResetEvent m_shutdownEvent;

private Timer m_PublishingTimer;
private ElapsedEventHandler m_periodicPublishHandler = null;

private IUaPubSubConnection m_pubSubConnection;
private WriterGroupDataType m_writerGroupConfiguration;
Expand Down Expand Up @@ -111,8 +113,13 @@ protected virtual void Dispose(bool disposing)
if (disposing)
{
Stop();

// free managed resources
m_shutdownEvent.Dispose();
if (m_PublishingTimer != null)
{
Utils.SilentDispose(m_PublishingTimer);
m_PublishingTimer = null;
}
}
}
#endregion
Expand All @@ -124,86 +131,104 @@ protected virtual void Dispose(bool disposing)
/// </summary>
public void Start()
{
StartPublishingTimer();
Utils.Trace("The UaPublisher for WriterGroup '{0}' was started.", m_writerGroupConfiguration.Name);
}

/// <summary>
/// Stop the publishing thread.
/// </summary>
public virtual void Stop()
{
StopPublishingTimer();
Utils.Trace("The UaPublisher for WriterGroup '{0}' was stopped.", m_writerGroupConfiguration.Name);
}
#endregion

#region Private Methods

/// <summary>
/// Start the publish timer.
/// </summary>
private void StartPublishingTimer()
{
int sleepCycle = 0;

lock (m_lock)
{
m_shutdownEvent.Reset();
if (m_PublishingTimer != null)
{
m_PublishingTimer.Dispose();
m_PublishingTimer = null;
}

m_PublishingTimer = new Timer();

Task.Run(() => {
PublishData();
});
if (m_writerGroupConfiguration != null)
{
sleepCycle = Convert.ToInt32(m_writerGroupConfiguration.PublishingInterval);
}
}

if (sleepCycle < kMinPublishingInterval)
{
sleepCycle = kMinPublishingInterval;
}

lock (m_lock)
{
m_PublishingTimer.Elapsed += m_periodicPublishHandler;
m_PublishingTimer.Interval = sleepCycle;
m_PublishingTimer.Enabled = true;
}
Utils.Trace("The UaPublisher for WriterGroup '{0}' was started.", m_writerGroupConfiguration.Name);
}

/// <summary>
/// stop the publishing thread.
/// Stop the publish timer.
/// </summary>
public virtual void Stop()
private void StopPublishingTimer()
{
lock (m_lock)
{
m_shutdownEvent.Set();
if (m_PublishingTimer != null)
{
m_PublishingTimer.Elapsed -= m_periodicPublishHandler;
m_PublishingTimer.Enabled = false;
}
}
Utils.Trace("The UaPublisher for WriterGroup '{0}' was stopped.", m_writerGroupConfiguration.Name);
}
#endregion

#region Private Methods
/// <summary>
/// Sets private members to default values.
/// </summary>
private void Initialize()
{
m_shutdownEvent = new ManualResetEvent(true);
m_periodicPublishHandler = new ElapsedEventHandler(PeriodicTimerPublishData);
}

/// <summary>
/// Periodically checks if there is data to publish.
/// </summary>
private void PublishData()
private void PeriodicTimerPublishData(object source, ElapsedEventArgs e)
{
try
{
do
lock (m_lock)
{
int sleepCycle = 0;

lock (m_lock)
if (m_pubSubConnection.CanPublish(m_writerGroupConfiguration))
{
if (m_writerGroupConfiguration != null)
// call on a new thread
Task.Run(() =>
{
sleepCycle = Convert.ToInt32(m_writerGroupConfiguration.PublishingInterval);
}
}

if (sleepCycle < kMinPublishingInterval)
{
sleepCycle = kMinPublishingInterval;
}

if (m_shutdownEvent.WaitOne(sleepCycle))
{
Utils.Trace(Utils.TraceMasks.Information, "UaPublisher: Publish Thread Exited Normally.");
break;
}

lock (m_lock)
{
if (m_pubSubConnection.CanPublish(m_writerGroupConfiguration))
{
// call on a new thread
Task.Run(() => {
PublishMessages();
});
}
PublishMessages();
});
}
}
while (true);
}
catch (Exception e)
catch (Exception ex)
{
// Unexpected exception in publish thread!
Utils.Trace(e, "UaPublisher: Publish Thread Exited Unexpectedly");
Utils.Trace(ex, "UaPublisher: PeriodicPublishData Exited Unexpectedly");
}
}

Expand Down

0 comments on commit f332d87

Please sign in to comment.