-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
All batch is redelivered even if some messages are acked #345
Comments
I have not specified that I use batch, is batch enabled by default? |
Batching is enabled on the producer by default, but batch index acknowledgement on the consumer is not. I assume that's the case mainly for backwards compatibility reasons. You need to enable it by setting As a workaround, you can create the consumer directly using the Java client: val pulsar4sConsumer: Future[pulsar4s.Consumer[T]] =
javaClient
.newConsumer(implicitly[Schema[T]])
.enableBatchIndexAcknowledgement(true)
.subscribeAsync()
.asScala
.map(new pulsar4s.DefaultConsumer(_)) |
Ok, thanks for the workaround! |
I'm closing this because it's an issue with the underlying Java client, not pulsar4s. It probably doesn't make sense for pulsar4s to have different default behavior than the Java client. |
When a consumer receives a batch, when some messages are nacked and others acked, all the batch is redelivered.
It seems related to PIP-54. After reading pulsar java client,
ConsumerConfigurationData
as abatchIndexAckEnabled
which has default to false.I think we all need to support it in pulsar4s
ConsumerConfig
.@gmethvin I saw you created the first issue: apache/pulsar#5969, WDYT?
The text was updated successfully, but these errors were encountered: