diff --git a/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc b/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc index bd5b26427..200c7173d 100644 --- a/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc +++ b/docs/modules/ROOT/pages/elasticsearch-index-sink.adoc @@ -3,7 +3,14 @@ *Provided by: "Apache Software Foundation"* -Insert data into ElasticSearch. Input data must have JSON format. +This sink stores documents into ElasticSearch. + +Input data must have JSON format according to the index used. + +If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch. + +If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the +element to generate an automatic ID that warrantees that this element is processed only once. == Configuration Options @@ -11,10 +18,11 @@ The following table summarizes the configuration options available for the `elas [width="100%",cols="2,^2,3,^2,^2,^3",options="header"] |=== | Property| Name| Description| Type| Default| Example -| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| | -| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| | -| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act against.| string| | +| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| | `"quickstart"` +| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| | `"quickstart-es-http:9200"` +| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act against.| string| | `"data"` | enableSSL| Enable SSL| Do we want to connect using SSL?| boolean| `true`| +| indexId| Index ID| None| string| `"NONE"`| | password| Password| Password to connect to ElasticSearch.| string| | | user| Username| Username to connect to ElasticSearch.| string| | |=== @@ -48,9 +56,9 @@ spec: apiVersion: camel.apache.org/v1alpha1 name: elasticsearch-index-sink properties: - clusterName: "The ElasticSearch Cluster Name" - hostAddresses: "The Host Addresses" - indexName: "The Index in ElasticSearch" + clusterName: "quickstart" + hostAddresses: "quickstart-es-http:9200" + indexName: "data" ---- diff --git a/elasticsearch-index-sink.kamelet.yaml b/elasticsearch-index-sink.kamelet.yaml index 56d4dff0a..41ad4318c 100755 --- a/elasticsearch-index-sink.kamelet.yaml +++ b/elasticsearch-index-sink.kamelet.yaml @@ -21,7 +21,14 @@ spec: definition: title: "ElasticSearch Index Sink" description: |- - Insert data into ElasticSearch. Input data must have JSON format. + This sink stores documents into ElasticSearch. + + Input data must have JSON format according to the index used. + + If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch. + + If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the + element to generate an automatic ID that warrantees that this element is processed only once. required: - clusterName - indexName @@ -42,19 +49,27 @@ spec: type: boolean default: true x-descriptors: - - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' hostAddresses: title: Host Addresses description: Comma separated list with ip:port formatted remote transport addresses to use. type: string + example: quickstart-es-http:9200 indexName: title: Index in ElasticSearch description: The name of the index to act against. type: string + example: data clusterName: title: ElasticSearch Cluster Name description: Name of the cluster. type: string + example: quickstart + indexId: + title: Index ID + description: None + type: string + default: "NONE" types: out: mediaType: application/json @@ -70,6 +85,18 @@ spec: from: uri: kamelet:source steps: + - choice: + when: + - simple: "'{{indexId}}' == 'NONE' && ${header[kafka.TOPIC]} != null" + steps: + - set-header: + name: "indexId" + simple: "${header[kafka.TOPIC]}${header[kafka.PARTITION]}${header[kafka.OFFSET]}" + - simple: "'{{indexId}}' != 'NONE'" + steps: + - set-header: + name: "indexId" + simple: "{{indexId}}" - to: uri: "kamelet-reify:elasticsearch-rest:{{clusterName}}" parameters: @@ -80,4 +107,4 @@ spec: user: "{{user}}" password: "{{password}}" - marshal: - json: {} + json: { }