Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch Index Sink: User/password optional, fixed headers #424

Merged
merged 9 commits into from
Jul 14, 2021
11 changes: 5 additions & 6 deletions docs/modules/ROOT/pages/elasticsearch-index-sink.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ This sink stores documents into ElasticSearch.

Input data must have JSON format according to the index used.

If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.
- `indexId` / `ce-indexId`: as the index ID for Elasticsearch

If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.
If the header won't be set the exchange ID will be used as index.

If the *indexName* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic as the indexName.
- `indexName` / `ce-indexName`: as the index Name for Elasticsearch

If the header won't be set the exchange ID will be used as index name.

== Configuration Options

Expand All @@ -25,8 +26,6 @@ The following table summarizes the configuration options available for the `elas
| *clusterName {empty}* *| ElasticSearch Cluster Name| Name of the cluster.| string| | `"quickstart"`
| *hostAddresses {empty}* *| Host Addresses| Comma separated list with ip:port formatted remote transport addresses to use.| string| | `"quickstart-es-http:9200"`
| enableSSL| Enable SSL| Do we want to connect using SSL?| boolean| `true`|
| indexId| Index ID| None| string| `"NONE"`|
| indexName| Index in ElasticSearch| The name of the index to act against.| string| `"NONE"`| `"data"`
| password| Password| Password to connect to ElasticSearch.| string| |
| user| Username| Username to connect to ElasticSearch.| string| |
|===
Expand Down
7 changes: 5 additions & 2 deletions docs/modules/ROOT/pages/infinispan-source.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The following table summarizes the configuration options available for the `infi
[width="100%",cols="2,^2,3,^2,^2,^3",options="header"]
|===
| Property| Name| Description| Type| Default| Example
| *cacheName {empty}* *| Cache Name| The name of the Infinispan cache to use| String| |
| *hosts {empty}* *| Hosts| Specifies the host of the cache on Infinispan instance| String| |
| *password {empty}* *| Password| Password to connect to Infinispan.| string| |
| *username {empty}* *| Username| Username to connect to Infinispan.| string| |
Expand Down Expand Up @@ -47,6 +48,7 @@ spec:
apiVersion: camel.apache.org/v1alpha1
name: infinispan-source
properties:
cacheName: "The Cache Name"
hosts: "The Hosts"
password: "The Password"
username: "The Username"
Expand Down Expand Up @@ -75,7 +77,7 @@ The procedure described above can be simplified into a single execution of the `

[source,shell]
----
kamel bind infinispan-source -p "source.hosts=The Hosts" -p "source.password=The Password" -p "source.username=The Username" channel/mychannel
kamel bind infinispan-source -p "source.cacheName=The Cache Name" -p "source.hosts=The Hosts" -p "source.password=The Password" -p "source.username=The Username" channel/mychannel
----

This will create the KameletBinding under the hood and apply it to the current namespace in the cluster.
Expand All @@ -98,6 +100,7 @@ spec:
apiVersion: camel.apache.org/v1alpha1
name: infinispan-source
properties:
cacheName: "The Cache Name"
hosts: "The Hosts"
password: "The Password"
username: "The Username"
Expand Down Expand Up @@ -127,7 +130,7 @@ The procedure described above can be simplified into a single execution of the `

[source,shell]
----
kamel bind infinispan-source -p "source.hosts=The Hosts" -p "source.password=The Password" -p "source.username=The Username" kafka.strimzi.io/v1beta1:KafkaTopic:my-topic
kamel bind infinispan-source -p "source.cacheName=The Cache Name" -p "source.hosts=The Hosts" -p "source.password=The Password" -p "source.username=The Username" kafka.strimzi.io/v1beta1:KafkaTopic:my-topic
----

This will create the KameletBinding under the hood and apply it to the current namespace in the cluster.
Expand Down
94 changes: 52 additions & 42 deletions elasticsearch-index-sink.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ spec:

Input data must have JSON format according to the index used.

If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.
- `indexId` / `ce-indexId`: as the index ID for Elasticsearch

If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.
If the header won't be set the exchange ID will be used as index.

If the *indexName* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic as the indexName.
- `indexName` / `ce-indexName`: as the index Name for Elasticsearch

If the header won't be set the exchange ID will be used as index name.
required:
- clusterName
- hostAddresses
Expand All @@ -46,6 +47,9 @@ spec:
title: Password
description: Password to connect to ElasticSearch.
type: string
format: password
x-descriptors:
- urn:alm:descriptor:com.tectonic.ui:password
enableSSL:
title: Enable SSL
description: Do we want to connect using SSL?
Expand All @@ -58,22 +62,11 @@ spec:
description: Comma separated list with ip:port formatted remote transport addresses to use.
type: string
example: quickstart-es-http:9200
indexName:
title: Index in ElasticSearch
description: The name of the index to act against.
type: string
example: data
default: 'NONE'
clusterName:
title: ElasticSearch Cluster Name
description: Name of the cluster.
type: string
example: quickstart
indexId:
title: Index ID
description: None
type: string
default: 'NONE'
types:
out:
mediaType: application/json
Expand All @@ -89,30 +82,47 @@ spec:
from:
uri: kamelet:source
steps:
- choice:
when:
- simple: "'{{indexId}}' == 'NONE' && ${header[kafka.TOPIC]} != null"
steps:
- set-header:
name: "indexId"
simple: "${header[kafka.TOPIC]}${header[kafka.PARTITION]}${header[kafka.OFFSET]}"
- simple: "'{{indexId}}' != 'NONE'"
steps:
- set-header:
name: "indexId"
simple: "{{indexId}}"
- choice:
when:
- simple: "'{{indexName}}' == 'NONE'"
steps:
- set-property:
name: esIndexName
simple: "${header[kafka.TOPIC]}"
otherwise:
steps:
- set-property:
name: esIndexName
simple: "{{indexName}}"
- to-d: "kamelet-reify:elasticsearch-rest:{{clusterName}}?hostAddresses=RAW({{hostAddresses}})&operation=INDEX&indexName=${exchangeProperty.esIndexName}&enableSSL={{enableSSL}}&user={{user}}&password={{password}}"
- marshal:
json: { }
- choice:
when:
- simple: "${header[indexId]}"
steps:
- set-header:
name: "indexId"
simple: "${header[indexId]}"
- simple: "${header[ce-indexId]}"
steps:
- set-header:
name: "indexId"
simple: "${header[ce-indexId]}"
otherwise:
steps:
- set-header:
name: "indexId"
simple: "${exchangeId}"
- choice:
when:
- simple: "${header[indexName]}"
steps:
- set-header:
name: "indexName"
simple: "${header[indexName]}"
- simple: "${header[ce-indexName]}"
steps:
- set-header:
name: "indexName"
simple: "${header[ce-indexName]}"
otherwise:
steps:
- set-header:
name: "indexName"
simple: "${exchangeId}"
- to:
uri: "kamelet-reify:elasticsearch-rest:{{clusterName}}"
parameters:
operation: "INDEX"
hostAddresses: "{{hostAddresses}}"
enableSSL: "{{enableSSL}}"
user: "{{?user}}"
password: "{{?password}}"
- marshal:
json: { }
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ spec:

Input data must have JSON format according to the index used.

If the *indexId* parameter is set, that value will be used as the document ID on ElasticSearch.
- `indexId` / `ce-indexId`: as the index ID for Elasticsearch

If the *indexId* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic, partition and offset of the
element to generate an automatic ID that warrantees that this element is processed only once.
If the header won't be set the exchange ID will be used as index.

If the *indexName* parameter is not set and the source of the kamelet binding is a Kafka broker, it will take the kafka topic as the indexName.
- `indexName` / `ce-indexName`: as the index Name for Elasticsearch

If the header won't be set the exchange ID will be used as index name.
required:
- clusterName
- hostAddresses
Expand All @@ -46,6 +47,9 @@ spec:
title: Password
description: Password to connect to ElasticSearch.
type: string
format: password
x-descriptors:
- urn:alm:descriptor:com.tectonic.ui:password
enableSSL:
title: Enable SSL
description: Do we want to connect using SSL?
Expand All @@ -58,22 +62,11 @@ spec:
description: Comma separated list with ip:port formatted remote transport addresses to use.
type: string
example: quickstart-es-http:9200
indexName:
title: Index in ElasticSearch
description: The name of the index to act against.
type: string
example: data
default: 'NONE'
clusterName:
title: ElasticSearch Cluster Name
description: Name of the cluster.
type: string
example: quickstart
indexId:
title: Index ID
description: None
type: string
default: 'NONE'
types:
out:
mediaType: application/json
Expand All @@ -89,30 +82,47 @@ spec:
from:
uri: kamelet:source
steps:
- choice:
when:
- simple: "'{{indexId}}' == 'NONE' && ${header[kafka.TOPIC]} != null"
steps:
- set-header:
name: "indexId"
simple: "${header[kafka.TOPIC]}${header[kafka.PARTITION]}${header[kafka.OFFSET]}"
- simple: "'{{indexId}}' != 'NONE'"
steps:
- set-header:
name: "indexId"
simple: "{{indexId}}"
- choice:
when:
- simple: "'{{indexName}}' == 'NONE'"
steps:
- set-property:
name: esIndexName
simple: "${header[kafka.TOPIC]}"
otherwise:
steps:
- set-property:
name: esIndexName
simple: "{{indexName}}"
- to-d: "kamelet-reify:elasticsearch-rest:{{clusterName}}?hostAddresses=RAW({{hostAddresses}})&operation=INDEX&indexName=${exchangeProperty.esIndexName}&enableSSL={{enableSSL}}&user={{user}}&password={{password}}"
- marshal:
json: { }
- choice:
when:
- simple: "${header[indexId]}"
steps:
- set-header:
name: "indexId"
simple: "${header[indexId]}"
- simple: "${header[ce-indexId]}"
steps:
- set-header:
name: "indexId"
simple: "${header[ce-indexId]}"
otherwise:
steps:
- set-header:
name: "indexId"
simple: "${exchangeId}"
- choice:
when:
- simple: "${header[indexName]}"
steps:
- set-header:
name: "indexName"
simple: "${header[indexName]}"
- simple: "${header[ce-indexName]}"
steps:
- set-header:
name: "indexName"
simple: "${header[ce-indexName]}"
otherwise:
steps:
- set-header:
name: "indexName"
simple: "${exchangeId}"
- to:
uri: "kamelet-reify:elasticsearch-rest:{{clusterName}}"
parameters:
operation: "INDEX"
hostAddresses: "{{hostAddresses}}"
enableSSL: "{{enableSSL}}"
user: "{{?user}}"
password: "{{?password}}"
- marshal:
json: { }