-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Compression must be applied during deferred schema preparation and enableBatching is enabled #9396
Compression must be applied during deferred schema preparation and enableBatching is enabled #9396
Conversation
…ableBatching is enabled if you do not set an initial schema to the Producer the schema must be prepared at the first message with a Schema. There is a bug and compression is not applied in this case, and the consumer receives an uncompressed message, failing
/pulsar-bot run-failure-tests |
1 similar comment
/pulsar-bot run-failure-tests |
/pulsarbot run-failure-checks |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eolivelli I think the right fix is to update the isBatchMessagingEnabled()
method in the ProducerImpl
, it should consider the Schema state. If the producer send messages with different schemas, we can't add them to a batch. It's more clear to indicate the batch is disabled while using the multiple schemas for a producer. I want to bring up this is I see we have some confusing variable names such as compress a compressedPayload again
.
@codelipenghui I will try to implement your idea. @aahmed-se and @merlimat you had already commented. What's your opinion? |
@codelipenghui I have checked I don't like that names for variables but I did not find a better name and also I wanted to keep the patch as small as possible |
@merlimat I have addressed your comment, please take a look |
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Show resolved
Hide resolved
/pulsarbot run-failure-checks |
…ableBatching is enabled (#9396) * Compression must be applied during deferred schema preparation and enableBatching is enabled if you do not set an initial schema to the Producer the schema must be prepared at the first message with a Schema. There is a bug and compression is not applied in this case, and the consumer receives an uncompressed message, failing * address Matteo's comments Co-authored-by: Enrico Olivelli <eolivelli@apache.org> (cherry picked from commit 0143850)
…ableBatching is enabled (#9396) * Compression must be applied during deferred schema preparation and enableBatching is enabled if you do not set an initial schema to the Producer the schema must be prepared at the first message with a Schema. There is a bug and compression is not applied in this case, and the consumer receives an uncompressed message, failing * address Matteo's comments Co-authored-by: Enrico Olivelli <eolivelli@apache.org> (cherry picked from commit 0143850)
…ableBatching is enabled (apache#9396) * Compression must be applied during deferred schema preparation and enableBatching is enabled if you do not set an initial schema to the Producer the schema must be prepared at the first message with a Schema. There is a bug and compression is not applied in this case, and the consumer receives an uncompressed message, failing * address Matteo's comments Co-authored-by: Enrico Olivelli <eolivelli@apache.org> (cherry picked from commit 0143850) (cherry picked from commit e72b924)
Motivation
if you do not set an initial schema to the Producer the schema must be prepared at the first message with a Schema.
There is a bug and compression is not applied in this case, and the consumer receives an uncompressed message, failing
Modifications
Ensure that compression is applied in the code path during the schema preparation.
With enableBatching=true the compression is usually applied per batch, but in case of schema preparation the code flows down to a separate path and we have to enforce compression.
Basically if the SchemaState is not 'ready' the message cannot be added to the batch and we fall back to single message mode.
Verifying this change
This change added tests that failed without this fix
Does this pull request potentially affect one of the following parts:
The patch applied to producers that enable batching, compression and do not send an initial schema to the producer.