[fix][client] Fix messages in the batch container timed out unexpectedly #21889
Fixes apache#21884

### Motivation

When `ProducerImpl#run` is called while `pendingMessages` is empty and `batchMessageContainer` is not empty, the whole batch's timestamp is treated as `lastBatchSendNanoTime + batch latency`.

Given a send timeout of 5 seconds, assume a batch was flushed at `t1` and the next message was sent 5 seconds later:

- t1: `ProducerImpl#batchFlushTask()` runs; `lastBatchSendNanoTime` is now `t1`
- t1 + 0.0011 s: `ProducerImpl#run()` runs; there is no pending message and the batch container is empty
- t1 + 5.0010 s: `sendAsync()` is called; the message is added to the batch
- t1 + 5.0011 s: `ProducerImpl#run()` is called before `batchFlushTask()`

https://github.com/apache/pulsar/blob/176bdeacd309e8c1e49358987a1946abd30ba34a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2014-L2015

`createdAt` will be `t1 + 0.001 s`. However, the current time is `t1 + 5.0011 s`, so the interval is 5.0001 s, which exceeds the 5-second send timeout, and the message is reported as timed out even though it was added only 0.1 ms earlier.

### Modification

Record the timestamp when the first message is added to the batch container, and use this timestamp instead of `lastBatchSendNanoTime` to compute `createdAt`.

Add `testSendTimerCheckForBatchContainer` to cover this case.
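The timeline in this description can be replayed with plain arithmetic. The following is a minimal sketch, not the actual `ProducerImpl` code: the class `BatchTimeoutSketch`, the nested `Batch` container, and every method name are hypothetical, and the 1 ms batch latency and the `>=` comparison are assumptions chosen to match the numbers in the description.

```java
import java.util.concurrent.TimeUnit;

/**
 * Simplified model of the send-timeout check discussed in this PR.
 * All names are hypothetical; they are not the real ProducerImpl fields.
 */
public class BatchTimeoutSketch {
    // Assumed configuration: 5 s send timeout, 1 ms batching latency.
    static final long SEND_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(5);
    static final long BATCH_LATENCY_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    /** Hypothetical container that remembers when its first message arrived. */
    static final class Batch {
        private int size = 0;
        private long firstAddedNanos = -1L;

        void add(long nowNanos) {
            if (size == 0) {
                firstAddedNanos = nowNanos; // recorded only for the first message
            }
            size++;
        }

        long firstAddedNanos() {
            return firstAddedNanos;
        }
    }

    /** Pre-fix estimate: creation time derived from the previous flush. */
    static long createdAtFromLastFlush(long lastBatchSendNanoTime) {
        return lastBatchSendNanoTime + BATCH_LATENCY_NANOS;
    }

    /** Simplified timeout check (assumption: elapsed >= timeout means timed out). */
    static boolean isTimedOut(long createdAtNanos, long nowNanos) {
        return nowNanos - createdAtNanos >= SEND_TIMEOUT_NANOS;
    }

    public static void main(String[] args) {
        long t1 = 0L;                                                    // last flush
        long sendTime = t1 + TimeUnit.MICROSECONDS.toNanos(5_001_000);   // t1 + 5.0010 s
        long timerTime = t1 + TimeUnit.MICROSECONDS.toNanos(5_001_100);  // t1 + 5.0011 s

        Batch batch = new Batch();
        batch.add(sendTime);

        // Pre-fix: elapsed = 5.0001 s, so the fresh message looks expired.
        System.out.println("from last flush: " + isTimedOut(createdAtFromLastFlush(t1), timerTime));
        // Post-fix: elapsed = 0.0001 s, so the message is still within the timeout.
        System.out.println("from first add:  " + isTimedOut(batch.firstAddedNanos(), timerTime));
    }
}
```

Under these assumptions the first check reports a timeout and the second does not, which is the behavior change the modification aims for.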
LGTM, just left a trivial suggestion.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
LGTM
LGTM
Do you know how to reproduce this issue? We see the same issue in our environment, but I cannot reproduce it.
@nodece It's not hard to reproduce. The open-source KoP can also reproduce this issue.
I reproduced it again just now.
About 40 seconds after unloading, the timeout error occurred.
Codecov Report

Coverage diff, master vs. #21889:

| Metric | master | #21889 | +/- |
|---|---|---|---|
| Coverage | 73.58% | 73.55% | -0.03% |
| Complexity | 32325 | 32347 | +22 |
| Files | 1859 | 1859 | |
| Lines | 138263 | 138373 | +110 |
| Branches | 15153 | 15160 | +7 |
| Hits | 101736 | 101779 | +43 |
| Misses | 28644 | 28702 | +58 |
| Partials | 7883 | 7892 | +9 |
…dly (apache#21889) (cherry picked from commit a90e1f1)