chore: Kafka source uses local bean for header deserialization
christophd committed Mar 21, 2022
1 parent 2539572 commit eacc9e3
Showing 4 changed files with 35 additions and 20 deletions.
14 changes: 8 additions & 6 deletions kamelets/kafka-source.kamelet.yaml
@@ -130,6 +130,12 @@ spec:
     - "camel:kafka"
     - "camel:kamelet"
   template:
+    beans:
+      - name: kafkaHeaderDeserializer
+        type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        property:
+          - key: enabled
+            value: '{{deserializeHeaders}}'
     from:
       uri: "kafka:{{topic}}"
       parameters:
@@ -143,10 +149,6 @@
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-        - set-property:
-            name: deserializeHeaders
-            constant: "{{deserializeHeaders}}"
-        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
-        - remove-property:
-            name: deserializeHeaders
+        - process:
+            ref: "{{kafkaHeaderDeserializer}}"
         - to: "kamelet:sink"
KafkaHeaderDeserializer.java
@@ -21,7 +21,7 @@
 import java.util.Map;

 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
+import org.apache.camel.Processor;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.support.SimpleTypeConverter;
@@ -31,12 +31,15 @@
  * Uses given type converter implementation set on the Camel context to convert values. If no type converter is set
  * the deserializer uses its own fallback conversion implementation.
  */
-public class KafkaHeaderDeserializer {
+public class KafkaHeaderDeserializer implements Processor {

+    boolean enabled = false;
+
     private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);

-    public void process(@ExchangeProperty("deserializeHeaders") boolean deserializeHeaders, Exchange exchange) throws Exception {
-        if (!deserializeHeaders) {
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (!enabled) {
             return;
         }

@@ -86,4 +89,8 @@ private static Object convert(Class<?> type, Exchange exchange, Object value) {
     private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
         return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
     }
+
+    public void setEnabled(String enabled) {
+        this.enabled = Boolean.parseBoolean(enabled);
+    }
 }
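
With this change the class is an ordinary Camel Processor whose behavior is switched by the enabled bean property instead of a deserializeHeaders exchange property. Below is a minimal standalone sketch of driving it directly; the wrapping class and the exchange setup are hypothetical, only setEnabled and process come from the class above:

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer;
import org.apache.camel.support.DefaultExchange;

public class KafkaHeaderDeserializerUsageSketch {

    public static void main(String[] args) throws Exception {
        Exchange exchange = new DefaultExchange(new DefaultCamelContext());
        exchange.getMessage().setHeader("foo", "bar".getBytes());   // Kafka hands header values over as byte[]

        KafkaHeaderDeserializer deserializer = new KafkaHeaderDeserializer();
        deserializer.setEnabled("true");   // bean property; previously read from an exchange property
        deserializer.process(exchange);    // converts header values via the context's type converter, or the built-in fallback

        System.out.println(exchange.getMessage().getHeader("foo")); // should print "bar" if the byte[] header was converted back to text
    }
}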
@@ -48,7 +48,8 @@ void shouldDeserializeHeaders() throws Exception {
         exchange.getMessage().setHeader("fooNull", null);
         exchange.getMessage().setHeader("number", 1L);

-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);

         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -68,7 +69,8 @@ void shouldDeserializeHeadersViaTypeConverter() throws Exception {
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
         exchange.getMessage().setHeader("fooNull", null);

-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);

         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo"));
@@ -84,7 +86,8 @@ void shouldFallbackToDefaultConverter() throws Exception {
         exchange.getMessage().setHeader("foo", "bar");
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));

-        processor.process(true, exchange);
+        processor.enabled = true;
+        processor.process(exchange);

         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -98,7 +101,8 @@ void shouldNotDeserializeHeadersWhenDisabled() throws Exception {
         exchange.getMessage().setHeader("foo", "bar");
         exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));

-        processor.process(false, exchange);
+        processor.enabled = false;
+        processor.process(exchange);

         Assertions.assertTrue(exchange.getMessage().hasHeaders());
         Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
@@ -130,6 +130,12 @@ spec:
     - "camel:kafka"
     - "camel:kamelet"
   template:
+    beans:
+      - name: kafkaHeaderDeserializer
+        type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
+        property:
+          - key: enabled
+            value: '{{deserializeHeaders}}'
     from:
       uri: "kafka:{{topic}}"
       parameters:
@@ -143,10 +149,6 @@
         autoOffsetReset: "{{autoOffsetReset}}"
         groupId: "{{?consumerGroup}}"
       steps:
-        - set-property:
-            name: deserializeHeaders
-            constant: "{{deserializeHeaders}}"
-        - bean: "org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
-        - remove-property:
-            name: deserializeHeaders
+        - process:
+            ref: "{{kafkaHeaderDeserializer}}"
         - to: "kamelet:sink"
