Camel Kafka Connector - Azure EventHub Source #1358

Open
anuragp89 opened this issue Apr 19, 2022 · 8 comments

@anuragp89

Hi Team,

I have successfully set up the Camel Kafka Connector Azure EventHub Source (1.0 build) running in a Kubernetes environment. But every time a new message becomes available in Event Hub, the message received in the target Kafka topic still shows an unreadable, seemingly encrypted payload like this:

{"schema":{"type":"bytes","optional":false},"payload":"ImV5SjBiM0JwWXlJNmV5SnVZVzFsYzNCaFkyVWlPaUp6Y0VKMk1TNHdJaXdpWldSblpVNXZaR1ZFWlhOamNtbHdkRzl5SWpvaVZHVnpkRjlIY205MWNFbEVMMHhwYm1VdE1pSXNJbWR5YjNWd1NXUWlPaUpVWlhOMFgwZHliM1Z3U1VRaUxDSmxaR2RsVG05a1pVbGtJam9pVEdsdVpTMHlJaXdpZEhsd1pTSTZJazVFUVZSQkluMHNJbkJoZVd4dllXUWlPbnNpZEdsdFpYTjBZVzF3SWpveE5qVXdNekF6T0RJMU16QTBMQ0p0WlhSeWFXTnpJanBiZXlKdVlXMWxJam9pVFdGamFHbHVaUzAyTDFSbGJYQmxjbUYwZFhKbElpd2lkR2x0WlhOMFlXMXdJam94TmpVd016QXpPREkwTWprNUxDSmtZWFJoVkhsd1pTSTZJa1pzYjJGMElpd2lkbUZzZFdVaU9qZ3VNSDFkTENKelpYRWlPak45ZlE9PSI="}

Below is the connector configuration I am using:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: camel-azure-eventhub-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.azureeventhubssource.CamelAzureeventhubssourceSourceConnector
  tasksMax: 1
  config:
    topics: iot
    camel.kamelet.azure-eventhubs-source.namespaceName: kafkalinkns
    camel.kamelet.azure-eventhubs-source.eventhubName: kafkainput
    camel.kamelet.azure-eventhubs-source.sharedAccessName: iothubroutes_mfi-iot-hub
    camel.kamelet.azure-eventhubs-source.sharedAccessKey: *****
    camel.kamelet.azure-eventhubs-source.blobAccountName: kafkaconnectdemo
    camel.kamelet.azure-eventhubs-source.blobContainerName: eventhubkafkaconnect
    camel.kamelet.azure-eventhubs-source.blobAccessKey: *****
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    errors.log.enable: true
    errors.log.include.messages: true

I believe this is related to how the messages are encrypted internally in Event Hub, but clients consuming the messages with the shared access key normally get plain-text messages. Does this scenario require a special value.converter? I couldn't find any configuration setting to fix it. Please help.

Thanks,
Anurag

@anuragp89
Author

anuragp89 commented Apr 19, 2022

After changing value.converter to "org.apache.kafka.connect.converters.ByteArrayConverter", I observe that the messages received on the topic are still Base64 encoded. When I run "base64 --decode" on them, I can see the exact JSON message.

With my earlier configuration, with value.converter set to "org.apache.kafka.connect.json.JsonConverter", I verified that the payload field is Base64 encoded twice. So if I run "base64 --decode" on the payload twice, I get the correct plain-text JSON message.
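A minimal Python sketch of the manual decoding described above, assuming the record value has the shape shown in this issue: with JsonConverter, a {"schema": {"type": "bytes", ...}, "payload": "..."} envelope; with ByteArrayConverter, the raw bytes produced by the Kamelet's Jackson marshal step, i.e. a quoted JSON string holding Base64 (the first decode of the payload above starts with `"eyJ`). It also assumes the original Event Hub body is UTF-8 text. The function names are hypothetical and not part of Camel or Kafka Connect.

```python
import base64
import json


def decode_byte_array_converter_value(record_value: bytes) -> str:
    """Recover the Event Hub body from a value written by ByteArrayConverter.

    Assumes the value is the Kamelet's Jackson output: a JSON string literal
    holding Base64, e.g. b'"eyJ0b3Bp..."'.
    """
    inner_base64 = json.loads(record_value)  # drops the surrounding quotes
    return base64.b64decode(inner_base64).decode("utf-8")


def decode_json_converter_value(record_value: bytes) -> str:
    """Recover the Event Hub body from a value written by JsonConverter.

    Assumes the envelope shape from the issue:
    {"schema": {"type": "bytes", ...}, "payload": "<Base64>"}
    """
    envelope = json.loads(record_value)
    # First layer: JsonConverter renders a BYTES value as Base64 text.
    marshalled = base64.b64decode(envelope["payload"])
    # Second layer: the Base64 JSON string produced by the Kamelet's marshal step.
    return decode_byte_array_converter_value(marshalled)
```

Passing either function the raw value bytes of a record from the iot topic (matching the converter that wrote it) should return the original Event Hub message text.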

@jakubmalek
Contributor

jakubmalek commented Sep 6, 2022

The issue seems to be related to the JSON format enforced in azure-eventhubs-source.kamelet.yaml.
The Base64 encoding is applied in MarshalProcessor by JacksonDataFormat: Jackson serializes the byte[] message body as a Base64 JSON string.
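To illustrate where the two Base64 layers come from, here is a rough Python imitation of the pipeline, assuming Jackson's default handling of byte[] (a Base64 JSON string) and JsonConverter's handling of a BYTES schema (Base64 text in the payload field). The helper names and the sample body are hypothetical; the real work is done by JacksonDataFormat and JsonConverter in Java.

```python
import base64
import json


def jackson_marshal_bytes(body: bytes) -> bytes:
    """Mimic how Jackson serializes a byte[] body: as a Base64 JSON string."""
    return json.dumps(base64.b64encode(body).decode("ascii")).encode("utf-8")


def json_converter_envelope(value: bytes) -> bytes:
    """Mimic JsonConverter (schemas enabled) for a BYTES value: Base64 again."""
    return json.dumps(
        {
            "schema": {"type": "bytes", "optional": False},
            "payload": base64.b64encode(value).decode("ascii"),
        },
        separators=(",", ":"),
    ).encode("utf-8")


event_body = b'{"temperature": 8.0}'  # hypothetical Event Hub message
marshalled = jackson_marshal_bytes(event_body)
print(marshalled)  # roughly what ByteArrayConverter would store
print(json_converter_envelope(marshalled))  # roughly what JsonConverter would store
```

The first line printed is a quoted Base64 string and the second is an envelope with the same shape as the one in the issue, which is why that payload needs two rounds of decoding.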

It would be nice to be able to disable the default marshalling configuration, or to provide a data format that passes the body through unchanged.

@oscerd
Contributor

oscerd commented Sep 6, 2022

This is done because, in the Kamelets world, we transform the output passed to a sink Kamelet into JSON so that it can be manipulated easily. Kamelets are building blocks for integrating sources and sinks, and camel-kafka-connector consumes them to create Kafka Connect connectors. It's not something we plan to make configurable.

@jakubmalek
Contributor

jakubmalek commented Sep 7, 2022

Hi @oscerd,
Thanks for replying. I understand that, but the problem is that the Event Hub consumer posts the message body as bytes. So even if the payload is already JSON, the Jackson mapper converts it into Base64. Regarding configuration, I know you can set the data format with the camel.source.marshal property, but as far as I know there is no data format that keeps the original format. So in my case, I either have to write my own Kafka Connect converter that decodes Base64, or implement a Camel data format that writes the original bytes back to the output stream.

From my perspective, the common use case for the connector is to keep the original message format, whether it's JSON, XML or plain text. On top of that, you should have the option to configure a Camel data format or to apply a transformation through the Kafka Connect transform/converter API.

@jakubmalek
Contributor

I ran a simple test with an overridden kamelets/azure-eventhubs-source.kamelet.yaml file on the classpath.
I removed this section:

- marshal:
    json: {}

It worked as expected: the byte-array event data from Event Hub was passed to the connector as is and written to the Kafka topic in the same format using org.apache.kafka.connect.converters.ByteArrayConverter.
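The outcome of this test can be mimicked with a small Python sketch, assuming ByteArrayConverter writes whatever bytes the Kamelet emits without modification. It compares an imitation of the current marshal step with a hypothetical pass-through step for JSON, XML and plain-text bodies; only the pass-through keeps the original bytes. The step names and sample bodies are illustrative, not Camel APIs.

```python
import base64
import json


def marshal_json_step(body: bytes) -> bytes:
    """Mimics the Kamelet's `marshal: json: {}` step applied to a byte[] body.

    Jackson writes a byte array as a Base64-encoded JSON string.
    """
    return json.dumps(base64.b64encode(body).decode("ascii")).encode("utf-8")


def passthrough_step(body: bytes) -> bytes:
    """Hypothetical no-op step: the original bytes are handed on untouched."""
    return body


# Hypothetical Event Hub bodies in different formats.
samples = {
    "json": b'{"id": 1}',
    "xml": b'<event id="1"/>',
    "text": b"hello",
}

for name, body in samples.items():
    # With ByteArrayConverter, these bytes are what ends up in the Kafka topic.
    kept = passthrough_step(body)
    marshalled = marshal_json_step(body)
    print(f"{name}: pass-through={kept!r} marshalled={marshalled!r}")
```

Whatever the body format, the marshal step always produces a quoted Base64 string, so downstream consumers have to know about and undo that wrapping.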

My suggestion is to remove the preconfigured marshalling from the connector Kamelet files and make it configurable instead, e.g. camel.source.marshal: json-jackson.

@oscerd
Contributor

oscerd commented Sep 8, 2022

There is no such marshalling option in Kamelets. I'm fine with removing it, but we need to explicitly add the output format information to the metadata. I'll link the issue here.

@jakubmalek
Contributor

Thanks @oscerd. Generally I prefer to fix a problem with a PR rather than asking, but this looks more like a question about the general design of the component. I'm curious what the issue with the metadata is, though; I didn't see any problem in my test.

@oscerd
Contributor

oscerd commented Sep 8, 2022

"Metadata" was a bit misleading, sorry. I just meant that this line:
https://github.com/apache/camel-kamelets/blob/main/kamelets/azure-eventhubs-source.kamelet.yaml#L85

needs to change to octet-stream.
