-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Catch topic policy not hit exception in handleSubscribe #10341
[BUG] Catch topic policy not hit exception in handleSubscribe #10341
Conversation
Is It possibile to add a test case? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good to me, could you please help add a test to avoid regression
} | ||
} else if (exception.getCause() instanceof BrokerServiceException) { | ||
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", | ||
remoteAddress, topicName, subscriptionName, | ||
consumerId, exception.getCause().getMessage()); | ||
consumerId, exception.getCause().getMessage(), exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was done on purpose to avoid stack traces for "known" or expected exceptions.
@hangc0276 Can you address Matteo's comment? |
@sijie Ok, i will address it. |
### Modification 1. upgrade pulsar dependency to include the following bug fix - apache/pulsar#10706 : fix load __consumer_offset topic timeout on bundle onLoad stage - apache/pulsar#10341 : fix create reader failed on bundle onLoad stage
…#10341) ### Motivation When running KOP with topic policy, it get the following exception. ``` 17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init. java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172] at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172] Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172] ... 15 more 17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init. 17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002 17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s ``` The reason is that it doesn't catch exception in getTopicPolicies, which will lead to subscribe failed. ```Java public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception { TopicName topicName = TopicName.get(topic); if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) { TopicPolicies topicPolicies = brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)); if (topicPolicies == null) { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } else { if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } return topicPolicies.getSubscriptionTypesEnabled().contains(subType); } } else { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } } ``` ### Changes 1. catch the exception in `checkSubscriptionTypesEnable` 2. expose exception stack in `servercnx#handleSubscribe`
…#10341) ### Motivation When running KOP with topic policy, it get the following exception. ``` 17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init. java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172] at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172] Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172] ... 15 more 17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init. 17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002 17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s ``` The reason is that it doesn't catch exception in getTopicPolicies, which will lead to subscribe failed. ```Java public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception { TopicName topicName = TopicName.get(topic); if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) { TopicPolicies topicPolicies = brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)); if (topicPolicies == null) { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } else { if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } return topicPolicies.getSubscriptionTypesEnabled().contains(subType); } } else { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } } ``` ### Changes 1. catch the exception in `checkSubscriptionTypesEnable` 2. expose exception stack in `servercnx#handleSubscribe`
…#10341) ### Motivation When running KOP with topic policy, it get the following exception. ``` 17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init. java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172] at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172] Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init. at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT] at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172] ... 15 more 17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init. 17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002 17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s ``` The reason is that it doesn't catch exception in getTopicPolicies, which will lead to subscribe failed. ```Java public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception { TopicName topicName = TopicName.get(topic); if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) { TopicPolicies topicPolicies = brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)); if (topicPolicies == null) { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } else { if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } return topicPolicies.getSubscriptionTypesEnabled().contains(subType); } } else { return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType); } } ``` ### Changes 1. catch the exception in `checkSubscriptionTypesEnable` 2. expose exception stack in `servercnx#handleSubscribe`
Motivation
When running KOP with topic policy, it get the following exception.
The reason is that it doesn't catch exception in getTopicPolicies, which will lead to subscribe failed.
Changes
checkSubscriptionTypesEnable
servercnx#handleSubscribe