New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [broker] Producer is blocked on creation because backlog exceeded on topic, when dedup is enabled and no producer is there #20951
Conversation
@heesung-sn Please add the following content to your PR description and select a checkbox:
|
e66a3a4
to
d4ca29c
Compare
cacf511
to
0261b82
Compare
@@ -195,6 +195,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal | |||
private final TopicName shadowSourceTopic; | |||
|
|||
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; | |||
|
|||
public static boolean isDedupCursorName(String name) { | |||
return name.equals(DEDUPLICATION_CURSOR_NAME); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use DEDUPLICATION_CURSOR_NAME .equals(curosrName)
is better, because the constant variable is never be null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -406,6 +410,11 @@ private void takeSnapshot(Position position) { | |||
if (log.isDebugEnabled()) { | |||
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); | |||
} | |||
|
|||
if (!snapshotTaking.compareAndSet(false, true)) { | |||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since one updating cursor metadata task has been skipped(maybe the skipped one has a larger position
), shall we print a log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this snapshot can be called from other threads too. We better not overflow logs.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
Outdated
Show resolved
Hide resolved
9180873
to
f2c1805
Compare
@heesung-sn Please rebase to the master branch to resolve the CI issue. |
f2c1805
to
1e76e6e
Compare
Done |
byte[] content = new byte[1024]; | ||
Producer<byte[]> producer = createProducer(client, topic1); | ||
|
||
admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet); | ||
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better use admin to instead of sleep
assertFalse(gotException, "unable to publish due to " + sendException); | ||
|
||
gotException = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need this line
@heesung-sn Is there anything blocking your committer process? I realized you still don't have permission for the pulsar repo. |
…d on topic, when dedup is enabled and no producer is there
1e76e6e
to
f87b86e
Compare
I got the permission now. I merged this PR. |
…ed on topic, when dedup is enabled and no producer is there (apache#20951) (cherry picked from commit 30073db)
…ed on topic, when dedup is enabled and no producer is there (apache#20951) (cherry picked from commit 30073db)
…ed on topic, when dedup is enabled and no producer is there (apache#20951) (cherry picked from commit 30073db) (cherry picked from commit f68589e)
…ed on topic, when dedup is enabled and no producer is there (apache#20951) (cherry picked from commit 30073db) (cherry picked from commit f68589e)
…ed on topic, when dedup is enabled and no producer is there (apache#20951) (cherry picked from commit 30073db) (cherry picked from commit f68589e)
Motivation
As per backlog quota logic, Producer is blocked on creation when
Especially, producer creation is indefinitely blocked when there brokerDeduplicationEnabled=false(default=false), which disables periodic dedup map snapshots every 2mins.
When checking backlog size, the logic computes the backlog size against the slowest consumer on the topic. Unfortunately, the dedup system consumer(for the dedup map recovery) is usually behind the latest position -- the broker takes deup recovery map snapshot every 1000 entry(by default,
brokerDeduplicationEntriesInterval
) or every 2 mins(by default) whenbrokerDeduplicationEnabled
is enabled(default is false), so before the next snapshot cycle, the dedup system consumer's MarkDeletePos is usually behind, and this can hold backlog more than the quota. As a result, the dedup-enabled topic could show more backlog size and block produce creations, especially when there is no producer and the time-based dedup-map snapshot is disabled, although all of the actual consumers' MarkDeletePos are up-to-date.Modifications
Add
advanceSlowestMessageDeduplicationCursor
in the backlogQuotaCheck failure handler to advance the Slowest MessageDeduplication Cursor when the backlog quota exceeds the threshold.Verifying this change
This change added tests and can be verified as follows:
Added the unit test to cover this logic change.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: heesung-sn#50