
'Fast' callbacks can be executed concurrently #957

Closed
mediawolf opened this issue Apr 27, 2020 · 22 comments · Fixed by #1493
Labels: enhancement (API or feature enhancement), needs investigation

Comments

@mediawolf

(checked on commit 2219e02)

I use the Subscription.FastDataChangeCallback and Subscription.FastEventCallback callbacks for monitored items and events, with Subscription.PublishingInterval = 1000. If the callback handler cannot keep up with the pace (e.g. while stepping through a debugger, or because the handler logic simply cannot execute fast enough), other invocations can happen concurrently, leading to an unpredictable processing order.

According to the implementation, Subscription.SaveMessageInCache orders NotificationMessages by their sequence numbers under the cache lock. It then schedules message processing (OnMessageReceived) via Task.Run, which means several scheduled invocations of OnMessageReceived can execute concurrently.

I could re-order the messages again in the callback using the available sequence numbers, but first I would like to understand whether it was designed to work this way.
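To make the race concrete, here is a minimal, self-contained illustration of why scheduling each message with Task.Run can reorder processing when the handler is slow. This is not library code; it only mimics the dispatch pattern described above.

using System;
using System.Threading;
using System.Threading.Tasks;

class TaskRunOrderingDemo
{
    static void Main()
    {
        for (int seq = 1; seq <= 5; seq++)
        {
            int captured = seq;
            // One thread-pool task per "notification", just like dispatching
            // each publish response with Task.Run.
            Task.Run(() =>
            {
                // Simulate a handler that happens to be slower for earlier messages.
                Thread.Sleep((6 - captured) * 20);
                Console.WriteLine($"handled sequence number {captured}");
            });
        }

        // Crude wait so the demo finishes printing before the process exits.
        Thread.Sleep(500);
    }
}

Typical output is 5, 4, 3, 2, 1: the handler observes the notifications out of order even though they were produced in order.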

@AlinMoldovean added the 'question' label (The issue contains a question. After no activity the issue might be closed.) on Apr 27, 2020
@AlinMoldovean (Contributor)

Hello @mediawolf ,

The intent is to not block the thread used for processing the service response(s) while the notification is processed by the application (e.g. while it updates the UI).

@mediawolf (Author)

Thanks @AlinMoldovean.

So basically a receiver has to re-order messages again if chronological order is important.
It would be nice to have this behavior reflected in the documentation of those properties.

BTW, having the documentation XML files included in the OPCFoundation.NetStandard.Opc.Ua NuGet package would be great.

@mediawolf reopened this on Apr 29, 2020
@mediawolf (Author)

Could we have an option to control the invocation behavior (synchronous vs. asynchronous) for these callbacks? Asynchronous behavior would be opt-in, while the synchronous one gives the receiver more control. For instance, a concurrent queue plus asynchronous processing on the receiver side would preserve the chronological order of incoming notifications and wouldn't block processing of the service responses.
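Here is a sketch of what that receiver-side pattern could look like, assuming the library raised the fast callback synchronously and in order (which is exactly what is being requested). Nothing below is existing library API; the type parameter and the processing delegate are placeholders.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class OrderedPump<T> : IDisposable
{
    private readonly BlockingCollection<T> m_queue = new BlockingCollection<T>();
    private readonly Action<T> m_process;
    private readonly Task m_worker;

    public OrderedPump(Action<T> process)
    {
        m_process = process;
        // A single consumer: items are processed in exactly the order they were enqueued.
        m_worker = Task.Run(() =>
        {
            foreach (T item in m_queue.GetConsumingEnumerable())
            {
                m_process(item);
            }
        });
    }

    // Called from the (hypothetically synchronous) fast callback; returns immediately.
    public void Enqueue(T item) => m_queue.Add(item);

    public void Dispose()
    {
        m_queue.CompleteAdding();
        m_worker.Wait();
        m_queue.Dispose();
    }
}

With this in place the callback body reduces to pump.Enqueue(...), so the service-response thread is never blocked while application-side processing stays strictly in arrival order.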

@mregen added the 'enhancement' (API or feature enhancement) and 'needs investigation' labels and removed the 'question' label on Jun 3, 2020
@BrunoJuchli commented Nov 23, 2020

I would like to second this request.
We recently experienced mis-ordered notification processing, which wreaked havoc.

While I believe I (mostly) understand the reasoning behind the current design, it makes it very likely that you'll end up with sporadic out-of-order updates: you will, unless you re-implement the re-ordering. To back this up: our processing in the callback is minimal; the only thing we do is dispatch it on the right thread. Still, we witness out-of-order processing, so I'm pretty confident that experiencing out-of-order processing is just a question of time.

It doesn't seem to me that this was the original intention; after all, why would Subscription.SaveMessageInCache perform reordering if the consumer can't rely on it?

I conclude that either the callback should be processed synchronously (probably with a warning that you shouldn't block the thread for too long, and that it's best to dispatch...) or the reordering logic should be removed.

Since ordering is not exactly trivial (also considering timeouts, shutdown, connection loss...), I'd very much prefer the reordering to stay in the library.

@mregen
I'd like to provide a pull-request, but I'd need your direction on backwards-compatibility:

  • would a breaking change be acceptable? (i.e. just remove the Task.Run and adapt the sample applications to incorporate the Task.Run in the callback (*))
  • if not, would it be acceptable to have two callbacks, one async and one sync?
  • could/should the async one (the current one) be marked as obsolete, so that it can be phased out over time?
  • what kind of warning/documentation do you deem necessary/appropriate to inform the user about the consequences of blocking in the callback?

(*) The consumer can simply wrap the delegate:

subscription.FastDataChangeCallback = (s, n, t) => Task.Run(() => HandleSubscriptionFastChange(s, n, t));

Also note: I know it's good practice not to call external code (callbacks, event handlers) while holding a lock, because it might cause deadlocks which are hard for the consumer to understand. However, there's a trade-off here: how can we avoid doing so while still guaranteeing ordering?
Maybe one ought to add queued dispatching to the library? Is it worth the effort/complexity?

@mregen (Contributor) commented Dec 8, 2020

Hi @BrunoJuchli, sorry for not getting back earlier, busy days! We only briefly discussed this issue last week in our dev sync, and I'm not deeply familiar with that piece of code. From what I heard, such an effort had already been tried, but it is very difficult because over the network it cannot be guaranteed that the updates are received in order. The main intention of SaveMessageInCache is to handle republish requests properly; the fast callback is just a quick notifier that provides early access for a consumer of monitored items, and it is out of order by design.

Backward compatibility is important here, so one option I would suggest is to build a layer on top of the callback which stores and forwards monitored items in order.

Another idea: would it be possible to put the ordered items into a 'concurrent queue' which is used to notify, even via Task.Run? Then a receiver always gets to see the whole list of unprocessed items in order. Do you think that's an option?
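A rough sketch of that idea (not library code; the names are placeholders): the response-processing side enqueues messages in sequence order and still notifies via Task.Run, but the drain itself is serialized so the callbacks come out in FIFO order.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class QueuedNotifier<T>
{
    private readonly ConcurrentQueue<T> m_pending = new ConcurrentQueue<T>();
    private readonly object m_drainLock = new object();

    // Called by the (hypothetical) response-processing code, in sequence order.
    public void Publish(T message, Action<T> callback)
    {
        m_pending.Enqueue(message);

        // Still hands off to the thread pool, so the response thread is not blocked.
        Task.Run(() =>
        {
            // Serialize the drain so callbacks are raised in FIFO order even when
            // several workers are scheduled; a late worker may simply find the
            // queue already empty.
            lock (m_drainLock)
            {
                while (m_pending.TryDequeue(out T item))
                {
                    callback(item);
                }
            }
        });
    }
}

The trade-off is the one @BrunoJuchli raises above: the callback runs while a (drain) lock is held, so a slow callback delays subsequent messages instead of reordering them.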

@BrunoJuchli commented Dec 9, 2020

@mregen
Thanks for getting back to me.
As I understand your comment, the ordering of updates in Subscription.SaveMessageInCache is not reliable.
If I have understood this correctly, I would suggest removing reordering entirely from Subscription.SaveMessageInCache.
Then, in a second step, implement message ordering on top of it.
I guess this won't be a simple job. It will have to identify missed updates and subsequently delay delivery of new updates until the missed update has arrived (or has timed out, after a configurable(?) timeout). Then there needs to be recovery for the case where an update was missed. This might also require new consumer-facing code, as the consumer might need to be informed about this happening so they can take appropriate action.

I will have to familiarize myself a bit more closely with the low-level "details" of OPC UA to propose a well-thought-through plan/solution.
I haven't understood all the important parts yet, including the details of asynchrony at the network level, which I'd like to understand a bit better.
From what you write ("over the network it cannot be guaranteed that the updates are received in order"), I guess the OPC UA spec allows the server to send multiple notifications for the same Subscription concurrently over different connections (otherwise this should be simple, lest there be a flaw in the OPC UA spec -- I might be wrong here, though, because I don't yet fully understand the implications of using subscriptions across sessions).

I'm now reading the online reference about subscriptions and will see where it takes me.
If you have any pointers to the sections of the reference where the concurrency relevant to subscriptions is described, I'd be very happy if you could mention them. Especially about the "problem" of the server being allowed to send multiple notifications for the same subscription concurrently (which may be allowed simply by omission of a contrary specification...).

@BrunoJuchli commented Dec 9, 2020

I'm trying to distill the relevant parts of the spec here:

5.13.1.1 Subscription Model - Description

  1. NotificationMessages are uniquely identified by sequence numbers that enable Clients to detect missed Messages. (...)
  2. (...) The sequence number is an unsigned 32-bit integer that is incremented by one for each NotificationMessage sent. The value 0 is never used for the sequence number. The first NotificationMessage sent on a Subscription has a sequence number of 1. If the sequence number rolls over, it rolls over to 1.
  3. (...) Clients are required to acknowledge NotificationMessages as they are received. In the case of a retransmission queue overflow, the oldest sent NotificationMessage gets deleted. (...)
  4. (...) If a Subscription is transferred to another Session, the queued NotificationMessages for this Subscription are moved from the old to the new Session. (...)

Sequence numbers are also used by Clients to acknowledge the receipt of NotificationMessages. Publish requests allow the Client to acknowledge all Notifications up to a specific sequence number and to acknowledge the sequence number of the last NotificationMessage received. One or more gaps may exist in between. Acknowledgements allow the Server to delete NotificationMessages from its retransmission queue.

Clients may ask for retransmission of selected NotificationMessages using the Republish Service. This Service returns the requested Message.

7.11 Subscription Service Set

Subscriptions include features that support detection and recovery of lost Messages. Each NotificationMessage contains a sequence number that allows Clients to detect missed Messages. When there are no Notifications to send within the keep-alive time interval, the Server sends a keep-alive Message that contains the sequence number of the next NotificationMessage sent. If a Client fails to receive a Message after the keep-alive interval has expired, or if it determines that it has missed a Message, it can request the Server to resend one or more Messages.
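The sequence-number rules quoted above translate into a small amount of client-side bookkeeping. A sketch (the names are mine, not from the stack): the client tracks the last processed sequence number, computes the expected next one (honoring the roll-over-to-1 rule), and treats anything else as a gap that can be filled via the Republish service.

using System;

public static class SequenceNumbers
{
    // Returns the sequence number expected after 'current'.
    // Sequence numbers are never 0 and roll over from UInt32.MaxValue to 1.
    public static uint Next(uint current)
    {
        return current == uint.MaxValue ? 1u : current + 1u;
    }

    // True if 'received' is not the expected next message, i.e. a message
    // was missed (or this one is a duplicate / out of order).
    public static bool IsGap(uint lastProcessed, uint received)
    {
        return received != Next(lastProcessed);
    }
}

For example, if the last processed sequence number is 41 and a message with sequence number 43 arrives, IsGap returns true and the client can ask the server to republish message 42.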

@BrunoJuchli commented Jan 20, 2021

@mregen
Since the FastDataChangeCallback was never actually intended for this usage: is there code which is intended for it? I.e. where one is notified about subscription updates reliably, in order, and without gaps? Up until now I assumed there wasn't, because I couldn't find any, but it's better to ask than to assume too much :-)

Regarding implementing the re-ordering on top of FastDataChangeCallback:

The sequence number is not known to the handler of the Session.FastDataChangeCallback; without it, reordering can't be done "on top".
(@mediawolf your opening post indicates differently - please either correct me if I'm wrong or update your opening post, thanks.)

The existing FastDataChangeCallback event doesn't seem suitable to expose the sequence number in a backwards compatible manner:

  • adding another parameter (the sequence number) changes the event definition in an incompatible manner
  • adding a property to the DataChangeNotification seems to break other usages (it's a DataContract); plus, IMHO, it's logically the wrong place (unclean interface)
  • the Subscription itself is modified concurrently (as the method which invokes the FastDataChangeCallback is itself invoked concurrently); plus, again, this logically seems the wrong place and would result in an unclean interface (how would one name the relevant property on the Subscription to make clear that it relates to the current invocation of the FastDataChangeCallback?)

What would be possible is to add a second FastDataChangeCallback event (with a different name, of course), which would also communicate the sequence number.
Though this seems to me like a half-baked solution resulting in more complexity than necessary:
Reordering relates to republishing, because ordered notifications should also prevent gaps in notifications; this means that if a notification is missing in between, one has to wait until it arrives.
This re-publishing is done by the Session concurrently, and the user of the session is not informed about it. So the re-ordering on top would have to wait an "arbitrary" time for the (assumed) republishing to succeed. If republishing fails we don't know about it but just have to wait... until at some point we have to time out. Reason: unknown. This makes error diagnosis hard, especially with multiple sessions, because this library logs all messages into one single file and, as far as I know, I can't easily correlate messages to sessions.

Also, since subscriptions can be moved from one session to another, that's a problem which I expect would have to be solved all over again as well.

IMHO a more sensible alternative, instead of solving the problem "on top" of FastDataChangeCallback, would be to resolve it inside the Subscription itself and add a new change notification event which guarantees ordering.

@mediawolf (Author)

@BrunoJuchli
I didn't check whether the fast callbacks get enough information to perform resequencing reliably. At the time I had just seen that the 'fast' path has that flaw. Moreover, if an application makes extensive use of the default TaskScheduler, notifications may sit in its queue long enough to arrive in non-chronological order.
In one of the posts above I mentioned a cheap solution: add something like a Subscription.SynchronousFastCallback property, which is false by default to preserve the existing behavior. If the application sets it to true, the fast callback is invoked synchronously, under the internal lock. Basically:

if (SynchronousFastCallback)
    OnMessageReceived(...);
else
    Task.Run(() => OnMessageReceived(...));

Anyway, I believe it's something that should be done, and done at the library level where it can be, rather than pushed to the application level, where developers have less experience with the intricacies of OPC UA.

@yfital commented Feb 25, 2021

I've arrived here after experiencing a few bad scenarios in production, running a system with a large number of nodes and fast sampling times (~1000 nodes, sampling rates between 50 ms and 1 s).

We've experienced out-of-order notifications and unexplained slowdowns.
After investigating the code, I saw the issues and found this post. Here are my 2 cents:

  1. I expected the SDK to properly support ordering, i.e. even if a client runs on a separate task, notifications should be ordered. A simple solution for that could be an ActionBlock, for instance (per monitored item or per subscription ID); see the sketch at the end of this comment.
    Ordering is only needed per monitored item; there is no reason to halt the entire system to run synchronously.

  2. I expected the SDK to provide max-parallelism support; we've reached a point where OPC UA is spawning so many tasks that the entire system is practically dead.

Both issues stem from the fact that the subscription code runs Task.Run indiscriminately.

Are there any thoughts on solving this? (Or does the SDK provide another way to subscribe?)
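Here is a consumer-side sketch of the ActionBlock idea from point 1 (it requires the System.Threading.Tasks.Dataflow package; the class and the choice of key, e.g. a monitored item's client handle, are assumptions). Each key gets its own single-threaded ActionBlock, so notifications for the same item are handled in order while different items are handled in parallel.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

public class PerKeyDispatcher<TKey, TItem>
{
    private readonly ConcurrentDictionary<TKey, ActionBlock<TItem>> m_blocks =
        new ConcurrentDictionary<TKey, ActionBlock<TItem>>();
    private readonly Action<TItem> m_handler;

    public PerKeyDispatcher(Action<TItem> handler)
    {
        m_handler = handler;
    }

    public void Post(TKey key, TItem item)
    {
        ActionBlock<TItem> block = m_blocks.GetOrAdd(key, _ =>
            new ActionBlock<TItem>(m_handler, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1 // preserves order within a key
            }));

        block.Post(item);
    }
}

Note that this only preserves per-key order if the notifications are posted in order in the first place, which is exactly what this issue is asking the library to guarantee.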

@BrunoJuchli commented Mar 2, 2021

Ordering is only needed per monitored item; there is no reason to halt the entire system to run synchronously.

In our system we have, due to limitations of one of the involved components (a PLC), multiple variables specifying a single state. So for us, "ordering only per monitored item" is a no-go.

On a more general note: concurrency is difficult. Generally, one should only use concurrency when the benefit is worth the increased difficulty.
As such, the library should provide only as much concurrency as OPC UA requires; as a consumer, you can still perform dispatching per monitored item if you want to.
Doing the reverse, i.e. synchronizing concurrent changes of multiple variables, is a much, much more complicated task.
One more argument: depending on the number of monitored items and your hardware specifics, you may reach a point where handling all the changes concurrently is much slower than doing so with limited concurrency.

Also: if the library did not perform concurrent dispatching of subscription changes, your problem wouldn't exist in the first place...

@yfital commented Mar 2, 2021

Having a library which runs sequentially over I/O is problematic in my eyes, especially under high load.

If you need synchronization between items, nothing will help you besides running local logic (on a sync hook) or disabling concurrency altogether.

A simple solution here would be to just push all items into an ActionBlock and let us (the clients) set its concurrency level; that would solve your issue as well.

But I do have to say: concurrency (limited!) is a blessing, yet ordered notification items are a requirement in most such infrastructures, unless you're talking about a pub/sub library (which is not the case here).

In any case, having an infrastructure which spawns an unbounded number of tasks, one per notification, is the main issue here; I think that should be the first fix.
My suggestion is an ActionBlock plus a parallelism configuration; the code change is minimal.

@BrunoJuchli commented Mar 3, 2021

Having a library which runs sequentially over I/O is problematic in my eyes, especially under high load.

I fully agree that an application which handles IO sequentially is a performance nightmare given high loads.

There's nothing stopping you from introducing concurrency yourself. If the current implementation were changed to invoke the handler synchronously and you added the Task.Run in the handler yourself, the resulting performance and behavior would be exactly the same.

So I really don't understand why it's problematic for a library to perform I/O synchronously in such a scenario.
Of course, this depends on the scope of the library. If a library is intended to support a high-level operation (for example, a VoIP library supporting taking and making concurrent calls), it would be catastrophic for the library to perform all of the (audio/video) I/O synchronously.
In that scenario, however, it makes no sense for consumer code to access said audio/video I/O. That should be completely abstracted away from the consumer, to the point where one may choose audio devices (and maybe volume) but nothing more.
If we were to build a low-level SIP/RTP library which only abstracts the protocol and does not actually process audio data, it would be an entirely different story. But even then, there is no need for the library to dispatch received I/O; that can entirely be the responsibility of the consuming code. The consuming code has to handle cases like buffer underrun and overflow, which are tightly coupled with performance/concurrency characteristics. More control is better for the consuming code.

@yfital commented Mar 7, 2021

I think we're looking at it as different types of clients.

We're using it as a fully fledged protocol to control an industrial printing machine, including:

  • ~1000 I/O points (analog & digital) being monitored
  • a full print protocol, running at ~150 ms frequency (multiple messages, duplex)
  • various monitoring of extra "virtual I/O" (e.g. HW logic)

So, my needs are a bit different indeed.

In any case, as we both said, the simplest solution here is either sync or a simple ActionBlock (both take ~3 lines of code change). If we need extra parallelism on top of it, we'll handle it in our wrappers.

So back to step 1: is anyone taking this, or would you prefer we open a PR?
And if so, ActionBlock or just sync? (I think sync is too big a breaking change, considering how widely this is used.)

@r-bkr commented Apr 14, 2021

We are experiencing the same thing: out-of-order notifications and unexplained slowdowns.
Does anyone plan to take care of this?

@AvenDonn (Contributor)

Are there any suggestions on how to make something like what @mediawolf mentioned in their reply configurable without possibly breaking the Subscription DataContract?

The easiest thing to do is to add a property to Subscription that controls the behavior, but that would require adding another DataMember. That's a non-breaking change as long as strict schema validity is not enforced. Would this be acceptable?
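For reference, the shape of the change being discussed looks roughly like this; an illustrative sketch only, not the library source (the class name, Order value, and bare [DataContract] attribute are placeholders, and the property name is the one the eventual fix in #1493 settled on).

using System.Runtime.Serialization;

[DataContract]
public class SubscriptionSketch
{
    // ...existing data members come first...

    // Appended after the existing members; an optional member at the end is
    // non-breaking for DataContract serialization as long as strict schema
    // validation is not enforced.
    [DataMember(Order = 99, IsRequired = false)]
    public bool SequentialPublishing { get; set; }
}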

mregen pushed a commit that referenced this issue on Sep 16, 2021 (#1493):

Pertaining to the discussion started in issue #957, this pull request adds the capability for Subscription to invoke its MonitoredItem and FastDataChange callbacks sequentially, so that order is maintained properly. The key goal is to avoid out-of-order monitored items. To this effect, a Sequential Publishing capability is added to Subscription.
Resolves #957 

### SequentialPublishing
A toggle which forces the Subscription to raise callbacks only in sequential order, by the SequenceNumber of the incoming message. It limits the number of tasks that can process publish responses to 1 and enforces a strict +1 sequence number requirement before releasing a message.

#### Backwards compatibility
The new property has been added as a DataMember at the end, to maintain backwards compatibility with the DataContract where it may be used. The relevant copy constructor has also been updated.

The feature is disabled by default, so the existing behavior remains the default. Users must explicitly set the property on the Subscription object to enable the new behavior.
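A sketch of the opt-in from client code; SequentialPublishing is the property added by this PR, while the surrounding setup simply follows the usual client pattern and is included here only for illustration.

using Opc.Ua.Client;

public static class SequentialSubscriptionExample
{
    // 'session' is assumed to be an already-connected Opc.Ua.Client.Session.
    public static Subscription CreateSequentialSubscription(Session session)
    {
        var subscription = new Subscription(session.DefaultSubscription)
        {
            PublishingInterval = 1000,
            // Opt in: raise callbacks strictly in sequence-number order.
            SequentialPublishing = true
        };

        session.AddSubscription(subscription);
        subscription.Create();
        return subscription;
    }
}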

#### Leveraging existing "time machine" in SaveMessageInCache
There is already a mechanism for re-ordering messages that arrived in the wrong order. This change makes use of that existing "time machine", which sorts messages into chronological order (by sequence number), and only pulls off messages with the proper +1 sequence number each time.

#### KeepAlive messages advancing sequence number
KeepAlive messages do not contain notifications and do not enter the branch of the code where messages are pulled out, so they will not interrupt the sequential flow. Since the next message in the sequence that carries data "re-uses" that sequence number, the sequence can be expected to be maintained.

#### Delayed messages
When sequential publishing is enabled and a message is genuinely missing from the sequence, it will "hold up" the subsequent messages until it either arrives or is pulled out of the incoming-messages list for being too old. In either case it is then considered "processed" for sequencing purposes, and the rest of the messages may proceed.

The automatic republish request mechanism is also leveraged here for that purpose.

#### SequenceNumber roll-over after max value
As specified in the OPC UA spec, the sequence number rolls over after the maximum value of an unsigned 32-bit integer; at a 1 ms publishing interval that would take almost 50 days of uptime, and at any "reasonable" publishing interval even longer. Roll-over was also not addressed anywhere in the existing code that handled sequence numbers before this change (though some consideration can be seen in places, such as placing messages at the end of the incoming-message linked list if no better place was found).

#### Locking on m_cache to get the semaphore
I considered introducing a separate locking object for the semaphore management, but decided it is not needed. The message worker needs to obtain the m_cache lock later anyway, and momentarily obtaining it is not harmful to the overall flow.
Only under serious contention between new PublishResponses coming in and workers attempting to run would this cause any problems.

#### Callbacks that "take a long time"
This change naturally suffers when callbacks take a long time to execute. In our use case each callback message is passed into an ActionBlock for handling, so our callbacks are fast, but other clients may run much more code in these callbacks.
With sequential publishing this capacity can become a bottleneck. Sequential publishing should be used for its intended purpose and with callbacks that do not hold up their calling thread for long: to guarantee properly sequential callbacks, they cannot merely be started in order, they have to be processed fully in order.

#### Changing this property while the Subscription is active
There is limited support for changing this property on the fly, and that is not the intended use case. It is meant to be set once and ideally never changed while messages are being processed.

If users need certain items properly sequenced while other items may arrive out of order, or outdated messages may be ignored, they can define two separate Subscription objects, the same as they would if they needed different publishing intervals.

#### Why semaphore?
At first I considered using ActionBlock from TPL Dataflow and limiting its concurrency to the number of desired message workers, but I opted not to add a dependency on Dataflow for this purpose alone. (Referencing all of Dataflow just for ActionBlock's automatic concurrency limiting is wasteful.)

I also considered using a limited-concurrency TaskScheduler implementation and queueing the message workers on that instead of on the normal ThreadPool via Task.Run, but that is similarly more complex than it needs to be.

#### Why async Task? What was wrong with Void?
Even though the worker already runs on a new ThreadPool task, it would still block a ThreadPool thread if it used the synchronous Wait() method, which is a blocking call. By using WaitAsync, the thread goes "back" to the pool until the semaphore is released.
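A minimal illustration of that point (not the PR code; the names are placeholders): the worker awaits SemaphoreSlim.WaitAsync, so the pool thread is returned while waiting instead of being blocked inside Wait().

using System;
using System.Threading;
using System.Threading.Tasks;

class SemaphoreWorkerSketch
{
    // One concurrent message worker, as in the sequential-publishing case.
    private static readonly SemaphoreSlim s_workerSlots = new SemaphoreSlim(1, 1);

    // async Task worker: awaiting WaitAsync does not block a pool thread.
    static async Task ProcessMessageAsync(Action processMessage)
    {
        await s_workerSlots.WaitAsync();
        try
        {
            processMessage();
        }
        finally
        {
            s_workerSlots.Release();
        }
    }

    static void Main()
    {
        // Schedule a few workers; they enter the protected section one at a time.
        var tasks = new Task[3];
        for (int i = 0; i < tasks.Length; i++)
        {
            int id = i;
            tasks[i] = Task.Run(() => ProcessMessageAsync(
                () => Console.WriteLine($"worker {id} holds the slot")));
        }
        Task.WaitAll(tasks);
    }
}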

### Disclaimer - I am not an OPC UA expert
Please do not assume I have "done all my homework" with regard to these changes. I have tried to learn as much as I could in the time I was working on this change, but I fully expect to have missed a few spots and specifics of the OPC UA protocol.

I have done some testing in our application to verify that the out-of-order problem is solved and that no significant delay is introduced by the additional work done to facilitate this.

@BrunoJuchli commented Sep 30, 2021

TL;DR

Note that after the fix in #1493, the sporadic out-of-order behavior is still the default.

One can opt in to the fix on a per-subscription basis by setting Subscription.SequentialPublishing = true.


@AvenDonn Thanks for the fix!
It might be a good idea to document the flag in the opening post of #1493 so that people don't have to look for it too long.

@mregen
IMO this is (was) a very dangerous bug. It might be a good idea to document this prominently as a recommended configuration.
Thinking about it, it would make a lot of sense to change the default setting and put a warning on the property: "beware, there'll be dragons if you opt out of sequential publishing". After all, who wants to maintain a list of configuration recommendations/warnings? Who prefers reading them instead of being offered sensible defaults?
I get that there's a trade-off with backwards compatibility. So maybe add a recommendation to the documentation and leave it like that for a while, then later change the default in a breaking change/major version increment?

@AvenDonn (Contributor)

@BrunoJuchli, the flag is mentioned prominently in the post, but I've added a clarification.

@mregen, any news from the unit test?

@BrunoJuchli

@AvenDonn
Thanks; for me it wasn't obvious that the SequentialPublishing title refers to a property (on the Subscription). It might just as well have been a typo ;-)

@mregen (Contributor) commented Sep 30, 2021

Hi @BrunoJuchli and @AvenDonn, thanks for driving this. The tests are stable now, and once we know there are no regressions, setting it as the default can be considered. Please provide feedback if you hit any issues with the new release!

@AvenDonn (Contributor)

Hey @mregen, good to know. Did you change anything in the tests?

@mregen (Contributor) commented Sep 30, 2021

The test does a positive/negative test with a delayed fast data change callback. Without the sequential flag, the sequence gets out of order after a few seconds; with it, it does not. But it's limited to 10 seconds to keep the overall test runtime down.
