Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add header deserialize option on Kafka source #842

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/yaks-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ jobs:
yaks run test/timer-source $YAKS_RUN_OPTIONS
yaks run test/earthquake-source $YAKS_RUN_OPTIONS
yaks run test/rest-openapi-sink $YAKS_RUN_OPTIONS
yaks run test/kafka $YAKS_RUN_OPTIONS
- name: YAKS Report
if: failure()
run: |
Expand Down
23 changes: 20 additions & 3 deletions kamelets/kafka-source.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ spec:
default: SASL_SSL
saslMechanism:
title: SASL Mechanism
description: The Simple Authentication and Security Layer (SASL) Mechanism used.
description: The Simple Authentication and Security Layer (SASL) Mechanism used.
type: string
default: PLAIN
user:
title: Username
description: Username to authenticate to Kafka
description: Username to authenticate to Kafka
type: string
x-descriptors:
- urn:camel:group:credentials
Expand Down Expand Up @@ -117,10 +117,25 @@ spec:
x-descriptors:
- urn:keda:metadata:consumerGroup
- urn:keda:required
deserializeHeaders:
title: Automatically Deserialize Headers
description: When enabled the Kamelet source will deserialize all message headers to String representation.
type: boolean
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
default: false
dependencies:
- github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
- "camel:core"
- "camel:kafka"
- "camel:kamelet"
template:
beans:
- name: kafkaHeaderDeserializer
type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
property:
- key: enabled
value: '{{deserializeHeaders}}'
from:
uri: "kafka:{{topic}}"
parameters:
Expand All @@ -134,4 +149,6 @@ spec:
autoOffsetReset: "{{autoOffsetReset}}"
groupId: "{{?consumerGroup}}"
steps:
- to: "kamelet:sink"
- process:
ref: "{{kafkaHeaderDeserializer}}"
- to: "kamelet:sink"
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.serialization.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.support.SimpleTypeConverter;

/**
* Header deserializer used in Kafka source Kamelet. Automatically converts all message headers to String.
* Uses given type converter implementation set on the Camel context to convert values. If no type converter is set
* the deserializer uses its own fallback conversion implementation.
*/
public class KafkaHeaderDeserializer implements Processor {

boolean enabled = false;

private final SimpleTypeConverter defaultTypeConverter = new SimpleTypeConverter(true, KafkaHeaderDeserializer::convert);

@Override
public void process(Exchange exchange) throws Exception {
if (!enabled) {
return;
}

Map<String, Object> headers = exchange.getMessage().getHeaders();

TypeConverter typeConverter = exchange.getContext().getTypeConverter();
if (typeConverter == null) {
typeConverter = defaultTypeConverter;
}

for (Map.Entry<String, Object> header : headers.entrySet()) {
if (shouldDeserialize(header)) {
header.setValue(typeConverter.convertTo(String.class, header.getValue()));
}
}
}

/**
* Fallback conversion strategy supporting null values, String and byte[]. Converts headers to respective
* String representation or null.
* @param type target type, always String in this case.
* @param exchange the exchange containing all headers to convert.
* @param value the current value to convert.
* @return String representation of given value or null if value itself is null.
*/
private static Object convert(Class<?> type, Exchange exchange, Object value) {
if (value == null) {
return null;
}

if (value instanceof String) {
return value;
}

if (value instanceof byte[]) {
return new String((byte[]) value, StandardCharsets.UTF_8);
}

return value.toString();
}

/**
* Exclude special Kafka headers from auto deserialization.
* @param entry
* @return
*/
private boolean shouldDeserialize(Map.Entry<String, Object> entry) {
return !entry.getKey().equals(KafkaConstants.HEADERS) && !entry.getKey().equals(KafkaConstants.MANUAL_COMMIT);
}

public void setEnabled(String enabled) {
this.enabled = Boolean.parseBoolean(enabled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.serialization.kafka;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.SimpleTypeConverter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class KafkaHeaderDeserializerTest {

    private DefaultCamelContext context;

    private final KafkaHeaderDeserializer deserializer = new KafkaHeaderDeserializer();

    @BeforeEach
    void createCamelContext() {
        context = new DefaultCamelContext();
    }

    /**
     * Builds an exchange carrying the two headers shared by all tests:
     * a plain String header and a UTF-8 encoded byte[] header.
     */
    private Exchange exchangeWithDefaultHeaders() {
        Exchange exchange = new DefaultExchange(context);
        exchange.getMessage().setHeader("foo", "bar");
        exchange.getMessage().setHeader("fooBytes", "barBytes".getBytes(StandardCharsets.UTF_8));
        return exchange;
    }

    @Test
    void shouldDeserializeHeaders() throws Exception {
        Exchange exchange = exchangeWithDefaultHeaders();
        exchange.getMessage().setHeader("fooNull", null);
        exchange.getMessage().setHeader("number", 1L);

        deserializer.enabled = true;
        deserializer.process(exchange);

        Assertions.assertTrue(exchange.getMessage().hasHeaders());
        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
        Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes"));
        // null headers survive conversion: key is retained, value stays null
        Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("fooNull"));
        Assertions.assertNull(exchange.getMessage().getHeader("fooNull"));
        Assertions.assertEquals("1", exchange.getMessage().getHeader("number"));
    }

    @Test
    void shouldDeserializeHeadersViaTypeConverter() throws Exception {
        // a context-level converter takes precedence over the built-in fallback
        context.setTypeConverter(new SimpleTypeConverter(true, (type, exchange, value) -> "converted"));

        Exchange exchange = exchangeWithDefaultHeaders();
        exchange.getMessage().setHeader("fooNull", null);

        deserializer.enabled = true;
        deserializer.process(exchange);

        Assertions.assertTrue(exchange.getMessage().hasHeaders());
        Assertions.assertEquals("converted", exchange.getMessage().getHeader("foo"));
        Assertions.assertEquals("converted", exchange.getMessage().getHeader("fooBytes"));
        Assertions.assertEquals("converted", exchange.getMessage().getHeader("fooNull"));
    }

    @Test
    void shouldFallbackToDefaultConverter() throws Exception {
        // without a context converter, the processor's own fallback must kick in
        context.setTypeConverter(null);
        Exchange exchange = exchangeWithDefaultHeaders();

        deserializer.enabled = true;
        deserializer.process(exchange);

        Assertions.assertTrue(exchange.getMessage().hasHeaders());
        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
        Assertions.assertEquals("barBytes", exchange.getMessage().getHeader("fooBytes"));
    }

    @Test
    void shouldNotDeserializeHeadersWhenDisabled() throws Exception {
        Exchange exchange = exchangeWithDefaultHeaders();

        deserializer.enabled = false;
        deserializer.process(exchange);

        // headers must pass through untouched, byte[] keeps its original type
        Assertions.assertTrue(exchange.getMessage().hasHeaders());
        Assertions.assertEquals("bar", exchange.getMessage().getHeader("foo"));
        Assertions.assertTrue(exchange.getMessage().getHeader("fooBytes") instanceof byte[]);
        Assertions.assertEquals(Arrays.toString("barBytes".getBytes(StandardCharsets.UTF_8)), Arrays.toString((byte[]) exchange.getMessage().getHeader("fooBytes")));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ spec:
default: SASL_SSL
saslMechanism:
title: SASL Mechanism
description: The Simple Authentication and Security Layer (SASL) Mechanism used.
description: The Simple Authentication and Security Layer (SASL) Mechanism used.
type: string
default: PLAIN
user:
title: Username
description: Username to authenticate to Kafka
description: Username to authenticate to Kafka
type: string
x-descriptors:
- urn:camel:group:credentials
Expand Down Expand Up @@ -117,10 +117,25 @@ spec:
x-descriptors:
- urn:keda:metadata:consumerGroup
- urn:keda:required
deserializeHeaders:
title: Automatically Deserialize Headers
description: When enabled the Kamelet source will deserialize all message headers to String representation.
type: boolean
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
default: false
dependencies:
- github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT
- "camel:core"
- "camel:kafka"
- "camel:kamelet"
template:
beans:
- name: kafkaHeaderDeserializer
type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
property:
- key: enabled
value: '{{deserializeHeaders}}'
from:
uri: "kafka:{{topic}}"
parameters:
Expand All @@ -134,4 +149,6 @@ spec:
autoOffsetReset: "{{autoOffsetReset}}"
groupId: "{{?consumerGroup}}"
steps:
- to: "kamelet:sink"
- process:
ref: "{{kafkaHeaderDeserializer}}"
- to: "kamelet:sink"
28 changes: 28 additions & 0 deletions test/kafka/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Abort on the first failing command so dependent steps never run
# against a broken installation.
set -e

# Install the Strimzi operator into the default namespace.
# URL is quoted: '?' is a shell glob character and must not undergo
# pathname expansion.
kubectl create -f "https://strimzi.io/install/latest?namespace=default"

# Apply the `Kafka` Cluster CR file (single-node ephemeral cluster)
kubectl apply -f "https://strimzi.io/examples/latest/kafka/kafka-ephemeral-single.yaml"

# wait for everything to start (up to 5 minutes)
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s

# create default topic
kubectl apply -f "https://strimzi.io/examples/latest/topic/kafka-topic.yaml"
42 changes: 42 additions & 0 deletions test/kafka/kafka-sink-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

# KameletBinding for the Kafka integration test: a timer-source Kamelet
# periodically emits a JSON message which is delivered to Kafka through
# the kafka-sink Kamelet.
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: kafka-sink-test
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      # emit one message every 5 seconds
      period: 5000
      contentType: application/json
      # ${...} placeholders are presumably resolved by the YAKS test runtime
      # from test properties — confirm against the test configuration
      message: ${message}
  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-sink
    properties:
      # broker address assembled from test properties plus the test namespace
      bootstrapServers: ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port}
      user: ${user}
      password: ${password}
      topic: ${topic}
      securityProtocol: ${securityProtocol}