
Enable Subscription to invoke callbacks sequentially and synchronously #1493

Merged

Conversation

Contributor

@AvenDonn AvenDonn commented Aug 25, 2021

Pertaining to the discussion started in Issue #957, this pull request adds the ability for Subscription to invoke its MonitoredItem and FastDataChange callbacks sequentially, so that monitored item notifications are not delivered out of order. To this effect, a Sequential Publishing capability is added to Subscription.
Resolves #957

SequentialPublishing

Subscription.SequentialPublishing (default false, preserving existing behavior)
A toggle which forces Subscription to raise callbacks only in sequential order by the SequenceNumber of the incoming message.
It limits the number of tasks that can process publish responses to 1, and enforces a strict +1 Sequence Number requirement before releasing a message.

How to activate: On a Subscription instance, set SequentialPublishing to true.
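For illustration, enabling the feature might look like this. This is a hedged sketch: the `session` variable and the surrounding connection setup are assumed, and only the `SequentialPublishing` property comes from this change.

```csharp
// Sketch only: assumes an already-connected Opc.Ua.Client session.
var subscription = new Subscription(session.DefaultSubscription)
{
    PublishingInterval = 100,
    SequentialPublishing = true // raise callbacks strictly in sequence-number order
};
session.AddSubscription(subscription);
subscription.Create();
```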

Technical details

I have written some notes below on my technical considerations. I recommend examining the code before reading this section, as this section serves to answer some of the questions you may have while reviewing the code.

Backwards compatibility

The new properties have been added as DataMembers at the end to maintain backwards compatibility with the DataContract, where it may be in use. The relevant copy constructor has also been updated.

Both features are disabled by default, so the existing behavior is the default. Users must explicitly set these properties on the Subscription object to enable these new features.

Leveraging existing "time machine" in SaveMessageInCache

There is already a mechanism for re-ordering messages that arrive out of order. This change makes use of that existing "time machine", which sorts messages into chronological order (by sequence number), to only pull off messages with a proper +1 sequence number each time.
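As a self-contained sketch of that idea (the type and member names here are illustrative, not the SDK's): messages are kept sorted by sequence number, and with sequential publishing enabled a message is only released when it is the strict +1 successor of the last processed one.

```csharp
using System.Collections.Generic;

// Hedged sketch of the "time machine" plus strict +1 gating described above.
// IncomingQueue, Save and TryDequeueNext are illustrative names.
public class IncomingQueue
{
    // Kept ordered by sequence number, like the incoming message cache.
    private readonly SortedDictionary<uint, string> _messages = new SortedDictionary<uint, string>();
    private uint _lastProcessed;

    public IncomingQueue(uint lastProcessed) { _lastProcessed = lastProcessed; }

    // Insert keeps the cache sorted, mirroring SaveMessageInCache's role.
    public void Save(uint sequenceNumber, string payload) => _messages[sequenceNumber] = payload;

    // Release the next message only if it is at most lastProcessed + 1.
    public bool TryDequeueNext(out string payload)
    {
        payload = null;
        if (_messages.Count == 0) return false;
        uint smallest = 0;
        foreach (uint key in _messages.Keys) { smallest = key; break; }
        if (smallest > _lastProcessed + 1) return false; // gap: hold up the rest
        payload = _messages[smallest];
        _messages.Remove(smallest);
        if (smallest > _lastProcessed) _lastProcessed = smallest;
        return true;
    }
}
```

If message 2 is missing, message 3 stays queued until 2 arrives (or, in the real code, until it is aged out or recovered via republish), after which both are released in order.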

KeepAlive messages advancing sequence number

KeepAlive messages do not contain notifications and do not enter the branch in the code where messages are pulled out, so they do not interrupt the sequential flow. Since the next message in the sequence that carries data will "re-use" that sequence number, the sequence is maintained.

Delayed messages

When sequential publishing is enabled, if a message is genuinely missing from the sequence, it will "hold up" the messages until it either arrives or is pulled out of the incoming messages list by being too old. In either case, it is considered "processed" for the sequence purposes at that point and the rest of the messages may proceed.

The automatic republish request mechanism is also leveraged here for that purpose.

SequenceNumber roll-over after max value

As specified in the OPC UA spec, at a 1 ms publishing interval a roll-over would take almost 50 days of uptime; at any "reasonable" publishing interval this time is even longer. Roll-over is also not addressed anywhere in the existing code that handled sequence numbers before this change. (Though some consideration can be seen in places, such as placing messages at the end of the incoming message linked list if no better place was found.)

Locking on m_cache to get the semaphore

I've considered making a separate locking object for the semaphore management, but decided it is not needed. The message worker would need to obtain the m_cache lock later anyway, and momentarily obtaining it is not harmful to the overall flow.
Only in cases of serious contention between new PublishResponses coming in and workers attempting to work would this cause any problems.

Callbacks that "take a long time"

This change will naturally suffer from callbacks that take a long time to invoke. In our use case, each callback message is passed into an ActionBlock for handling, so our callbacks are fast, but other clients may invoke much more code in these callbacks.
Sequential publishing means throughput is potentially bottlenecked by the slowest callback. It should be used for its intended purpose and with callbacks that do not hold up their calling thread for long. To ensure properly sequential callbacks, they cannot merely be started in order; they have to be processed fully in order.

Changing this property while the Subscription is active

There is limited support for on-the-fly changes to this property, and that is not the intended use case. It is meant to be set once and ideally never changed while messages are being processed.

If users need certain items properly sequenced while for other items they can accept out-of-order delivery or ignore outdated messages, they can define two separate Subscription objects, just as they would if they needed different Publishing Intervals.

Why semaphore?

At first I had considered using ActionBlock from TPL Dataflow and limiting its concurrency to the number of desired message workers, but opted not to add a dependency on Dataflow for this purpose alone. (Referencing all of Dataflow just for ActionBlock's automatic concurrency limiting is wasteful.)

I had also considered using a Limited Concurrency Task Scheduler implementation and queueing the message workers on that instead of the normal ThreadPool via Task.Run, but that is similarly more complex than it needs to be.

Why async Task? What was wrong with void?

While running on a new ThreadPool task anyway, the worker would still occupy a ThreadPool thread if it used the synchronous Wait() method, which is a blocking call. By using WaitAsync, the thread goes "back" to the ThreadPool until the semaphore is released.
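The shape of that worker can be sketched as follows. This is a hedged, self-contained illustration (the class and method names are not the SDK's); it shows how WaitAsync yields the thread while still guaranteeing that at most one worker invokes callbacks at a time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hedged sketch: a single-permit semaphore serializes message workers.
public static class WorkerSketch
{
    private static readonly SemaphoreSlim s_workerSemaphore = new SemaphoreSlim(1, 1);

    public static async Task ProcessMessagesAsync(Func<Task> invokeCallbacks)
    {
        // WaitAsync does not block a thread; the synchronous Wait() would.
        await s_workerSemaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // At most one worker is inside this region, so callbacks
            // cannot run concurrently with each other.
            await invokeCallbacks().ConfigureAwait(false);
        }
        finally
        {
            s_workerSemaphore.Release();
        }
    }
}
```

Note that SemaphoreSlim makes no FIFO guarantee for waiters, which is exactly the review concern raised below; the sketch only shows mutual exclusion, not ordering.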

Disclaimer - I am not an OPC UA expert

Please do not assume I have "done all my homework" with regard to these changes. I have attempted to learn as much as I can in the time I was working on this change, but I fully expect to have missed a few spots and specifics of the OPC UA protocol.

I have done some testing in our application to verify the out-of-order problem is solved, and that no significant delay is introduced by the additional work done to facilitate this.

… workers wait for a proper +1 sequence of messages before dispatching to callbacks
…ssage workers while subscription is active.

Unified toggling sequential publishing to also automatically limit active message workers to 1. (Can't have sequential publishing if two threads can call callbacks out of order)
@CLAassistant

CLAassistant commented Aug 25, 2021

CLA assistant check
All committers have signed the CLA.

@lgtm-com

lgtm-com bot commented Aug 25, 2021

This pull request introduces 1 alert when merging 35bce2c into 4a2f13e - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@AvenDonn AvenDonn marked this pull request as ready for review August 25, 2021 16:02
@mregen
Contributor

mregen commented Sep 1, 2021

Hi @AvenDonn , thanks for your contribution and the thorough explanations. Since your contribution could be a breaking change we need to reserve some time to review and test. Please stay tuned...

@lgtm-com

lgtm-com bot commented Sep 7, 2021

This pull request introduces 1 alert when merging efc6f34 into 76afe82 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@codecov

codecov bot commented Sep 7, 2021

Codecov Report

Merging #1493 (76afe82) into master (76afe82) will not change coverage.
The diff coverage is n/a.

❗ Current head 76afe82 differs from pull request most recent head 6cd2de8. Consider uploading reports for the commit 6cd2de8 to get more accurate results
Impacted file tree graph

@@           Coverage Diff           @@
##           master    #1493   +/-   ##
=======================================
  Coverage   51.93%   51.93%           
=======================================
  Files         307      307           
  Lines       58332    58332           
=======================================
  Hits        30292    30292           
  Misses      28040    28040           


Contributor

@mregen mregen left a comment


Hi @AvenDonn , I see your approach here, it looks ok, but relies on the tasks to enter the semaphore in the order they were blocked. However, docs state:

> There is no guaranteed order, such as first-in, first-out (FIFO) or last-in, first-out (LIFO), for blocked threads to enter the semaphore.

Review comments on Libraries/Opc.Ua.Client/Subscription.cs (resolved).
@AvenDonn
Contributor Author

AvenDonn commented Sep 8, 2021

Hello @mregen, I have implemented your requested changes.

However, I think there may be some missing coverage in testing this change for compliance with the protocol.
The tool testing compliance should repeat the tests for monitored item values with sequential publishing enabled. I'm not sure how to ensure such coverage is present and would appreciate your help.

That said, I'm not sure it's very relevant to test arbitrary limits on max message workers.

@lgtm-com

lgtm-com bot commented Sep 8, 2021

This pull request introduces 1 alert when merging 349a20b into 76afe82 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@mregen
Contributor

mregen commented Sep 9, 2021

Hi @AvenDonn , currently our code coverage just tries to execute as much code as possible and mostly does not exercise special cases. I added a case to AddSubscription which you could decorate with specific tests for max message workers and sequential publishing.
Overall I think the code looks pretty good already and is definitely helping to improve the sequential ordering. If actual testing goes well we can merge it for the next release.

@mregen mregen added this to the 1.4.367 milestone Sep 9, 2021
@lgtm-com

lgtm-com bot commented Sep 9, 2021

This pull request introduces 1 alert when merging d30979e into 76afe82 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@AvenDonn
Contributor Author

AvenDonn commented Sep 9, 2021

Hey @mregen, sounds good and glad you like the code.
I've noted the unit test change. I'm not sure how to integrate a logical test for the actual value of the notification here, and given that no previous coverage of it existed, I'm not sure it's my place to try and bodge together some kind of coverage. The important part is setting these new properties and making sure nothing else is affected, which this does.

Glad to hear this is a candidate for the next release!

@mregen
Contributor

mregen commented Sep 13, 2021

Hi @AvenDonn , while discussing the fix a question came up.
What is the rationale for specifying the max number of threads separately in the config, when the SequentialMessaging flag already sets it to one?

Thanks,
Martin

@AvenDonn
Contributor Author

> Hi @AvenDonn , while discussing the fix a question came up.
> What is the rationale for specifying the max number of threads separately in the config, when the SequentialMessaging flag already sets it to one?
>
> Thanks,
> Martin

Good question @mregen.
I've considered other clients and their possible wishes. While we (HP Indigo) only need sequential publishing and only on some of our subscriptions, other clients may have different needs.

The ability to control the number of worker tasks spawned by Subscription seems useful, as we've had that concern here as well. (There are some cases where we would really prefer to limit it to a low number and possibly suffer slower callbacks)

Since I am providing the mechanism via a semaphore, it was trivial to provide both features via the same mechanism and the same relatively simple code. So why not?

However, this makes no sense for sequential publishing. I've intended Sequential Publishing mode to be a promise to the client to receive callbacks in order. Thus parallelism makes no sense for it and it is set to a max of 1.

So maybe there are clients that wish only to limit the number of workers, not to use sequential publishing with all of its caveats. (For example, sequential publishing only makes sense over high-reliability networks where loss of messages is a very rare occurrence, but limiting the number of workers applies to any client that may receive a high load of messages but does not wish to dedicate so many tasks to processing them.)

If, however, you or others feel this feature unnecessarily complicates the mechanism, it can be trivially disabled by removing the public property, or removed entirely. Neither is difficult if deemed necessary.

Let me know the verdict and I'll remove it if needed.

@mregen
Contributor

mregen commented Sep 14, 2021

Hi @AvenDonn, thanks for your explanation... we had a discussion here and I think reducing the test coverage for now is a good thing, so we can focus on testing only the sequential publishing case. Even better would be if you were able to provide a test case. At a certain point we would like to be in a state where we only accept contributions with a corresponding test. Way to go...
So could you please remove the config for the number of workers?

@mregen
Contributor

mregen commented Sep 14, 2021

One more thing @AvenDonn : I was wondering what happens if a user sets SequentialPublishing and DisableMonitoredItemCache. Will the mechanism to recover with a republish still work?

@AvenDonn
Contributor Author

@mregen I'm working on removing the max message workers feature now. I'm also going to see if I can write a test case. If I can mock a session to always provide messages out of order, I can verify sequential publishing fixes the order.

As for your question, DisableMonitoredItemCache has no effect on the feature. The tracking is done independently of the callback being invoked on the MonitoredItem. The intention is indeed that the FastDataChangeCallback and FastEventCallback are also affected.

Tracking of the sequence happens in OnMessageReceived as messages are pulled out of the m_incomingMessages linked list (into which they were inserted in sequence by SaveMessageInCache).
This tracking is done independently of whether callbacks are invoked. If a message was pulled out of the list, it is considered processed (as before, indicated by the ii.Value.Processed = true; assignment).

When in sequential publishing mode, the condition to enter this branch is true only if the sequence number is at most one greater than the last seen sequence number: (!m_sequentialPublishing || ii.Value.SequenceNumber <= m_lastSequenceNumberProcessed + 1)

If sequential publishing was enabled mid-way, all messages lower than the always-tracked m_lastSequenceNumberProcessed will still be allowed to go through. All subsequent messages must obey strict +1 sequence.
If sequential publishing is disabled mid-way, then messages are always allowed to be processed, and m_lastSequenceNumberProcessed is still updated whenever ii.Value.SequenceNumber goes higher.

This is done by this code:

// Keep the last sequence number processed going up
if (ii.Value.SequenceNumber > m_lastSequenceNumberProcessed)
{
    m_lastSequenceNumberProcessed = ii.Value.SequenceNumber;
}

In sequential publishing mode, both conditions can only be true if the sequence number is +1.
In non-sequential-publishing (default) mode this value is only increased, to ensure that when sequential publishing is enabled, the next sequence message will actually increment it.
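The interplay of the two conditions can be condensed into a small self-contained sketch (the class and method names are illustrative, not the SDK's):

```csharp
// Hedged sketch of the gating logic described above.
public static class SequenceGate
{
    // Mirrors: !m_sequentialPublishing || ii.Value.SequenceNumber <= m_lastSequenceNumberProcessed + 1
    public static bool ShouldProcess(bool sequentialPublishing, uint sequenceNumber, uint lastProcessed)
        => !sequentialPublishing || sequenceNumber <= lastProcessed + 1;

    // Mirrors the "keep the last sequence number processed going up" assignment.
    public static uint Advance(uint sequenceNumber, uint lastProcessed)
        => sequenceNumber > lastProcessed ? sequenceNumber : lastProcessed;
}
```

In sequential mode, ShouldProcess and Advance together only move the counter forward one step at a time; in the default mode every message passes and the counter simply ratchets upward.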

@AvenDonn AvenDonn changed the title Enable Subscription to invoke callbacks sequentially and synchronously + control number of active message worker tasks Enable Subscription to invoke callbacks sequentially and synchronously Sep 14, 2021
@AvenDonn
Contributor Author

Hello @mregen. I have removed the arbitrary message workers control feature and I have also attempted to include some kind of unit test.

I am not satisfied with what I came up with for the unit test, as it is not deterministic and simply relies on running long enough for an out-of-sequence callback to occur purely because callbacks take a long time.
I haven't found a convenient way to mock Session to spew messages out-of-sequence and observe Subscription adjust them properly. Even if I did that, the existing mechanism still properly takes care of it.

This test case does somewhat cover the multi-threading issue causing scenario, but doesn't very well illustrate the benefit of the strict +1 sequence number enforcement.

I'm not sure what the best course of action is here. If you feel the test is too convoluted, please remove it; I've made it an easily revertible dedicated commit.
I'd appreciate some help designing a better unit test for this case.

@lgtm-com

lgtm-com bot commented Sep 14, 2021

This pull request introduces 2 alerts when merging db666fd into 2b54136 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null
  • 1 for Missing Dispose call on local IDisposable

Contributor

@mregen mregen left a comment


I debugged through the new code and I think it's all working as expected with minimal risk of regression. I may update the tests slightly, as they seem to be of little value when we cannot inject publish requests with a non-consecutive sequence number pattern. Great work @AvenDonn !

@mregen
Contributor

mregen commented Sep 15, 2021

Actually the new test is quite handy to validate high load and fast data callbacks...

@mregen
Contributor

mregen commented Sep 16, 2021

:shipit:

@lgtm-com

lgtm-com bot commented Sep 16, 2021

This pull request introduces 1 alert when merging 6cd2de8 into 2b54136 - view on LGTM.com

new alerts:

  • 1 for Missing Dispose call on local IDisposable

@mregen mregen merged commit 92ae1d3 into OPCFoundation:master Sep 16, 2021
@mregen
Contributor

mregen commented Sep 17, 2021

Hi @AvenDonn , there is still some flakiness with the test case: on macOS we sometimes seem to hit the trigger condition, which means there is a non-sequential callback.

e.g. here https://opcfoundation.visualstudio.com/opcua-netstandard/_build/results?buildId=2340&view=results

any idea?

@AvenDonn
Contributor Author

> Hi @AvenDonn , there is still some flakiness with the test case: on macOS we sometimes seem to hit the trigger condition, which means there is a non-sequential callback.
>
> e.g. here https://opcfoundation.visualstudio.com/opcua-netstandard/_build/results?buildId=2340&view=results
>
> any idea?

Hey @mregen, one of your changes "broke" a metric I'm relying on, the number of outstanding message workers. It did that because it disabled publishing before measuring it. That's not related to why it's failing though.

I see you've also removed the Thread.Sleep I added in the callback code. It was meant to slow the callback down enough that several callback invocations would run simultaneously, which is what was supposed to cause the failure.

I see each subscription tracks its items independently, so this isn't some odd collision of client handles. But I wouldn't rule out some kind of aliasing issue coming from the server.
Why did you choose to create multiple subscriptions? To increase the chances of the issue occurring? Did you note whether the issue occurs when running without sequential publishing in this mode? Because I haven't been able to reproduce it with the current test.

Could you please test more with the original test I submitted?

@mregen
Contributor

mregen commented Sep 20, 2021

Thanks @AvenDonn , I will check your original tests again. They were definitely running for too long and it was hard to tell whether they caught an issue, so I basically repurposed them into a sort of load test. In that setting the sleeps are counterproductive.
The idea of having more subscriptions is to have more publish requests queued on the server side, which may eventually lead to a race condition.

@AvenDonn
Contributor Author

Thanks @mregen. The server is not really the problem, and adding more subscriptions won't increase the chance of reproducing it, since the ordering is on a per-subscription basis anyway.

The trouble we've encountered came from the client, and was most likely stemming from concurrent execution of callbacks and possibly weird context switching and lock handling.


Successfully merging this pull request may close these issues.

'Fast' callbacks can be executed concurrently
3 participants