How to set Map property via the kafka connector configuration #323

Closed
rgannu opened this issue Jul 17, 2020 · 12 comments

@rgannu
Contributor

rgannu commented Jul 17, 2020

This is definitely not a bug, but I do not know how to set a Map property via properties/JSON in the Kafka sink connector configuration.
https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-rabbitmq-kafka-sink-connector.html

Can you please explain how to set camel.sink.endpoint.clientProperties?
The error says I need to set it as a bean. In that case, can you please explain how to do that?

"camel.sink.endpoint.clientProperties": "#bean:clientPropertiesBean",

I get the following error when starting the sink connector with this configuration.

Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: rabbitmq://rpas-exchange?allowCustomHeaders=false&autoDelete=false&clientProperties=#bean:clientPropertiesBean&exchangeType=topic&hostname=rabbitmq&lazyStartProducer=true&password=xxxxxx&portNumber=5672&routingKey=analytics&username=guest due to: No bean could be found in the registry for: clientPropertiesBean of type: java.util.Map
        at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:874)
        at org.apache.camel.impl.engine.AbstractCamelContext.getEndpoint(AbstractCamelContext.java:764)
        at org.apache.camel.support.CamelContextHelper.resolveEndpoint(CamelContextHelper.java:122)
        at org.apache.camel.reifier.SendReifier.resolveEndpoint(SendReifier.java:43)
        at org.apache.camel.reifier.SendReifier.createProcessor(SendReifier.java:36)
        at org.apache.camel.reifier.ProcessorReifier.makeProcessor(ProcessorReifier.java:766)
        at org.apache.camel.reifier.ProcessorReifier.addRoutes(ProcessorReifier.java:511)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:391)
        ... 15 more
Caused by: org.apache.camel.NoSuchBeanException: No bean could be found in the registry for: clientPropertiesBean of type: java.util.Map
        at org.apache.camel.support.CamelContextHelper.mandatoryLookupAndConvert(CamelContextHelper.java:252)
        at org.apache.camel.support.EndpointHelper.resolveReferenceParameter(EndpointHelper.java:264)
        at org.apache.camel.support.EndpointHelper.resolveReferenceParameter(EndpointHelper.java:245)
        at org.apache.camel.support.DefaultComponent.resolveAndRemoveReferenceParameter(DefaultComponent.java:622)
        at org.apache.camel.component.rabbitmq.RabbitMQComponent.createEndpoint(RabbitMQComponent.java:195)
        at org.apache.camel.component.rabbitmq.RabbitMQComponent.createEndpoint(RabbitMQComponent.java:35)
        at org.apache.camel.support.DefaultComponent.createEndpoint(DefaultComponent.java:233)
        at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:842)
        ... 22 more
@davsclaus
Contributor

Oh yeah, that is a good question; we could look at making this easier to configure.

So you can use

camel.beans.xxx[key]=value
camel.beans.xxx[key2]=value2

style to define the map bean with the name xxx

So it should be something like

camel.beans.clientPropertiesBean[key]=value
camel.beans.clientPropertiesBean[key2]=value2
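
A minimal, untested sketch of how the two pieces might fit together in the connector's JSON configuration, combining the bean reference from the original question with the camel.beans map syntax suggested above. The names `key` and `key2` and their values are placeholders, and whether the registry resolves the bean this way may depend on the connector and Camel versions in use.

```json
{
  "camel.sink.endpoint.clientProperties": "#bean:clientPropertiesBean",
  "camel.beans.clientPropertiesBean[key]": "value",
  "camel.beans.clientPropertiesBean[key2]": "value2"
}
```

The bean name after `#bean:` has to match the name used in the `camel.beans.` keys; the remaining connector options (hostname, exchange, credentials and so on) are omitted here.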

@davsclaus
Contributor

Also we should update the docs about this.

The Camel Kafka connector runs via camel-main, which has a set of options (though many of those currently listed are not very relevant for the Camel Kafka connector). It is the place in the docs where we should document the camel.beans style:
https://camel.apache.org/components/latest/others/main.html

@davsclaus
Contributor

Created a ticket
https://issues.apache.org/jira/browse/CAMEL-15389

@rgannu
Contributor Author

rgannu commented Aug 28, 2020

Hi @davsclaus
I think you got carried away when I asked about how to configure camel beans.
Fine for me to document via the ticket https://issues.apache.org/jira/browse/CAMEL-15389

The initial problem still exists for me, though:
how can I set a map of headers via the sink connector configuration?
I had a look at the source code of the Camel sink connector, and I can see that it only processes the headers from the Kafka message (sink record). (https://github.com/apache/camel-kafka-connector/blob/master/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java#L140)

My proposal is to add support there for additional headers, so that sink connector implementation classes (like CamelRabbitMQSinkTask) can add the additional headers if they want to. This would be quite useful for any sink connector that wants to add headers on the fly via the sink connector configuration itself.
To add the additional headers, I could also write a Kafka SMT, but that would mean extracting the message every time in the SMT, which could be less performant.

I will submit a PR for this. Please review the same and give your valuable comments.

@davsclaus
Contributor

Hi @rgannu

That sounds good

rgannu added a commit to rgannu/camel-kafka-connector that referenced this issue Sep 2, 2020
apache#323 Added support for additional headers and properties support

The additional headers will be added on top of the existing headers
from the message.

The properties added are only basic AMQP properties as defined in the
{@link com.rabbitmq.client.AMQP.BasicProperties.Builder}. When the
message contains already these properties then will be considered and
these additional properties will be ignored.
@rgannu
Contributor Author

rgannu commented Sep 2, 2020

@oscerd
Contributor

oscerd commented Sep 2, 2020

I do not think that is something related to plain Camel. There wasn't any need to add an issue in JIRA for this; this issue was enough.

@oscerd
Contributor

oscerd commented Sep 2, 2020

Ok, now I'm getting what you're doing. You need to modify the Camel component to have new options added to the connector. Also, since this is new stuff, this would be done only on Camel 3.6.0. Anyway, that commit is just a modification of properties; the underlying component won't do anything related to those options.

@oscerd
Contributor

oscerd commented Sep 2, 2020

Also, at the next build, the connector configuration will be overridden.

rgannu added a commit to rgannu/camel-kafka-connector that referenced this issue Sep 7, 2020
apache#323 Added support for additional headers and properties support

The additional headers will be added on top of the existing headers
from the message.

The properties added are only basic AMQP properties as defined in the
{@link com.rabbitmq.client.AMQP.BasicProperties.Builder}. When the
message contains already these properties then will be considered and
these additional properties will be ignored.
@tadayosi
Member

tadayosi commented Mar 9, 2021

Just for checking, is it already done? Can we close this?

@oscerd oscerd closed this as completed Mar 9, 2021
@rgannu
Contributor Author

rgannu commented Mar 9, 2021

Just confirming here so that others who run into this problem can get it working. Things to do for passing map data via the connector JSON configuration.

Works like a gem. Thanks @oscerd for suggesting the camel-archetype.

@barov

barov commented Jun 11, 2022

The headers are not visible in RabbitMQ with the following configuration.

"camel.sink.endpoint.allowCustomHeaders": "true",
"camel.sink.endpoint.additionalHeaders": "#bean:addHeaders",
"camel.beans.addHeaders": "#class:camel.SimpleDataHolderBean",
"camel.beans.addHeaders.mapData[CONTENT_TYPE]": "plain/text"

Can I give the header values as a string?

5 participants