New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Add header deserialize option on Kafka source #842
Conversation
Cc @lburgazzoli |
...c/main/java/org/apache/camel/kamelets/utils/serialization/kafka/KafkaHeaderDeserializer.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
for (Map.Entry<String, Object> header : headers.entrySet()) { | ||
header.setValue(typeConverter.convertTo(String.class, header.getValue())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are a number of headers that the kafka component adds i.e. kafka.OFFSET and other things like that, I guess those should not be converted automatically
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have excluded kafka.HEADERS and CamelKafkaManualCommit by default now as these are not trivial to deserialize to String. As a user I would expect other headers to be deserialized by default
Added a few comments :) |
8a9480a
to
c37e905
Compare
- Adds utility class to auto deserialize message headers from byte[] to String - Option must be explicitly enabled on the source Kamelet - Exclude non String Kafka headers from deserialization (kafka.HEADERS and CamelKafkaManualCommit)
d879a96
to
b6125bf
Compare
b6125bf
to
eacc9e3
Compare
@lburgazzoli I am happy now with this. Mind having another look? |
@christophd LGTM, I would go a second round of review around the conversion of the headers added bu the component, i.e. all those with a |
Is there something more to add? |
@oscerd @christophd is is fine with me to merge Points to further investigation after merge could be:
|
Thanks. |
This PR uses a new utility class
org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer
inkafka-source.kamelet
. The CI tests on this PR are expected to fail because of this until the utility class is available in main_SNAPSHOT.Please have a look at the PR on my fork to see the successful tests CI workflow: christophd#4