Skip to content

Commit

Permalink
Adding indexId to ElasticSearch Sink kamelet. Auto-generated if coming
Browse files Browse the repository at this point in the history
from kafka and nothing else defined
  • Loading branch information
Delawen authored and oscerd committed May 11, 2021
1 parent dd55612 commit 7341c4d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
22 changes: 15 additions & 7 deletions docs/modules/ROOT/pages/elasticsearch-index-sink.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,26 @@

*Provided by: "Apache Software Foundation"*

Insert data into ElasticSearch. Input data must have JSON format.
This sink stores documents into ElasticSearch.

Input data must have JSON format according to the index used.

If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.

If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.

== Configuration Options

The following table summarizes the configuration options available for the `elasticsearch-index-sink` Kamelet:
[width="100%",cols="2,^2,3,^2,^2,^3",options="header"]
|===
| Property| Name| Description| Type| Default| Example
| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| |
| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| |
| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act against.| string| |
| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| | `"quickstart"`
| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| | `"quickstart-es-http:9200"`
| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act against.| string| | `"data"`
| enableSSL| Enable SSL| Do we want to connect using SSL?| boolean| `true`|
| indexId| Index ID| None| string| `"NONE"`|
| password| Password| Password to connect to ElasticSearch.| string| |
| user| Username| Username to connect to ElasticSearch.| string| |
|===
Expand Down Expand Up @@ -48,9 +56,9 @@ spec:
apiVersion: camel.apache.org/v1alpha1
name: elasticsearch-index-sink
properties:
clusterName: "The ElasticSearch Cluster Name"
hostAddresses: "The Host Addresses"
indexName: "The Index in ElasticSearch"
clusterName: "quickstart"
hostAddresses: "quickstart-es-http:9200"
indexName: "data"
----

Expand Down
33 changes: 30 additions & 3 deletions elasticsearch-index-sink.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ spec:
definition:
title: "ElasticSearch Index Sink"
description: |-
Insert data into ElasticSearch. Input data must have JSON format.
This sink stores documents into ElasticSearch.
Input data must have JSON format according to the index used.
If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.
If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.
required:
- clusterName
- indexName
Expand All @@ -42,19 +49,27 @@ spec:
type: boolean
default: true
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
hostAddresses:
title: Host Addresses
description: Comma separated list with ip:port formatted remote transport addresses to use.
type: string
example: quickstart-es-http:9200
indexName:
title: Index in ElasticSearch
description: The name of the index to act against.
type: string
example: data
clusterName:
title: ElasticSearch Cluster Name
description: Name of the cluster.
type: string
example: quickstart
indexId:
title: Index ID
description: None
type: string
default: "NONE"
types:
out:
mediaType: application/json
Expand All @@ -70,6 +85,18 @@ spec:
from:
uri: kamelet:source
steps:
- choice:
when:
- simple: "'{{indexId}}' == 'NONE' && ${header[kafka.TOPIC]} != null"
steps:
- set-header:
name: "indexId"
simple: "${header[kafka.TOPIC]}${header[kafka.PARTITION]}${header[kafka.OFFSET]}"
- simple: "'{{indexId}}' != 'NONE'"
steps:
- set-header:
name: "indexId"
simple: "{{indexId}}"
- to:
uri: "kamelet-reify:elasticsearch-rest:{{clusterName}}"
parameters:
Expand All @@ -80,4 +107,4 @@ spec:
user: "{{user}}"
password: "{{password}}"
- marshal:
json: {}
json: { }

0 comments on commit 7341c4d

Please sign in to comment.