Enable Subscription to invoke callbacks sequentially and synchronously (#1493)

Pertaining to the discussion started in Issue #957, this pull request adds the ability for Subscription to invoke its MonitoredItem and FastDataChange callbacks sequentially, so that notifications are handled in order. The key goal is to avoid out-of-order monitored item notifications. To that end, a Sequential Publishing capability is added to Subscription.
Resolves #957 

### SequentialPublishing
A toggle that forces Subscription to raise callbacks strictly in order of the SequenceNumber of the incoming message. It limits the number of tasks that can process publish responses to one and enforces a strict +1 sequence number requirement before releasing a message.
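
A minimal usage sketch (the session setup and values are assumed for illustration; only the `SequentialPublishing` property is introduced by this PR):

```csharp
// Assumes an already-connected Session named "session"; illustrative values only.
var subscription = new Subscription(session.DefaultSubscription)
{
    PublishingInterval = 1000,
    SequentialPublishing = true // one message worker, strict +1 SequenceNumber ordering
};

session.AddSubscription(subscription);
subscription.Create();
```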

#### Backwards compatibility
The new property has been added as a DataMember at the end of the contract to maintain backwards compatibility with the DataContract, where one is used. The relevant copy constructor has also been updated.

The feature is disabled by default, so the existing behavior remains the default. Users must explicitly set the property on the Subscription object to opt in to the new behavior.

#### Leveraging existing "time machine" in SaveMessageInCache
There is already a mechanism for re-ordering messages that arrive out of order. This change makes use of that existing "time machine", which sorts incoming messages by sequence number, and only pulls off messages whose sequence number is exactly +1 from the last one processed.
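
Roughly, the gate added to the existing draining loop looks like this (field names match the diff below; the dispatch body is omitted):

```csharp
// Walk the incoming messages, which the existing "time machine" keeps sorted by sequence number.
for (LinkedListNode<IncomingMessage> ii = m_incomingMessages.First; ii != null; ii = ii.Next)
{
    // With sequential publishing enabled, only release a message whose sequence number
    // does not jump ahead of the last one processed.
    bool inSequence = !m_sequentialPublishing ||
                      ii.Value.SequenceNumber <= m_lastSequenceNumberProcessed + 1;

    if (ii.Value.Message != null && !ii.Value.Processed && inSequence)
    {
        // ... dispatch the message and advance m_lastSequenceNumberProcessed ...
    }
}
```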

#### KeepAlive messages advancing sequence number
KeepAlive messages contain no notifications and therefore never enter the branch of the code that releases messages, so they do not interrupt the sequential flow. Since the next data-bearing message in the sequence "re-uses" that sequence number, the sequence is still maintained.
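
A sketch of the idea (field names are assumptions for illustration, not code from this PR):

```csharp
// Illustration only: a keep-alive carries no notifications, so it never reaches the
// dispatch branch and never advances the sequence gate.
bool isKeepAlive = message.NotificationData == null || message.NotificationData.Count == 0;
if (!isKeepAlive)
{
    // Only data-bearing messages are released to callbacks and move
    // m_lastSequenceNumberProcessed forward. The next message that does carry data
    // re-uses the keep-alive's sequence number, so the strict +1 check still passes.
}
```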

#### Delayed messages
When sequential publishing is enabled and a message is genuinely missing from the sequence, it holds up subsequent messages until it either arrives or is removed from the incoming message list for being too old. In either case it is then considered "processed" for sequencing purposes, and the remaining messages may proceed.

The existing automatic republish request mechanism is also leveraged to recover such missing messages.
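
The cleanup pass (shown in the diff below) advances the gate when the entry being discarded is the one everything else was waiting on:

```csharp
// When an incoming-message entry is dropped for being too old, check whether it was
// the next expected message; if so, advance the gate so later messages are released.
if (entry.SequenceNumber == m_lastSequenceNumberProcessed + 1)
{
    if (!entry.Processed)
    {
        Utils.Trace("Subscription {0} skipping PublishResponse Sequence Number {1}",
            Id, entry.SequenceNumber);
    }

    m_lastSequenceNumberProcessed = entry.SequenceNumber;
}
```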

#### SequenceNumber roll-over after max value
As specified in the OPC UA spec, the sequence number rolls over after its maximum value; at a 1 ms publishing interval that would take almost 50 days of uptime, and at any reasonable publishing interval even longer. Roll-over was also not handled by the existing code before this change, although some consideration is visible in places, such as placing messages at the end of the incoming message linked list when no better position is found.

#### Locking on m_cache to get the semaphore
I considered introducing a separate locking object for the semaphore management, but decided it is not needed. The message worker needs to obtain the m_cache lock later anyway, and taking it momentarily does not harm the overall flow.
Only under serious contention between new PublishResponses arriving and workers trying to run would this cause any problems.
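
The resulting pattern in the worker (matching the diff below) captures the reference under the existing lock and waits outside it:

```csharp
// Capture the semaphore reference under m_cache so a concurrent SequentialPublishing
// toggle cannot swap it out from under this worker; the wait itself happens outside the lock.
SemaphoreSlim semaphore;
lock (m_cache)
{
    semaphore = m_messageWorkersSemaphore;
}

if (semaphore != null)
{
    await semaphore.WaitAsync().ConfigureAwait(false);
}
```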

#### Callbacks that "take a long time"
This change naturally suffers when callbacks take a long time to run. In our use case each callback message is passed into an ActionBlock for handling, so our callbacks are fast, but other clients may run much more code inside these callbacks.
With sequential publishing, throughput is bounded by how long each callback takes. It should be used for its intended purpose, with callbacks that do not hold up their calling thread for long. To guarantee proper sequential callbacks, they cannot merely be started in order; they have to be processed fully in order.
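
One way to keep callbacks cheap (an assumed pattern, not part of this PR) is to hand each notification off to a single-consumer queue and return immediately; `System.Threading.Channels` is used here to avoid the Dataflow dependency discussed below, and `subscription` is assumed to be an already-created Subscription:

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;
using Opc.Ua;

// The channel preserves order because there is a single reader and the (sequential)
// callback is the only writer.
var channel = Channel.CreateUnbounded<DataChangeNotification>(
    new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });

subscription.FastDataChangeCallback = (sub, notification, stringTable) =>
{
    channel.Writer.TryWrite(notification); // returns immediately, never blocks the message worker
};

_ = Task.Run(async () =>
{
    await foreach (DataChangeNotification notification in channel.Reader.ReadAllAsync())
    {
        // Slow per-message processing happens here, off the subscription's message worker.
    }
});
```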

#### Changing this property while the Subscription is active
There is limited support for changing this property on the fly, and that is not the intended use case. It is meant to be set once and ideally never changed while messages are being processed.

If users need some items properly sequenced while other items can tolerate out-of-order delivery or ignore outdated messages, they can define two separate Subscription objects, just as they would if they needed different Publishing Intervals.
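
For example (the session and the two monitored item lists are assumed):

```csharp
// Ordering-sensitive items go into a sequentially published subscription...
var ordered = new Subscription(session.DefaultSubscription) { SequentialPublishing = true };
ordered.AddItems(orderSensitiveItems);

// ...while items that tolerate out-of-order delivery keep the default parallel behavior.
var bestEffort = new Subscription(session.DefaultSubscription) { SequentialPublishing = false };
bestEffort.AddItems(otherItems);

session.AddSubscription(ordered);
session.AddSubscription(bestEffort);
ordered.Create();
bestEffort.Create();
```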

#### Why semaphore?
At first I considered using ActionBlock from TPL Dataflow and limiting its concurrency to the number of desired message workers, but I opted not to add a dependency on Dataflow for this purpose alone. (Referencing all of Dataflow just for ActionBlock's automatic concurrency limiting is wasteful.)

I also considered using a limited-concurrency TaskScheduler implementation and queueing the message workers on it instead of on the normal ThreadPool via Task.Run, but that is similarly more complex than it needs to be.
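
For comparison, the rejected Dataflow approach would have looked roughly like this (`ProcessMessage` is a hypothetical handler), at the cost of referencing System.Threading.Tasks.Dataflow:

```csharp
using System.Threading.Tasks.Dataflow;

// Rejected alternative: an ActionBlock whose concurrency limit plays the role of the semaphore.
var worker = new ActionBlock<NotificationMessage>(
    message => ProcessMessage(message), // ProcessMessage is a hypothetical handler
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// The PR instead keeps the existing Task.Run worker and gates it with a SemaphoreSlim(1).
```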

#### Why async Task? What was wrong with void?
The worker already runs on a ThreadPool task, but with the synchronous Wait() method it would keep that thread blocked while waiting for its turn. By using WaitAsync, the thread goes "back" to the ThreadPool until the semaphore becomes available.
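
A compressed view of the difference (the semaphore and the worker body are as in the diff; the full error handling around Release is omitted):

```csharp
// Blocking: the ThreadPool thread is held for the entire time this worker waits its turn.
// semaphore.Wait();

// Non-blocking: the thread returns to the pool and the worker resumes once the semaphore is granted.
await semaphore.WaitAsync().ConfigureAwait(false);
try
{
    // process incoming messages in sequence
}
finally
{
    semaphore.Release();
}
```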

### Disclaimer - I am not an OPC UA expert
Please do not assume I have "done all my homework" with regard to these changes. I have attempted to learn as much as I can in the time I was working on this change, but I fully expect to have missed a few spots and specifics of the OPC UA protocol.

I have done some testing in our application to verify the out-of-order problem is solved, and that no significant delay is introduced by the additional work done to facilitate this.
AvenDonn committed Sep 16, 2021
1 parent 8551619 commit 92ae1d3
Showing 3 changed files with 227 additions and 7 deletions.
133 changes: 126 additions & 7 deletions Libraries/Opc.Ua.Client/Subscription.cs
@@ -101,6 +101,7 @@ public Subscription(Subscription template, bool copyEventHandlers)
m_priority = template.m_priority;
m_timestampsToReturn = template.m_timestampsToReturn;
m_maxMessageCount = template.m_maxMessageCount;
m_sequentialPublishing = template.m_sequentialPublishing;
m_defaultItem = (MonitoredItem)template.m_defaultItem.MemberwiseClone();
m_defaultItem = template.m_defaultItem;
m_handle = template.m_handle;
@@ -150,6 +151,8 @@ private void Initialize()
m_timestampsToReturn = TimestampsToReturn.Both;
m_maxMessageCount = 10;
m_outstandingMessageWorkers = 0;
m_sequentialPublishing = false;
m_lastSequenceNumberProcessed = 0;
m_messageCache = new LinkedList<NotificationMessage>();
m_monitoredItems = new SortedDictionary<uint, MonitoredItem>();
m_deletedItems = new List<MonitoredItem>();
@@ -176,11 +179,14 @@ public void Dispose()
/// <summary>
/// An overrideable version of the Dispose.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = "m_publishTimer")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = nameof(m_publishTimer))]
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2213:DisposableFieldsShouldBeDisposed", MessageId = nameof(m_messageWorkersSemaphore))]
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
Utils.SilentDispose(m_messageWorkersSemaphore);
m_messageWorkersSemaphore = null;
Utils.SilentDispose(m_publishTimer);
m_publishTimer = null;
}
@@ -363,6 +369,35 @@ public bool DisableMonitoredItemCache
set => m_disableMonitoredItemCache = value;
}

/// <summary>
/// Gets or sets the behavior of waiting for sequential order in handling incoming messages.
/// </summary>
/// <value>
/// <c>true</c> if incoming messages are handled sequentially; <c>false</c> otherwise.
/// </value>
/// <remarks>
/// Setting <see cref="SequentialPublishing"/> to <c>true</c> means incoming messages are processed in a "single-threaded" manner and callbacks will not be invoked in parallel.
/// </remarks>
[DataMember(Order = 13)]
public bool SequentialPublishing
{
get
{
lock (m_cache)
{
return m_sequentialPublishing;
}
}
set
{
lock (m_cache)
{
m_sequentialPublishing = value;
ManageMessageWorkerSemaphore();
}
}
}

/// <summary>
/// Gets or sets the fast data change callback.
/// </summary>
@@ -705,6 +740,8 @@ public void Create()

AdjustCounts(ref revisedKeepAliveCount, ref revisedLifetimeCounter);

ManageMessageWorkerSemaphore();

m_session.CreateSubscription(
null,
m_publishingInterval,
@@ -1240,17 +1277,25 @@ public IList<MonitoredItem> DeleteItems()

if (next != null)
{
//If the message being removed is supposed to be the next message, advance it to release anything waiting on it to be processed
if (entry.SequenceNumber == m_lastSequenceNumberProcessed + 1)
{
if (!entry.Processed)
{
Utils.Trace("Subscription {0} skipping PublishResponse Sequence Number {1}", Id, entry.SequenceNumber);
}

m_lastSequenceNumberProcessed = entry.SequenceNumber;
}

m_incomingMessages.Remove(node);
}

node = next;
}

// process messages.
Task.Run(() => {
Interlocked.Increment(ref m_outstandingMessageWorkers);
OnMessageReceived(null);
});
Task.Run(OnMessageReceived);
}

// send notification that publishing has recovered.
@@ -1664,10 +1709,33 @@ private void AdjustCounts(ref uint keepAliveCount, ref uint lifetimeCount)
/// <summary>
/// Processes the incoming messages.
/// </summary>
private void OnMessageReceived(object state)
private async Task OnMessageReceived()
{
SemaphoreSlim semaphore; //Avoid semaphore being replaced for this instance while running, retain reference locally.
lock (m_cache)
{
//Semaphore is maintained under m_cache lock, avoid semaphore swap issues when possible. The wait call will still sync the message workers properly.
semaphore = m_messageWorkersSemaphore;
}

var needSemaphore = semaphore != null; //Later used to know if releasing the semaphore is needed. Assumed entered if needed.
if (needSemaphore)
{
try
{
await semaphore.WaitAsync().ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
Utils.Trace("Message Workers semaphore replaced, worker released");
needSemaphore = false;
//Semaphore was replaced to change the number of maximum allowed workers - proceed without it, momentarily more workers than allowed may exist.
//Note for sequential publishing, this can only happen if sequential publishing is enabled or disabled, so it will not interrupt it. Changing max workers while sequential publishing is enabled will not trigger a semaphore change.
}
}
try
{
Interlocked.Increment(ref m_outstandingMessageWorkers);
Session session = null;
uint subscriptionId = 0;
EventHandler callback = null;
@@ -1683,7 +1751,8 @@ private void OnMessageReceived(object state)
for (LinkedListNode<IncomingMessage> ii = m_incomingMessages.First; ii != null; ii = ii.Next)
{
// update monitored items with unprocessed messages.
if (ii.Value.Message != null && !ii.Value.Processed)
if (ii.Value.Message != null && !ii.Value.Processed &&
(!m_sequentialPublishing || ii.Value.SequenceNumber <= m_lastSequenceNumberProcessed + 1)) //If sequential publishing is enabled, only release messages in perfect sequence.
{
if (messagesToProcess == null)
{
@@ -1700,6 +1769,12 @@

m_messageCache.AddLast(ii.Value.Message);
ii.Value.Processed = true;

//Keep the last sequence number processed going up
if (ii.Value.SequenceNumber > m_lastSequenceNumberProcessed)
{
m_lastSequenceNumberProcessed = ii.Value.SequenceNumber;
}
}

// check for missing messages.
@@ -1823,6 +1898,26 @@ private void OnMessageReceived(object state)
finally
{
Interlocked.Decrement(ref m_outstandingMessageWorkers);
if (needSemaphore && semaphore != null)
{
//Release semaphore taken earlier
try
{
semaphore.Release(); //Release the right semaphore
}
catch (ObjectDisposedException)
{
//Ignore, disposed and handling finished
}
catch (SemaphoreFullException e)
{
Utils.Trace(e, "Released semaphore too many times");
}
catch (Exception e)
{
Utils.Trace(e, "Error while finishing processing of incoming messages.");
}
}
}
}

@@ -2074,6 +2169,27 @@ private void SaveEvents(NotificationMessage message, EventNotificationList notif
monitoredItem.SaveValueInCache(eventFields);
}
}

/// <summary>
/// Manages the semaphore used to limit message workers for handling incoming messages
/// </summary>
private void ManageMessageWorkerSemaphore()
{
lock (m_cache)
{
if (m_sequentialPublishing)
{
if (m_messageWorkersSemaphore == null) //Only create the semaphore if it isn't already created. (Not already in sequential publishing mode)
m_messageWorkersSemaphore = new SemaphoreSlim(1);//Sequential publishing means only one worker can be active, or else sequence can be violated.
}
else //Not in sequential publishing mode - no need for semaphore.
{
//Semaphore is disposed if needed.
m_messageWorkersSemaphore?.Dispose();
m_messageWorkersSemaphore = null;
}
}
}
#endregion

#region Private Fields
Expand Down Expand Up @@ -2113,6 +2229,9 @@ private void SaveEvents(NotificationMessage message, EventNotificationList notif
private FastDataChangeNotificationEventHandler m_fastDataChangeCallback;
private FastEventNotificationEventHandler m_fastEventCallback;
private int m_outstandingMessageWorkers;
private SemaphoreSlim m_messageWorkersSemaphore;
private bool m_sequentialPublishing;
private uint m_lastSequenceNumberProcessed;

/// <summary>
/// A message received from the server cached until is processed or discarded.
2 changes: 2 additions & 0 deletions Libraries/Opc.Ua.Client/SubscriptionAsync.cs
@@ -56,6 +56,8 @@ public async Task CreateAsync(CancellationToken ct = default)

AdjustCounts(ref revisedMaxKeepAliveCount, ref revisedLifetimeCount);

ManageMessageWorkerSemaphore();

var response = await m_session.CreateSubscriptionAsync(
null,
m_publishingInterval,
99 changes: 99 additions & 0 deletions Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs
@@ -29,6 +29,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
@@ -336,6 +337,104 @@ public void LoadSubscription()
Assert.True(result);

}

[Theory, Order(300)]
//This test doesn't deterministically prove sequential publishing, but rather relies on a subscription not being able to handle the message load.
//This test should be re-implemented with a Session that deterministically provides the wrong order of messages to Subscription.
public void SequentialPublishingSubscription(bool enabled)
{
var subscriptionList = new List<Subscription>();
var sequenceBroken = new AutoResetEvent(false);
var numOfNotifications = 0L;
const int TestWaitTime = 10000;
const int MonitoredItemsPerSubscription = 500;
const int Subscriptions = 10;

for (int i = 0; i < Subscriptions; i++)
{
var subscription = new Subscription(m_session.DefaultSubscription) {
SequentialPublishing = enabled,
PublishingInterval = 0,
DisableMonitoredItemCache = true, //Not needed
PublishingEnabled = false
};
subscriptionList.Add(subscription);

//Create many monitored items on the server status current time, and track the last reported source timestamp
var list = Enumerable.Range(1, MonitoredItemsPerSubscription).Select(_ => new MonitoredItem(subscription.DefaultItem) {
StartNodeId = new NodeId("Scalar_Simulation_Int32", 2),
SamplingInterval = 0,
}).ToList();
var dict = list.ToDictionary(item => item.ClientHandle, _ => DateTime.MinValue);

subscription.AddItems(list);
var result = m_session.AddSubscription(subscription);
Assert.True(result);
subscription.Create();
var publishInterval = (int)subscription.CurrentPublishingInterval;
TestContext.Out.WriteLine($"CurrentPublishingInterval: {publishInterval}");

//Need to realize test failed, assert needs to be brought to this thread
subscription.FastDataChangeCallback = (_, notification, __) => {
notification.MonitoredItems.ForEach(item => {
Interlocked.Increment(ref numOfNotifications);
if (dict[item.ClientHandle] > item.Value.SourceTimestamp)
{
sequenceBroken.Set(); //Out of order encountered
}
dict[item.ClientHandle] = item.Value.SourceTimestamp;
});
};
}

UInt32Collection subscriptionIds = new UInt32Collection();
foreach (var subscription in subscriptionList)
{
subscriptionIds.Add(subscription.Id);
}

var stopwatch = Stopwatch.StartNew();

// start
m_session.SetPublishingMode(null, true, subscriptionIds, out var results, out var diagnosticInfos);

// Wait for out-of-order to occur
var failed = sequenceBroken.WaitOne(TestWaitTime);

// stop
m_session.SetPublishingMode(null, false, subscriptionIds, out results, out diagnosticInfos);

//Log information
var elapsed = stopwatch.Elapsed.TotalMilliseconds / 1000.0;
TestContext.Out.WriteLine($"Ran for: {elapsed:N} seconds");
long totalNotifications = Interlocked.Read(ref numOfNotifications);
double notificationRate = totalNotifications / elapsed;
foreach (var subscription in subscriptionList)
{
int outstandingMessageWorkers = subscription.OutstandingMessageWorkers;
TestContext.Out.WriteLine($"Id: {subscription.Id} Outstanding workers: {outstandingMessageWorkers}");
}
TestContext.Out.WriteLine($"Number of notifications: {totalNotifications:N0}");
TestContext.Out.WriteLine($"Notifications rate: {notificationRate:N} per second"); //How fast it processed notifications.

Assert.NotZero(totalNotifications); //No notifications means nothing worked
if (enabled)
{
//catch if Out-of-sequence occurred
Assert.False(failed);
}

var notificationsPerSecond = Subscriptions * MonitoredItemsPerSubscription;
Assert.AreEqual(notificationsPerSecond * (elapsed + 0.5), totalNotifications, notificationsPerSecond);

foreach (var subscription in subscriptionList)
{
var result = m_session.RemoveSubscription(subscription);
Assert.True(result);
}
}

#endregion

#region Private Methods
