camel-elasticsearch-rest-kafka-connector indexId #1192

Closed
seravat opened this issue Jun 3, 2021 · 16 comments

seravat commented Jun 3, 2021

Hello,

According to the camel docs for the camel-elasticsearch-rest component: https://camel.apache.org/components/3.4.x/elasticsearch-rest-component.html

When using the Index operation, we can set the document id by using the header "indexId".

Is this header being ignored in the camel-elasticsearch-rest-kafka-connector?

Contributor

oscerd commented Jun 4, 2021

No, you can use the header.

You just need to use the prefix "CamelHeader.", so it will be something like "CamelHeader.indexId" in case you're using the sink connector.

So you'll need to add a header with that name to your Kafka message.

Author

seravat commented Jun 4, 2021

Thanks for the response Andrea,

I am setting this header before sending to kafka:

exchange.getIn().setHeader("indexId", exchangeId);

So you are saying it should be:

exchange.getIn().setHeader("CamelHeader.indexId", exchangeId);

Is that right?

Contributor

oscerd commented Jun 4, 2021

Are you using the Kafka Connect connector for this component, or something else? With the Kafka connector you should not need any plain Camel code, so I suspect your question is about plain Camel.

Author

seravat commented Jun 4, 2021

So we have some apps sending data to a few kafka topics and then we have a Kafka Connect cluster with camel-kafka-elastic sink connectors feeding data to elastic.

Contributor

oscerd commented Jun 4, 2021

If you are consuming from a Kafka topic through the ES Kafka connector sink, your Kafka record needs a Kafka header named CamelHeader.indexId. The connector strips the prefix, so the header reaches the ES component as indexId.
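
The renaming described above can be illustrated with a small, self-contained sketch. It is not the connector's actual code: the class and the helper `toCamelHeaders` are hypothetical, header values are simplified to strings, and it assumes that Kafka headers without the "CamelHeader." prefix are not forwarded as Camel headers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SinkHeaderMappingSketch {
    // Prefix named in this thread; the sink connector is described as stripping it.
    static final String CAMEL_HEADER_PREFIX = "CamelHeader.";

    // Hypothetical helper: returns the headers the Camel component would receive.
    static Map<String, String> toCamelHeaders(Map<String, String> kafkaHeaders) {
        Map<String, String> camelHeaders = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : kafkaHeaders.entrySet()) {
            String name = entry.getKey();
            if (name.startsWith(CAMEL_HEADER_PREFIX)) {
                // "CamelHeader.indexId" becomes "indexId"
                camelHeaders.put(name.substring(CAMEL_HEADER_PREFIX.length()), entry.getValue());
            }
            // Assumption: headers without the prefix are not forwarded.
        }
        return camelHeaders;
    }

    public static void main(String[] args) {
        Map<String, String> kafkaHeaders = new LinkedHashMap<>();
        kafkaHeaders.put("CamelHeader.indexId", "doc-1");
        kafkaHeaders.put("payload_id", "doc-1");
        System.out.println(toCamelHeaders(kafkaHeaders)); // prints {indexId=doc-1}
    }
}
```

Under that assumption, a record carrying only a plain indexId header would reach the ES component without any indexId header at all.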

Author

seravat commented Jun 4, 2021

Tried that already, I must be doing something weird.

So, when I was sending an "indexId" header to the kafka topic, we had this message:

  Value (123 bytes): {"id":"223C079749B9779-0000000000000001","ftpFolder":"/mnt/m2/inflight/systemtest","numberOfRecords":597,"status":"ACTIVE"}
  Timestamp: 1622795526830
  Partition: 3
  Offset: 125
  Headers: ClientFileName=TRAX_SFTR_TRAN_SFTR002_20210511_6074.csv-9fd29f92-8518-4658-985a-4dd734a5be11,ClientFolderId=systemtest,ftp_folder=/mnt/m2/inflight/systemtest,indexId=223C079749B9779-0000000000000001,payload_id=223C079749B9779-0000000000000001,transactionId=MONNETTEDCOLU3166TEST10QA1REG6074
% Reached end of topic sftr-payload [3] at offset 126

If I try to send the "CamelHeader.indexId" header instead, no header related to it appears in the record:

Key (27 bytes): /mnt/m2/inflight/systemtest
  Value (123 bytes): {"id":"223C079749B9779-0000000000000004","ftpFolder":"/mnt/m2/inflight/systemtest","numberOfRecords":597,"status":"ACTIVE"}
  Timestamp: 1622796256243
  Partition: 3
  Offset: 131
  Headers: ClientFileName=TRAX_SFTR_TRAN_SFTR002_20210511_6074.csv-596707e7-e68f-44bd-8c8f-e14945aab5c7,ClientFolderId=systemtest,ftp_folder=/mnt/m2/inflight/systemtest,payload_id=223C079749B9779-0000000000000004,transactionId=MONNETTEDCOLU3166TEST10QA1REG6074

I know it is a camel "issue", any ideas? 🤷🏼

Contributor

oscerd commented Jun 4, 2021

What is the camel kafka connector version?

Author

seravat commented Jun 4, 2021

camel-elasticsearch-rest-kafka-connector:0.7.1.fuse-800004-redhat-00001

Contributor

oscerd commented Jun 4, 2021

For a Red Hat product you need to ask Red Hat support.

Contributor

oscerd commented Jun 4, 2021

In the meantime, I'll have a look using the same 0.7.1 version from upstream. Can you please show the Kafka connector configuration too?

Author

seravat commented Jun 4, 2021

Sure! By the way, we are using the Strimzi/AMQ streams operator

spec:
  class: >-
    org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector
  config:
    key.ignore: false
    topics: sftr-payload
    camel.sink.endpoint.hostAddresses: 'sftr-es-es-http:9200'
    transforms: 'createKey,extractString'
    camel.sink.endpoint.indexName: sftr-payload
    camel.component.elasticsearch-rest.enableSSL: false
    errors.tolerance: all
    camel.component.elasticsearch-rest.user: elastic
    errors.log.enable: true
    key.converter: org.apache.kafka.connect.storage.StringConverter
    camel.sink.endpoint.operation: Index
    camel.sink.endpoint.enableSSL: false
    errors.deadletterqueue.topic.name: dlq-sftr-payload
    errors.deadletterqueue.context.headers.enable: true
    errors.deadletterqueue.topic.replication.factor: 1
    camel.component.elasticsearch-rest.password: 1z9Z36u0K1emWIKBR514oj3T
    camel.sink.path.clusterName: sftr-es
  tasksMax: 2

Member

valdar commented Jun 4, 2021

Hello @seravat

> Thanks for the response Andrea,
>
> I am setting this header before sending to kafka:

I assume you are sending to kafka using a traditional camel route?

> exchange.getIn().setHeader("indexId", exchangeId);
>
> So you are saying it should be:
>
> exchange.getIn().setHeader("CamelHeader.indexId", exchangeId);
>
> Is that right?

It depends on how you produce the event on the topic; the goal is that the event produced on the topic has a header named CamelHeader.indexId with the value of exchangeId.

Author

seravat commented Jun 4, 2021

> I assume you are sending to kafka using a traditional camel route?

Yes, I am using a traditional camel route. It has a processor that sets that header and other stuff, then it uses the kafka component to send the message.

  • When I set the header key "indexId" I can see it in the kafka headers.
  • When I set the header key "CamelHeader.indexId" I cannot see it in the kafka headers.

Maybe camel is removing the header when sending, or kafka is ignoring it when receiving. Can we send headers to kafka with the "string.string" notation?

Member

valdar commented Jun 4, 2021

Yes, the default header filter strategy https://github.com/apache/camel/blob/b10d6be0bcb232c76402b200046c95552d1c488a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java removes every header whose name starts with "Camel", "camel", "org.apache.camel." or "kafka.".

So you should implement a custom HeaderFilterStrategy that lets "CamelHeader.indexId" pass, and instruct the camel-kafka component to use it through the headerFilterStrategy option (see https://camel.apache.org/components/3.4.x/kafka-component.html for more details).

I hope it makes sense, don't hesitate to further ask if not.
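
The decision such a custom strategy has to make can be sketched as follows. This is a minimal, standard-library-only illustration of the filtering rule, not Camel's implementation: the class name, the `isFilteredOut` helper, and the allow-list are hypothetical, and the blocked prefixes are simply the four listed above. In a real route the same check would live inside a class implementing Camel's HeaderFilterStrategy and be referenced through the headerFilterStrategy option; that wiring is left out here because it depends on the Camel version in use.

```java
import java.util.List;
import java.util.Set;

public class AllowListedHeaderFilterSketch {
    // Prefixes the default Kafka header filter is described as removing.
    static final List<String> BLOCKED_PREFIXES =
            List.of("Camel", "camel", "org.apache.camel.", "kafka.");

    // Hypothetical allow-list: headers that must reach Kafka despite a blocked prefix.
    static final Set<String> ALLOWED_HEADERS = Set.of("CamelHeader.indexId");

    // Returns true when the header should be dropped before the record is sent.
    static boolean isFilteredOut(String headerName) {
        if (ALLOWED_HEADERS.contains(headerName)) {
            return false; // explicit exception to the prefix rule
        }
        for (String prefix : BLOCKED_PREFIXES) {
            if (headerName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isFilteredOut("CamelHeader.indexId")); // prints false
        System.out.println(isFilteredOut("CamelFileName"));       // prints true
        System.out.println(isFilteredOut("payload_id"));          // prints false
    }
}
```

Keeping the exception as an explicit allow-list, rather than dropping the "Camel" prefix rule entirely, means other internal Camel headers still stay out of the Kafka records.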

Author

seravat commented Jun 4, 2021

Yes!!! Makes sense, I will test and share 🥇

Author

seravat commented Jun 7, 2021

It worked! Thanks for the help guys, much appreciated ⭐️ ⭐️ ⭐️ ⭐️ ⭐️

seravat closed this as completed Jun 7, 2021