Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #7556][pulsar-client] Ensure parallel invocations of MultiTopicsConsumerImpl::subscribeAsync with same topic name do not produce an error. #7691

Conversation

sandrzejczak
Copy link
Contributor

Fixes #7556

Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the topics map does not contain any entry for the topic yet. More detailed description is available at issue's page: #7556

Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if topics map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent checkState(currentAllTopicsPartitionsNumber == numTopics, "...") invocation from throwing an exception which would cause topic's consumers to get closed.

Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)
  • If a feature is not applicable for documentation, explain why? (not applicable)
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation. (not applicable)

@sijie sijie added this to the 2.7.0 milestone Jul 29, 2020
@sijie sijie added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Jul 29, 2020
@sijie
Copy link
Member

sijie commented Aug 6, 2020

/pulsarbot run-failure-checks

@sijie sijie merged commit c54a47e into apache:master Aug 6, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
…c with same topic name do not hang. Fixes  apache#7556. (apache#7691)

Fixes apache#7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: apache#7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
…c with same topic name do not hang. Fixes  apache#7556. (apache#7691)

Fixes apache#7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: apache#7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
…c with same topic name do not hang. Fixes  apache#7556. (apache#7691)

Fixes apache#7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: apache#7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.
lbenc135 pushed a commit to lbenc135/pulsar that referenced this pull request Sep 5, 2020
…c with same topic name do not hang. Fixes  apache#7556. (apache#7691)

Fixes apache#7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: apache#7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.
wolfstudy pushed a commit that referenced this pull request Oct 30, 2020
…c with same topic name do not hang. Fixes  #7556. (#7691)

Fixes #7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: #7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.

(cherry picked from commit c54a47e)
merlimat pushed a commit to merlimat/pulsar that referenced this pull request Dec 19, 2020
…c with same topic name do not hang. Fixes  apache#7556. (apache#7691)

Fixes apache#7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: apache#7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release/2.6.2 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Possibility to break a state in MultiTopicsConsumerImpl class during subscribe new topics
2 participants