Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding indexId to ElasticSearch Sink kamelet. #273

Merged
merged 1 commit into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions docs/modules/ROOT/pages/elasticsearch-index-sink.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,26 @@

*Provided by: "Apache Software Foundation"*

Insert data into ElasticSearch. Input data must have JSON format.
This sink stores documents into ElasticSearch.

Input data must have JSON format according to the index used.

If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.

If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.

== Configuration Options

The following table summarizes the configuration options available for the `elasticsearch-index-sink` Kamelet:
[width="100%",cols="2,^2,3,^2,^2,^3",options="header"]
|===
| Property| Name| Description| Type| Default| Example
| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| |
| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| |
| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act against.| string| |
| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| | `"quickstart"`
| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| | `"quickstart-es-http:9200"`
| *indexName {empty}* *| Index in ElasticSearch| The name of the index to act against.| string| | `"data"`
| enableSSL| Enable SSL| Do we want to connect using SSL?| boolean| `true`|
| indexId| Index ID| None| string| `"NONE"`|
| password| Password| Password to connect to ElasticSearch.| string| |
| user| Username| Username to connect to ElasticSearch.| string| |
|===
Expand Down Expand Up @@ -48,9 +56,9 @@ spec:
apiVersion: camel.apache.org/v1alpha1
name: elasticsearch-index-sink
properties:
clusterName: "The ElasticSearch Cluster Name"
hostAddresses: "The Host Addresses"
indexName: "The Index in ElasticSearch"
clusterName: "quickstart"
hostAddresses: "quickstart-es-http:9200"
indexName: "data"

----

Expand Down
33 changes: 30 additions & 3 deletions elasticsearch-index-sink.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ spec:
definition:
title: "ElasticSearch Index Sink"
description: |-
Insert data into ElasticSearch. Input data must have JSON format.
This sink stores documents into ElasticSearch.

Input data must have JSON format according to the index used.

If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.

If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.
required:
- clusterName
- indexName
Expand All @@ -42,19 +49,27 @@ spec:
type: boolean
default: true
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
hostAddresses:
title: Host Addresses
description: Comma separated list with ip:port formatted remote transport addresses to use.
type: string
example: quickstart-es-http:9200
indexName:
title: Index in ElasticSearch
description: The name of the index to act against.
type: string
example: data
clusterName:
title: ElasticSearch Cluster Name
description: Name of the cluster.
type: string
example: quickstart
indexId:
title: Index ID
description: None
type: string
default: "NONE"
types:
out:
mediaType: application/json
Expand All @@ -70,6 +85,18 @@ spec:
from:
uri: kamelet:source
steps:
- choice:
when:
- simple: "'{{indexId}}' == 'NONE' && ${header[kafka.TOPIC]} != null"
steps:
- set-header:
name: "indexId"
simple: "${header[kafka.TOPIC]}${header[kafka.PARTITION]}${header[kafka.OFFSET]}"
- simple: "'{{indexId}}' != 'NONE'"
steps:
- set-header:
name: "indexId"
simple: "{{indexId}}"
- to:
uri: "kamelet-reify:elasticsearch-rest:{{clusterName}}"
parameters:
Expand All @@ -80,4 +107,4 @@ spec:
user: "{{user}}"
password: "{{password}}"
- marshal:
json: {}
json: { }