Skip to content

Commit

Permalink
[fix][broker] Topic policy can not be work well if replay policy mess…
Browse files Browse the repository at this point in the history
…age has any exception. (#20613)

(cherry picked from commit 200fb56)
  • Loading branch information
zhanghaou authored and liangyepianzhou committed Jul 7, 2023
1 parent 7b6d1c9 commit 0a69f66
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ private void notifyListener(Message<PulsarEvent> msg) {
TopicName topicName = TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName());
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(null);
try {
listener.onUpdate(null);
} catch (Throwable error) {
log.error("[{}] call listener error.", topicName, error);
}
}
}
return;
Expand All @@ -191,7 +195,11 @@ private void notifyListener(Message<PulsarEvent> msg) {
if (listeners.get(topicName) != null) {
TopicPolicies policies = event.getPolicies();
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(policies);
try {
listener.onUpdate(policies);
} catch (Throwable error) {
log.error("[{}] call listener error.", topicName, error);
}
}
}
}
Expand Down Expand Up @@ -362,7 +370,11 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(topicPolicies);
try {
listener.onUpdate(topicPolicies);
} catch (Throwable error) {
log.error("[{}] call listener error.", topicName, error);
}
}
}
}));
Expand Down

0 comments on commit 0a69f66

Please sign in to comment.