Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Topic policy can not be work well if replay policy message has any exception. #20613

Merged
merged 4 commits into from
Jun 30, 2023

Conversation

zhanghaou
Copy link
Contributor

Motivation

If listener.onUpdate has any exception, the future can not be complete. We should catch the exception and make the method more safety, otherwise thread would be block.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 20, 2023
@zhanghaou zhanghaou changed the title [fix][broker] Topic policy can not be work well if replay policy message has any excetion. [fix][broker] Topic policy can not be work well if replay policy message has any exception. Jun 20, 2023
}
}));
}));
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwable is better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
listener.onUpdate(topicPolicies);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we easily catch the onUpdate method's exception with the warn log and then normally complete the future?

IMO, Because it is an implementation problem(bug) in the listener logic, we can't let one bad apple spoils the whole bunch. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we complete directly, it may introduce data inconsistency? Complete directly or throws exception depends on the importance of this method. From the source pr(#11136) of this code, I think this method is very important.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the listener throws an exception, it should be a bug. Do you have any ability to auto-fix it by retry?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to fix this bug instead of ignoring it.

}));
} catch (Throwable e) {
log.error("[{}] Failed to replay policy message.", reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
Copy link
Member

@mattisonchao mattisonchao Jun 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback will invoke before cleanCacheAndCloseReader, which may cause some inconsistent status. :)

  1. Considering someone who checks the cache and reader status in the future callback.
  2. Considering completeExceptionally throw exception. (logically)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanCacheAndCloseReader had delete from here.

} catch (Throwable e) {
log.error("[{}] Failed to replay policy message.", reader.getSystemTopic().getTopicName(), e);
future.completeExceptionally(e);
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not close the reader here because the caller will close it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, right. We shouldn't close the reader here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleanCacheAndCloseReader had delete from here.

Copy link
Contributor

@Technoboy- Technoboy- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't close the reader there

@zhanghaou
Copy link
Contributor Author

We shouldn't close the reader there

cleanCacheAndCloseReader had delete from here. And I will make a new pr to fix some other use case.

@zhanghaou
Copy link
Contributor Author

We shouldn't close the reader there

cleanCacheAndCloseReader had delete from here. And I will make a new pr to fix some other use case.

#20640

@Technoboy-
Copy link
Contributor

I think we need to process like this :

policiesCache.forEach(((topicName, topicPolicies) -> {
                    if (listeners.get(topicName) != null) {
                        for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
                            try {
                                listener.onUpdate(topicPolicies);
                            } catch (Throwable error) {
                                log.error("[{}] call listener error.", topicName, error);
                            }
                        }
                    }
                }));

@zhanghaou
Copy link
Contributor Author

I think we need to process like this :

policiesCache.forEach(((topicName, topicPolicies) -> {
                    if (listeners.get(topicName) != null) {
                        for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
                            try {
                                listener.onUpdate(topicPolicies);
                            } catch (Throwable error) {
                                log.error("[{}] call listener error.", topicName, error);
                            }
                        }
                    }
                }));

why?

@Technoboy-
Copy link
Contributor

I think we need to process like this :

policiesCache.forEach(((topicName, topicPolicies) -> {
                    if (listeners.get(topicName) != null) {
                        for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
                            try {
                                listener.onUpdate(topicPolicies);
                            } catch (Throwable error) {
                                log.error("[{}] call listener error.", topicName, error);
                            }
                        }
                    }
                }));

why?

If the listener has an error, the reader cannot be closed here, because the reader is at the namespace level. Nor can other listeners not receive notifications because of a problem with one topic listener. So, only log output can be printed here

@zhanghaou
Copy link
Contributor Author

@Technoboy- Makes sense. Thanks for your explanation. I also caught exceptions in the notifyListener() method to prevent a listener callback from failing to terminate the remaining listener callbacks. Please take a look again.

@codecov-commenter
Copy link

Codecov Report

Merging #20613 (2871cc9) into master (2b01f83) will increase coverage by 3.33%.
The diff coverage is 22.64%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #20613      +/-   ##
============================================
+ Coverage     33.58%   36.92%   +3.33%     
+ Complexity    12127    12118       -9     
============================================
  Files          1613     1690      +77     
  Lines        126241   129192    +2951     
  Branches      13770    14091     +321     
============================================
+ Hits          42396    47698    +5302     
+ Misses        78331    75232    -3099     
- Partials       5514     6262     +748     
Flag Coverage Δ
inttests 24.14% <19.81%> (-0.05%) ⬇️
systests 25.05% <16.98%> (?)
unittests 32.03% <56.75%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ava/org/apache/pulsar/admin/cli/CmdNamespaces.java 0.00% <0.00%> (ø)
...in/java/org/apache/pulsar/admin/cli/CmdTopics.java 0.00% <0.00%> (ø)
.../java/org/apache/pulsar/client/cli/CmdProduce.java 25.90% <9.09%> (-0.76%) ⬇️
.../service/SystemTopicBasedTopicPoliciesService.java 57.14% <50.00%> (+0.87%) ⬆️
...roker/service/persistent/MessageDeduplication.java 40.17% <50.00%> (ø)
...he/pulsar/client/impl/MultiTopicsConsumerImpl.java 32.81% <50.00%> (+0.44%) ⬆️
.../pulsar/broker/stats/BrokerOperabilityMetrics.java 100.00% <100.00%> (ø)
...org/apache/pulsar/broker/stats/DimensionStats.java 85.45% <100.00%> (ø)

... and 400 files with indirect coverage changes

@Technoboy- Technoboy- merged commit 200fb56 into apache:master Jun 30, 2023
85 of 89 checks passed
Technoboy- pushed a commit that referenced this pull request Jul 3, 2023
Technoboy- pushed a commit that referenced this pull request Jul 4, 2023
liangyepianzhou pushed a commit that referenced this pull request Jul 7, 2023
…age has any exception. (#20613)

(cherry picked from commit 200fb56)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 17, 2023
…age has any exception. (apache#20613)

(cherry picked from commit 200fb56)
(cherry picked from commit 0a69f66)
coderzc pushed a commit that referenced this pull request Aug 22, 2023
…age has any exception. (#20613)

(cherry picked from commit 200fb56)
@coderzc coderzc added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Aug 22, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants