Skip to content

Commit

Permalink
Test to validate client publish request count (#1516)
Browse files Browse the repository at this point in the history
* add test to validate that too many BadTooManyPublishrequest do not cause message flooding
* improve the sequential publishing test to avoid the flaky tests
  • Loading branch information
mregen committed Sep 21, 2021
1 parent 7d310d5 commit 0d34b7d
Showing 1 changed file with 106 additions and 45 deletions.
151 changes: 106 additions & 45 deletions Tests/Opc.Ua.Client.Tests/SubscriptionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public async Task OneTimeSetUpAsync(TextWriter writer = null)
m_serverFixture.TraceMasks = Utils.TraceMasks.Error | Utils.TraceMasks.Security;
}
m_server = await m_serverFixture.StartAsync(writer ?? TestContext.Out).ConfigureAwait(false);
// start client
await m_clientFixture.LoadClientConfiguration().ConfigureAwait(false);
m_url = new Uri("opc.tcp://localhost:" + m_serverFixture.Port.ToString());
m_session = await m_clientFixture.ConnectAsync(m_url, SecurityPolicies.Basic256Sha256).ConfigureAwait(false);
Expand Down Expand Up @@ -339,61 +340,70 @@ public void LoadSubscription()
}

[Theory, Order(300)]
//This test doesn't deterministically prove sequential publishing, but rather relies on a subscription not being able to handle the message load.
//This test should be re-implemented with a Session that deterministically provides the wrong order of messages to Subscription.
/// <remarks>
/// This test doesn't deterministically prove sequential publishing,
/// but rather relies on a subscription not being able to handle the message load.
/// This test should be re-implemented with a Session that deterministically
/// provides the wrong order of messages to Subscription.
///</remarks>
public void SequentialPublishingSubscription(bool enabled)
{
var subscriptionList = new List<Subscription>();
var subscriptionIds = new UInt32Collection();
var sequenceBroken = new AutoResetEvent(false);
var numOfNotifications = 0L;
const int TestWaitTime = 10000;
const int MonitoredItemsPerSubscription = 500;
const int Subscriptions = 10;

// multiple Subscriptions to enforce multiple queued publish requests
for (int i = 0; i < Subscriptions; i++)
{
var subscription = new Subscription(m_session.DefaultSubscription) {
var s = new Subscription(m_session.DefaultSubscription) {
SequentialPublishing = enabled,
PublishingInterval = 0,
DisableMonitoredItemCache = true, //Not needed
PublishingEnabled = false
};
subscriptionList.Add(subscription);
subscriptionList.Add(s);
}

//Create many monitored items on the server status current time, and track the last reported source timestamp
var list = Enumerable.Range(1, MonitoredItemsPerSubscription).Select(_ => new MonitoredItem(subscription.DefaultItem) {
StartNodeId = new NodeId("Scalar_Simulation_Int32", 2),
SamplingInterval = 0,
}).ToList();
var dict = list.ToDictionary(item => item.ClientHandle, _ => DateTime.MinValue);
var subscription = subscriptionList[0];

subscription.AddItems(list);
var result = m_session.AddSubscription(subscription);
Assert.True(result);
subscription.Create();
var publishInterval = (int)subscription.CurrentPublishingInterval;
TestContext.Out.WriteLine($"CurrentPublishingInterval: {publishInterval}");

//Need to realize test failed, assert needs to be brought to this thread
subscription.FastDataChangeCallback = (_, notification, __) => {
notification.MonitoredItems.ForEach(item => {
Interlocked.Increment(ref numOfNotifications);
if (dict[item.ClientHandle] > item.Value.SourceTimestamp)
{
sequenceBroken.Set(); //Out of order encountered
}
dict[item.ClientHandle] = item.Value.SourceTimestamp;
});
};
}
// Create monitored items on the server
// and track the last reported source timestamp
var list = Enumerable.Range(1, MonitoredItemsPerSubscription).Select(_ => new MonitoredItem(subscription.DefaultItem) {
StartNodeId = new NodeId("Scalar_Simulation_Int32", 2),
SamplingInterval = 0,
}).ToList();
var dict = list.ToDictionary(item => item.ClientHandle, _ => DateTime.MinValue);
subscription.AddItems(list);

UInt32Collection subscriptionIds = new UInt32Collection();
foreach (var subscription in subscriptionList)
foreach (var s in subscriptionList)
{
subscriptionIds.Add(subscription.Id);
var boolResult = m_session.AddSubscription(s);
Assert.True(boolResult);
s.Create();
var publishInterval = (int)s.CurrentPublishingInterval;
TestContext.Out.WriteLine($"CurrentPublishingInterval: {publishInterval}");
subscriptionIds.Add(s.Id);
}

//Need to realize test failed, assert needs to be brought to this thread
subscription.FastDataChangeCallback = (_, notification, __) => {
notification.MonitoredItems.ForEach(item => {
Interlocked.Increment(ref numOfNotifications);
if (dict[item.ClientHandle] > item.Value.SourceTimestamp)
{
TestContext.Out.WriteLine("Out of order encountered");
sequenceBroken.Set();
return;
}
dict[item.ClientHandle] = item.Value.SourceTimestamp;
Thread.Sleep(10);
});
};

var stopwatch = Stopwatch.StartNew();

// start
Expand All @@ -410,31 +420,82 @@ public void SequentialPublishingSubscription(bool enabled)
TestContext.Out.WriteLine($"Ran for: {elapsed:N} seconds");
long totalNotifications = Interlocked.Read(ref numOfNotifications);
double notificationRate = totalNotifications / elapsed;
foreach (var subscription in subscriptionList)
int outstandingMessageWorkers = subscription.OutstandingMessageWorkers;
TestContext.Out.WriteLine($"Id: {subscription.Id} Outstanding workers: {outstandingMessageWorkers}");

// clean up before validating conditions
foreach (var s in subscriptionList)
{
int outstandingMessageWorkers = subscription.OutstandingMessageWorkers;
TestContext.Out.WriteLine($"Id: {subscription.Id} Outstanding workers: {outstandingMessageWorkers}");
var result = m_session.RemoveSubscription(s);
Assert.True(result);
}

TestContext.Out.WriteLine($"Number of notifications: {totalNotifications:N0}");
TestContext.Out.WriteLine($"Notifications rate: {notificationRate:N} per second"); //How fast it processed notifications.
//How fast it processed notifications.
TestContext.Out.WriteLine($"Notifications rate: {notificationRate:N} per second");
//No notifications means nothing worked
Assert.NotZero(totalNotifications);
// catch if expected/unexpected Out-of-sequence occurred
Assert.AreEqual(enabled, !failed);
}

[Test, Order(400)]
public async Task PublishRequestCount()
{
var subscriptionList = new List<Subscription>();
var numOfNotifications = 0L;
const int TestWaitTime = 10000;
const int MonitoredItemsPerSubscription = 50;
const int Subscriptions = 50;
const int MaxServerPublishRequest = 20;

Assert.NotZero(totalNotifications); //No notifications means nothing worked
if (enabled)
for (int i = 0; i < Subscriptions; i++)
{
//catch if Out-of-sequence occurred
Assert.False(failed);
var subscription = new Subscription(m_session.DefaultSubscription) {
PublishingInterval = 0,
DisableMonitoredItemCache = true,
PublishingEnabled = true
};
subscriptionList.Add(subscription);

var list = Enumerable.Range(1, MonitoredItemsPerSubscription).Select(_ => new MonitoredItem(subscription.DefaultItem) {
StartNodeId = new NodeId("Scalar_Simulation_Int32", 2),
SamplingInterval = 0,
}).ToList();
var dict = list.ToDictionary(item => item.ClientHandle, _ => DateTime.MinValue);

subscription.AddItems(list);
var result = m_session.AddSubscription(subscription);
Assert.True(result);
await subscription.CreateAsync();
var publishInterval = (int)subscription.CurrentPublishingInterval;
TestContext.Out.WriteLine($"CurrentPublishingInterval: {publishInterval}");

subscription.FastDataChangeCallback = (_, notification, __) => {
notification.MonitoredItems.ForEach(item => {
Interlocked.Increment(ref numOfNotifications);
});
};
}

var notificationsPerSecond = Subscriptions * MonitoredItemsPerSubscription;
Assert.AreEqual(notificationsPerSecond * (elapsed + 0.5), totalNotifications, notificationsPerSecond);
var stopwatch = Stopwatch.StartNew();

await Task.Delay(1000);

// verify that number of active publishrequests is never exceeded
while (stopwatch.ElapsedMilliseconds < TestWaitTime)
{
// use the sample server default for max publish request count
Assert.GreaterOrEqual(MaxServerPublishRequest, m_session.GoodPublishRequestCount);
await Task.Delay(100);
}

foreach (var subscription in subscriptionList)
{
var result = m_session.RemoveSubscription(subscription);
var result = await m_session.RemoveSubscriptionAsync(subscription);
Assert.True(result);
}
}

#endregion

#region Private Methods
Expand Down

0 comments on commit 0d34b7d

Please sign in to comment.