-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add authoritative flag for topic policy to avoid redirect loop #11051
Add authoritative flag for topic policy to avoid redirect loop #11051
Conversation
1. Add authoritative flag for topic policy to avoid redirect loop 2. Prevent set topic policy on a non-existing topic 3. Prevent set topic policy on a partition of a partitioned topic 4. Redirect to the broker which is owner of the partition-0 for a partitioned topic when setting topic policy 5. Don't remove policy cache when the topic removed from the broker, this will lead to the topic come back, but can't find the topic policy, since the namespace does not removed from the broker, we will not read from the system topic again. For this case we already handled when the broker does not provide service for that namespace, the topic policy cache under the namespace will be removed.
|
||
protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){ | ||
RetentionPolicies retentionPolicies = getTopicPolicies(topicName) | ||
protected RetentionPolicies internalGetRetention(boolean applied) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use CompletableFuture ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with you, I noticed the current implementation is not in a consistent way, I have a local change for handling this case, but don't want to use this PR because not easy to review. So for this PR I don't want to change any existing internal implementation of the topic policy, just make sure we can avoid the redirect loop.
@@ -390,7 +390,6 @@ public void clean(TopicName topicName) { | |||
//change persistent://tenant/namespace/xxx-partition-0 to persistent://tenant/namespace/xxx | |||
realTopicName = TopicName.get(topicName.getPartitionedTopicName()); | |||
} | |||
policiesCache.remove(realTopicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We must at least clean up this cache when deleting topics, right?
If there are other topics in the Namespace that have been used, but some topics have been deleted, these deleted topics will be stored in the memory forever. We should find a way to put it in the cache when Own Bundle
is triggered by the same topic, instead of keeping it in the cache without deleting it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the OwnerShip of Topic is transferred, we can not delete the cache, but when Topic is deleted, should this cache be deleted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, the issue is when the topic been unloaded, we will not able to get the cache again if the namespace bundle ownership does not change.
For the topic deletion case, I think we can write a message with key(topic name) and value of null to the system topic, this will be compacted and we can also handle the null value message as the delete topic policy operation at the TopicPolicyService, I can push a separate PR to fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change make sense to me.
But I left a couple of comments
PTAL
, namespacePolicy, pulsar().getConfiguration().getProperties()); | ||
} | ||
res.complete(offloadPolicies); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we can a specific exception here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eolivelli We should catch all the exceptions here because we don't want to return a CompletableFuture but also throw exceptions. It should be handled by the CompletableFuture
if (topicPolicies == null){ | ||
topicPolicies = new TopicPolicies(); | ||
topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we catch a specific Exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the above comment.
@@ -201,6 +201,7 @@ public void testCacheCleanup() throws Exception { | |||
assertNotNull(listMap.get(topicName).get(0)); | |||
|
|||
admin.topics().deletePartitionedTopic(topic, true); | |||
admin.namespaces().unload(NAMESPACE1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the test for checking the cache cleanup, you can check the conversation with @315157973, the topic policy cache is based on the namespace, so we need to unload the namespaces to make sure the cache been reloaded, in the conversation with @315157973 I have mentioned I need to create a separate PR to handle the topic policy deletion when deleting the topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Not able to cherry-pick to branch-2.7, will create a PR based on branch-2.7 soon. |
* Add authoritative flag for topic policy to avoid redirect loop 1. Add authoritative flag for topic policy to avoid redirect loop 2. Prevent set topic policy on a non-existing topic 3. Prevent set topic policy on a partition of a partitioned topic 4. Redirect to the broker which is owner of the partition-0 for a partitioned topic when setting topic policy 5. Don't remove policy cache when the topic removed from the broker, this will lead to the topic come back, but can't find the topic policy, since the namespace does not removed from the broker, we will not read from the system topic again. For this case we already handled when the broker does not provide service for that namespace, the topic policy cache under the namespace will be removed. (cherry picked from commit 0b67438)
…e#11051) * Add authoritative flag for topic policy to avoid redirect loop 1. Add authoritative flag for topic policy to avoid redirect loop 2. Prevent set topic policy on a non-existing topic 3. Prevent set topic policy on a partition of a partitioned topic 4. Redirect to the broker which is owner of the partition-0 for a partitioned topic when setting topic policy 5. Don't remove policy cache when the topic removed from the broker, this will lead to the topic come back, but can't find the topic policy, since the namespace does not removed from the broker, we will not read from the system topic again. For this case we already handled when the broker does not provide service for that namespace, the topic policy cache under the namespace will be removed.
Motivation
this will lead to the topic come back, but can't find the topic policy,
since the namespace does not remove from the broker, we will not read from
the system topic again. For this case, we already handled when the broker does not
provide service for that namespace, the topic policy cache under the namespace will be removed.