Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Segmentation fault for acknowledge_cumulative #114

Closed
JeroenJansze1989 opened this issue Apr 28, 2023 · 3 comments · Fixed by #121
Closed

Segmentation fault for acknowledge_cumulative #114

JeroenJansze1989 opened this issue Apr 28, 2023 · 3 comments · Fixed by #121

Comments

@JeroenJansze1989
Copy link

JeroenJansze1989 commented Apr 28, 2023

Describe the bug
Trying to use the consumer acknowledge_cumulative(msg) functionality. The python shell stops with Segmentation fault (core dumped). Expected is to ack the messages until the current message in the stream or at least to have error handling available to the python interpreter.

To Reproduce
1、Test Conditions

  • Setup a python client
  • Subscribe to a topic
  • Consume 1 or more messages
  • Call consumer.acknowledge_cumulative(msg) or consumer.acknowledge_cumulative(msg.message_id()) with the latest message received.

2、Test code

import pulsar
from _pulsar import InitialPosition
from pulsar import Client, ConsumerBatchReceivePolicy, ConsumerType

pulsar_client = Client(pulsar_host, authentication=authentication_provider.get_authenticator()) # This is a AuthenticationToken 
consumer = pulsar_client.subscribe(
  topic=topic,
  subscription_name=consumer_id,
  initial_position=InitialPosition.Earliest,
  consumer_type=ConsumerType.KeyShared,
)

# Given some message on the topic
message = consumer.receive()
print(message.data()) # b'{"test": "test6"}'
message = consumer.receive()
print(message.data()) # b'{"test": "test8"}'

consumer.acknowledge_cumulative(message)

error log
Starting shell with python3 -q -X faulthandler

Fatal Python error: Segmentation fault

Current thread 0x00007f22216e0000 (most recent call first):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pulsar/__init__.py", line 1300 in acknowledge_cumulative
  File "<stdin>", line 1 in <module>

Extension modules: charset_normalizer.md, _cffi_backend (total: 2)
Segmentation fault (core dumped)
@BewareMyPower
Copy link
Contributor

The cumulative acknowledgment cannot be used for Key_Shared subscription. I think we need to enhance the error processing at the Python client side. We should return an error instead of crash.


BTW, I cannot reproduce this issue with the following code with Python client 3.1.0 and Pulsar standalone 2.11.0.

from pulsar import Client, ConsumerType, InitialPosition

pulsar_client = Client('pulsar://localhost:6650')
topic = 'my-topic'

producer = pulsar_client.create_producer(topic=topic)
producer.send('msg-0'.encode())
producer.send('msg-1'.encode())
producer.close()

consumer = pulsar_client.subscribe(
    topic=topic,
    subscription_name='sub',
    initial_position=InitialPosition.Earliest,
    consumer_type=ConsumerType.KeyShared,
)

msg = consumer.receive()
print(f'{msg.data()} | {msg.message_id()}')
msg = consumer.receive()
print(f'{msg.data()} | {msg.message_id()}')

consumer.acknowledge_cumulative(msg)

pulsar_client.close()

Output:

b'msg-0' | (5,0,-1,-1)
b'msg-1' | (5,1,-1,-1)

The cumulative acknowledgment just fail without any exception thrown.

@JeroenJansze1989
Copy link
Author

Thanks for checking.

Improving the error handling python side sounds good and perhaps a docs update (https://pulsar.apache.org/api/python/3.1.x/pulsar.Consumer.html#acknowledge_cumulative).

On the reproducibility, perhaps the environment makes a difference, error was seen on Ubuntu 22.04, python 3.10.6

@BewareMyPower
Copy link
Contributor

BewareMyPower commented May 23, 2023

I just rechecked the code and found acknowledge_cumulative uses asynchronous version of the ACK API.

void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }

So no exception should be thrown even if it failed.

I tested the code here inside a ubuntu:22.04 container and only modified the service URL to pulsar://host.docker.internal:6650 and it worked well. The Python version is also 3.10.6.

BewareMyPower added a commit to BewareMyPower/pulsar-client-python that referenced this issue May 24, 2023
Fixes apache#114

### Motivation

Currently the `acknowledge` and `acknowledge_cumulative` methods are all
asynchronous. Even if any error happened, no exception would be raised.
For example, when acknowledging cumulatively on a consumer whose
consumer type is Shared or KeyShared, no error happens.

### Modifications

- Change these methods to synchronous and raise exceptions if the
  acknowledgment failed.
- Add `PulsarTest.test_acknowledge_failed` to test these failed cases.
- Improve the documents to describe which exceptions could be raised in
  which cases.
merlimat pushed a commit that referenced this issue May 24, 2023
Fixes #114

### Motivation

Currently the `acknowledge` and `acknowledge_cumulative` methods are all
asynchronous. Even if any error happened, no exception would be raised.
For example, when acknowledging cumulatively on a consumer whose
consumer type is Shared or KeyShared, no error happens.

### Modifications

- Change these methods to synchronous and raise exceptions if the
  acknowledgment failed.
- Add `PulsarTest.test_acknowledge_failed` to test these failed cases.
- Improve the documents to describe which exceptions could be raised in
  which cases.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants