regex subscription not working for new topics in Python #7168

Closed
baynes opened this issue Jun 4, 2020 · 2 comments · Fixed by #7206
Labels
help wanted type/bug The PR fixed a bug or issue reported a bug

Comments

@baynes

baynes commented Jun 4, 2020

Describe the bug
When a new topic is created that matches a regex subscription, a client written in Java detects it and adds a cursor, but a client written in Python does not.

To Reproduce
Steps to reproduce the behavior:

Run the following as a Python 3 client:

#!/usr/bin/python3
import pulsar
import re

client = pulsar.Client("pulsar://localhost:6650")
# Subscribe to every topic matching the regex. Passing
# initial_position=pulsar.InitialPosition.Earliest to subscribe() does not help.
consumer = client.subscribe(re.compile('.*'), subscription_name='my-sub')

while True:
    msg = consumer.receive()
    print("Received message '%s'" % msg.data())
    consumer.acknowledge(msg)

client.close()

In another window run this command to get a Java client for comparison:

/opt/pulsar/bin/pulsar-client consume --regex '.*' -s all -n 0

In another window send a message to a new topic:

/opt/pulsar/bin/pulsar-client produce addtopic -m 'm1'

Wait a minute for the clients to detect the new topic.
Send another message to the same topic.

/opt/pulsar/bin/pulsar-client produce addtopic -m 'm2'

The Java client receives the second message, but the Python client does not.

Kill and restart the clients. Send a message to the same topic.

/opt/pulsar/bin/pulsar-client produce addtopic -m 'm3'

Both clients receive the message.

Expected behavior
Both clients should receive the second message.

Screenshots
NA

Desktop (please complete the following information):
CentOS 7
Pulsar 2.5.1
Python pulsar-client 2.5.2

Additional context
Setting initial_position=pulsar.InitialPosition.Earliest on the subscription does not help.

@baynes baynes added the type/bug The PR fixed a bug or issue reported a bug label Jun 4, 2020
@baynes baynes changed the title from "regex subscripton not working for new topics in Python" to "regex subscription not working for new topics in Python" Jun 4, 2020
@jiazhai
Member

jiazhai commented Jun 5, 2020

Thanks @baynes for reporting this issue. Since it can be reproduced, I'm marking it as help-wanted for now. Any help fixing this issue would be much appreciated.

@BewareMyPower
Contributor

TL;DR: There's a deadlock in the C++ client, which the Python client wraps. I'll push a PR soon.

Because the C++ standard library has nothing like Java's ConcurrentHashMap, the current implementation uses a mutex_ to guard a shared std::map.
However, the mutex_ is also acquired in receive, so when you call receive, the mutex_ is held until a new message is pushed to the internal message queue. If the topic discovery timer then finds new topics, the callback that adds them also needs to acquire the mutex_, which leads to a deadlock.

The deadlock is similar to the one I fixed in an earlier PR; see the change to PartitionedConsumerImpl::receive in #6732.
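
To make the locking problem described above concrete, here is a minimal Python sketch of the pattern. It is a toy model only, not the C++ client's code: `ToyPatternConsumer`, `add_topic`, `push`, and `discovery_succeeds` are hypothetical names invented for illustration, and the half-second lock timeout exists only so the sketch terminates instead of hanging.

```python
import queue
import threading


class ToyPatternConsumer:
    """Toy model of the locking pattern; NOT the real C++ client."""

    def __init__(self, release_lock_before_blocking):
        self._mutex = threading.Lock()      # stands in for mutex_
        self._messages = queue.Queue()      # stands in for the internal message queue
        self._topics = {}                   # stands in for the shared std::map
        self._closed = False
        self._release_early = release_lock_before_blocking
        self.blocked = threading.Event()    # set just before receive() starts waiting

    def receive(self):
        if self._release_early:
            # Fixed pattern: hold the mutex only while checking state.
            with self._mutex:
                if self._closed:
                    raise RuntimeError("consumer closed")
            self.blocked.set()
            return self._messages.get()
        # Buggy pattern: the mutex stays held while waiting for a message.
        with self._mutex:
            if self._closed:
                raise RuntimeError("consumer closed")
            self.blocked.set()
            return self._messages.get()

    def add_topic(self, topic, timeout=0.5):
        # Stand-in for the topic discovery timer callback.
        if not self._mutex.acquire(timeout=timeout):
            return False  # could not take the mutex: receive() still holds it
        try:
            self._topics[topic] = True
        finally:
            self._mutex.release()
        return True

    def push(self, message):
        self._messages.put(message)


def discovery_succeeds(release_lock_before_blocking):
    consumer = ToyPatternConsumer(release_lock_before_blocking)
    receiver = threading.Thread(target=consumer.receive, daemon=True)
    receiver.start()
    consumer.blocked.wait()                 # receive() is now waiting for a message
    added = consumer.add_topic("addtopic")  # what the discovery timer would do
    consumer.push(b"m1")                    # unblock the receiver so it can exit
    receiver.join()
    return added
```

Calling `discovery_succeeds(False)` models the behavior before the fix (the new topic cannot be added while `receive` is blocked), and `discovery_succeeds(True)` models releasing the mutex once the state check is done.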

sijie pushed a commit that referenced this issue Jun 9, 2020
…auto discovery (#7206)

Fixes #7168 

### Motivation

When a pattern consumer is blocked in `receive()`, `mutex_` is held until a new message arrives. If the auto discovery timer task finds new topics and tries to subscribe to them, it must acquire `mutex_` first, so a deadlock occurs.

### Modifications

- Release `mutex_` as soon as the consumer's state has been verified.
- Change unit tests to verify that new topics can be subscribed to while the consumer is blocked in the `receive(Message&)` or `receive(Message&, int)` methods.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as *BasicEndToEndTest.testPatternMultiTopicsConsumerAutoDiscovery*.
cdbartholomew pushed a commit to kafkaesque-io/pulsar that referenced this issue Jul 24, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020