New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix call sync method in an async callback when enabling geo replicator. #12590
Fix call sync method in an async callback when enabling geo replicator. #12590
Conversation
After enabled geo-replication, we are not able to produce messages to a partitioned non-persistent topic. Reproduce step: 1. Start a geo-replication instance with 2 clusters, I have 3 brokers for each cluster 2. Create a non-persistent partitioned topic such as 10 partitions 3. Use pulsar-perf to publish messages to the partitioned topic 4. Verify if the message produce throughput is 0, such as `bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test` The root cause is there are 2 places calling a sync method in the async method callback. So the fix is: 1. Async the `validatePartitionedTopicAsync` method 2. Avoid call get cluster sync method when getting the replication client Integration tests added.
@codelipenghui:Thanks for providing doc info! |
@codelipenghui:Thanks for your contribution. For this PR, do we need to update docs? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
if GH actions has not enough resources to boot multiple clusters we can split this patch into two parts and commit only the fix to branch-2.9
…en creating partitioned-topic, this will cause dead thread problem.
@codelipenghui do you need some help here ? |
@eolivelli Thanks, @congbobo184 is helping on the test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@merlimat Could you please help review the PR? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@codelipenghui
can you please take a look to my latest comments ?
@@ -122,7 +118,7 @@ public void setPolicies(NamespaceName ns, Function<Policies, Policies> function) | |||
} | |||
|
|||
public static boolean pathIsFromNamespace(String path) { | |||
return path.startsWith(BASE_POLICIES_PATH); | |||
return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you explain better this line ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may receive a notification from the metastore /admin/policies/public
, not a complete namespace path
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see.
works for me, probably adding a comment will help future readers of this code
topicName.getPartitionedTopicName(), numPartitions, true); | ||
}) | ||
.exceptionally(throwable -> { | ||
log.error("Failed to create partition topic in cluster {}.", cluster, throwable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we fail the operation ?
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); | ||
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { | ||
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); | ||
topicFuture.completeExceptionally(ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we run this after the execution of "topics.remove" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No effect before and after
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good, just checking
* up/master: (55 commits) [broker] remove useless method "PersistentTopic#getPersistentTopic" (apache#12655) [Python Schema] Python schema support custom Avro configurations for Enum type (apache#12642) Allow to configure different implementations for Pulsar functions state store (apache#12646) Remove replicator global test from the quarantine group (apache#12648) [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (apache#12652) k8s runtime: force deletion to avoid hung function worker during connector restart (apache#12504) [Broker] Optimize exception information for schemas (apache#12647) Close Zk database on unit tests (apache#12649) Fix call sync method in an async callback when enabling geo replicator. (apache#12590) [pulsar-broker] Add git branch information for PulsarVersion (apache#12541) PulsarAdmin: Fix last exit code storage (apache#12581) Add @test annotation to test methods (apache#12640) Upgrade debezium to 1.7.1 (apache#12644) [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (apache#12606) [Functions] Prevent NPE while stopping a non started Pulsar LogAppender (apache#12643) Update io-debezium-source.md (apache#12638) Add missing cmds on pulsar-admin document page (apache#12634) Clean up the metadata of the non-persistent partitioned topics. (apache#12550) modify check waitingForPingResponse with volatile (apache#12615) [pulsar-admin] Check backlog quota policy for namespace (apache#12512) ...
* up/master: (55 commits) [broker] remove useless method "PersistentTopic#getPersistentTopic" (apache#12655) [Python Schema] Python schema support custom Avro configurations for Enum type (apache#12642) Allow to configure different implementations for Pulsar functions state store (apache#12646) Remove replicator global test from the quarantine group (apache#12648) [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (apache#12652) k8s runtime: force deletion to avoid hung function worker during connector restart (apache#12504) [Broker] Optimize exception information for schemas (apache#12647) Close Zk database on unit tests (apache#12649) Fix call sync method in an async callback when enabling geo replicator. (apache#12590) [pulsar-broker] Add git branch information for PulsarVersion (apache#12541) PulsarAdmin: Fix last exit code storage (apache#12581) Add @test annotation to test methods (apache#12640) Upgrade debezium to 1.7.1 (apache#12644) [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (apache#12606) [Functions] Prevent NPE while stopping a non started Pulsar LogAppender (apache#12643) Update io-debezium-source.md (apache#12638) Add missing cmds on pulsar-admin document page (apache#12634) Clean up the metadata of the non-persistent partitioned topics. (apache#12550) modify check waitingForPingResponse with volatile (apache#12615) [pulsar-admin] Check backlog quota policy for namespace (apache#12512) ...
…r. (#12590) After enabled geo-replication, we are not able to produce messages to a partitoned non-persistent topic. Reproduce step: 1. Start a geo-replication instance with 2 clusters, I have 3 brokers for each cluster 2. Create a non-persistent partitioned topic such as 10 partitions 3. Use pulsar-perf to publish messages to the partitioned topic 4. Verify if the message produce throughput is 0, such as `bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test` The root cause is there are 2 places calling a sync method in the async method callback. So the fix is: 1. Async the `validatePartitionedTopicAsync` method 2. Avoid call get cluster sync method when getting the replication client Integration tests added. (cherry picked from commit a2b7cae)
…r. (apache#12590) After enabled geo-replication, we are not able to produce messages to a partitioned non-persistent topic. Reproduce step: 1. Start a geo-replication instance with 2 clusters, I have 3 brokers for each cluster 2. Create a non-persistent partitioned topic such as 10 partitions 3. Use pulsar-perf to publish messages to the partitioned topic 4. Verify if the message produce throughput is 0, such as `bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test` The root cause is there are 2 places calling a sync method in the async method callback. So the fix is: 1. Async the `validatePartitionedTopicAsync` method 2. Avoid call get cluster sync method when getting the replication client Integration tests added.
After enabled geo-replication, we are not able to produce messages to a partitioned non-persistent topic.
Reproduce step:
bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test
The root cause is there are 2 places calling a sync method in the async method callback.
So the fix is:
validatePartitionedTopicAsync
methodIntegration tests added.
Documentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
Bug fixes
doc
(If this PR contains doc changes)