[Client] Fixed freezing of Subscription with sequential publishing#3565
Conversation
- Fixed freezing of Subscription (messages were not moving from Subscription.m_incomingMessages to Subscription.m_messageCache). Was happening when sequentialPublishing is active and first received publish response had SequenceNumber greater when 1. - Constant Subscription.kRepublishMessageTimeout was made public (Subscription.REPUBLISH_MESSAGE_TIMEOUT) so it can be used in tests. - Added unit tests to cover fixed problem. - Added unit test to cover invalid handling of reordered message queue for sequential publishing (UnorderedMessagesWouldBeLostForSequentialPublishingAsync). For now, test was made [Explicit].
|
/azp run |
|
Azure Pipelines successfully started running 1 pipeline(s). |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3565 +/- ##
===========================================
+ Coverage 51.86% 65.98% +14.11%
===========================================
Files 370 459 +89
Lines 78618 97494 +18876
Branches 13650 16306 +2656
===========================================
+ Hits 40779 64333 +23554
+ Misses 33705 28067 -5638
- Partials 4134 5094 +960 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| } | ||
|
|
||
| [Test] | ||
| [Explicit("Test shows possibility for broken order of notifications during sequential publishing")] |
There was a problem hiding this comment.
Any idea what the root cause of this is?
There was a problem hiding this comment.
Before processing a message in a subscription, the following check must be performed:
(!State.SequentialPublishing || ValidSequentialPublishMessage(ii.Value))
and ValidSequentialPublishMessage:
{
// If sequential publishing is enabled, only release messages in perfect sequence.
return message.SequenceNumber <= m_lastSequenceNumberProcessed + 1 ||
// reconnect / transfer subscription case
m_resyncLastSequenceNumberProcessed ||
// release the first message after wrapping around.
(message.SequenceNumber == 1 && m_lastSequenceNumberProcessed == uint.MaxValue);
}
Here, a message with an outdated SequenceNumber can be accepted.
I tried to simply change it to: (message.SequenceNumber = m_lastSequenceNumberProcessed + 1),
It caused side effects and I abandoned the idea of fixing it right now.
| [Parallelizable] | ||
| public class SubscriptionUnitTests | ||
| { | ||
| #region Utilities |
There was a problem hiding this comment.
Please remove regions.
|
/azp run |
|
Azure Pipelines successfully started running 1 pipeline(s). |
…PCFoundation#3565) * [Client] Fixed freezing of Subscription with sequential publishing - Fixed freezing of Subscription (messages were not moving from Subscription.m_incomingMessages to Subscription.m_messageCache). Was happening when sequentialPublishing is active and first received publish response had SequenceNumber greater when 1. - Constant Subscription.kRepublishMessageTimeout was made public (Subscription.REPUBLISH_MESSAGE_TIMEOUT) so it can be used in tests. - Added unit tests to cover fixed problem. - Added unit test to cover invalid handling of reordered message queue for sequential publishing (UnorderedMessagesWouldBeLostForSequentialPublishingAsync). For now, test was made [Explicit]. * [PR] fix naming convention violation * [PR] removed region in tests (cherry picked from commit 6a09c1c)
Bug
The client subscription fails to move messages from
Subscription.m_incomingMessagestoSubscription.m_messageCache, which leads to a complete freeze of the subscription publishing state.Conditions Required to Reproduce
The issue occurs only when both of the following are met:
Subscription.SequentialPublishing = trueSubscription.SaveMessageInCachereceives the first message with aSequenceNumbergreater than1(availableSequenceNumbersis not relevant in this scenario)Root Cause
When sequential publishing is enabled, the subscription cannot process incoming messages if there is a gap in sequence numbers.
Subscription.SaveMessageInCachealready contains logic to handle gaps between messages. However, it does not handle the specific case where the first received message itself introduces the gap (i.e., when the sequence does not start from1).As a result:
Proposed changes
Subscription.SaveMessageInCacheto properly fill the gap betweenSubscription.m_lastSequenceNumberProcessedandIncomingMessage.SequenceNumberSubscriptionUnitTeststo validate scenarios involving unordered messages passed toSubscription.SaveMessageInCache.Notes
SequenceNumbercan be sent (see testUnorderedMessagesWouldBeLostForSequentialPublishingAsync).In this case, the subscription treats such a message as the latest received message.
This appears to be another bug. For now, the corresponding test is marked as [Explicit] for further investigation.
Types of changes
What types of changes does your code introduce?
Put an
xin the boxes that apply. You can also fill these out after creating the PR.Checklist
Put an
xin the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code.