-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-broker] Fix bug that namespace policies does not take effect due to NPE #5408
Conversation
retest this please |
if (dispatcher != null && dispatcher.getRateLimiter().isPresent()) { | ||
dispatcher.getRateLimiter().get().updateDispatchRate(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make it even more sure, can we do like:
Dispatcher dispatcher = persistentSubscription.getDispatcher();
if (dispatcher != null) {
dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
}
@@ -1673,7 +1676,7 @@ private boolean shouldTopicBeRetained() { | |||
}); | |||
subscriptions.forEach((subName, sub) -> { | |||
sub.getConsumers().forEach(Consumer::checkPermissions); | |||
if (sub.getDispatcher().getRateLimiter().isPresent()) { | |||
if (sub.getDispatcher() != null && sub.getDispatcher().getRateLimiter().isPresent()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, protect against dispatcher turning null
subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies)); | ||
subscriptions.forEach((name, subscription) -> { | ||
if (subscription.getDispatcher() != null) { | ||
subscription.getDispatcher().initializeDispatchRateLimiterIfNeeded(policies); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, protect against dispatcher turning null in between checks
@merlimat Addressed your comments. PTAL |
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
run Integration tests |
2 similar comments
run Integration tests |
run Integration tests |
rerun integration tests |
2 similar comments
rerun integration tests |
rerun integration tests |
Motivation
When produce/consume permissions for a role on a namespace are revoked, producers and consumers connected to the topic under the namespace using that role should be disconnected from the broker. However, I noticed that producers/consumers can stay connected in some topics even if the permissions are revoked.
As a result of the investigation, I found that NPE was thrown in the following line:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Line 273 in 094ebf7
In subscriptions that consumer has never connected to since the broker started, the dispatcher has not been initialized, so
subscription.getDispatcher()
returns null.As a result, the process of
PersistentTopic#onPoliciesUpdate()
is aborted.pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 1668 to 1682 in 094ebf7
Modifications
Before calling a dispatcher method, make sure that the dispatcher is not null.