-
Notifications
You must be signed in to change notification settings - Fork 47
Description
Describe the bug
Sending a message with compression, batching enabled and deliver_after results in a corrupted message. Disabling compression resolves the problem.
To Reproduce
Steps to reproduce the behavior:
Create a Pulsar Consumer subscribed to some topic
Start the consumer
Create a Pulsar Producer on the same topic with LZ4 compression enabled (see Additional context)
Call the producer.send_async method with deliver_after set to a valid timedelta (see Additional context)
The Consumer will now log something similar to:
ERROR [140072584582912] ConsumerImpl:534 | [persistent://public/default/test, test, 0] Failed to decompress message with 6 at 96981:3
ERROR [140072584582912] ConsumerImpl:546 | [persistent://public/default/test, test, 0] Discarding corrupted message at 96981:3
Expected behavior
Instead of throwing an error, the message should reach the consumer uncorrupted.
Screenshots
Desktop (please complete the following information):
- OS: Ubuntu 20.04.2 LTS
- CPU: Intel i7 8750h
- RAM: 16 GB DDR4
- Disk: 1 TB NVMe SSD (with over 100 GB free on the partition)
- Nvidia dGPU (20th gen)
Additional context
Using the Python Pulsar client version 2.7.0 (Python 3.8) and running Pulsar version 2.6.0 via Docker
Consumer options:
{
"consumer_type": pulsar.ConsumerType.Shared,
"initial_position": pulsar.InitialPosition.Earliest,
}
Producer options:
{
"compression_type": pulsar.CompressionType.LZ4,
"batching_enabled": True,
"block_if_queue_full": True,
"send_timeout_millis": 0
}
Producer call:
producer.send_async(
content=b'test',
callback=lambda res, msg_id: print(res),
partition_key=None,
deliver_after=datetime.timedelta(seconds=5.0)
)