From 13a8e0e4be705a5ce894fc7eea0034314b2d5af2 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Fri, 20 Oct 2023 17:28:30 +0200 Subject: [PATCH] Added Kafka Apicurio Registry Sink Kamelet (#1697) * Added Kafka Apicurio Registry Sink Kamelet Signed-off-by: Andrea Cosentino * Added Kafka Apicurio Registry Sink Kamelet Signed-off-by: Andrea Cosentino --------- Signed-off-by: Andrea Cosentino --- docs/modules/ROOT/nav.adoc | 1 + ...rio-registry-not-secured-sink.kamelet.yaml | 115 ++++++++++++++++++ ...rio-registry-not-secured-sink.kamelet.yaml | 115 ++++++++++++++++++ 3 files changed, 231 insertions(+) create mode 100644 kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml create mode 100644 library/camel-kamelets/src/main/resources/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 227dd455c..3e344f5ef 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -123,6 +123,7 @@ * xref:json-schema-validator-action.adoc[] * xref:json-serialize-action.adoc[] * xref:jsonata-action.adoc[] +* xref:kafka-apicurio-registry-not-secured-sink.adoc[] * xref:kafka-apicurio-registry-not-secured-source.adoc[] * xref:kafka-azure-schema-registry-source.adoc[] * xref:kafka-manual-commit-action.adoc[] diff --git a/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml b/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml new file mode 100644 index 000000000..ebb2604d1 --- /dev/null +++ b/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml @@ -0,0 +1,115 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-apicurio-registry-not-secured-sink + annotations: + camel.apache.org/kamelet.support.level: "Stable" + camel.apache.org/catalog.version: "4.1.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + labels: + camel.apache.org/kamelet.type: "sink" +spec: + definition: + title: "Kafka Not Secured with Apicurio Registry Sink" + description: |- + Send data to Kafka topics on an insecure broker with Apicurio Registry. + + The Kamelet is able to understand the following headers to be set: + + - `key` / `ce-key`: as message key + + - `partition-key` / `ce-partitionkey`: as message partition key + + Both the headers are optional. + + Three headers, if specified, will be deduplicated with different names, kafka.key will be duplicated into kafka.KEY, kafka.topic into kafka.TOPIC and kafka.override_topic into kafka.OVERRIDE_TOPIC + required: + - topic + - bootstrapServers + - apicurioRegistryUrl + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + valueSerializer: + title: Value Serializer + description: Serliazer class for value that implements the Serializer interface. + type: string + default: "io.apicurio.registry.serde.avro.AvroKafkaSerializer" + apicurioRegistryUrl: + title: Apicurio Registry URL + description: The Apicurio Schema Registry URL + type: string + x-descriptors: + - urn:keda:metadata:apicurioRegistryUrl + - urn:keda:required + avroDatumProvider: + title: Avro Datum Provider + description: How to write data with Avro + type: string + default: "io.apicurio.registry.serde.avro.ReflectAvroDatumProvider" + dependencies: + - "camel:core" + - "camel:kamelet" + - "camel:kafka" + - "mvn:io.apicurio:apicurio-registry-serdes-avro-serde:2.4.12.Final" + template: + from: + uri: "kamelet:source" + steps: + - choice: + when: + - simple: "${header[key]}" + steps: + - set-header: + name: kafka.KEY + simple: "${header[key]}" + - simple: "${header[ce-key]}" + steps: + - set-header: + name: kafka.KEY + simple: "${header[ce-key]}" + - choice: + when: + - simple: "${header[partition-key]}" + steps: + - set-header: + name: kafka.PARTITION_KEY + simple: "${header[partition-key]}" + - simple: "${header[ce-partitionkey]}" + steps: + - set-header: + name: kafka.PARTITION_KEY + simple: "${header[ce-partitionkey]}" + - to: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + valueSerializer: "{{valueSerializer}}" + additionalProperties.apicurio.registry.url: "{{apicurioRegistryUrl}}" + additionalProperties.apicurio.registry.avro-datum-provider: "{{avroDatumProvider}}" diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml new file mode 100644 index 000000000..ebb2604d1 --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-apicurio-registry-not-secured-sink.kamelet.yaml @@ -0,0 +1,115 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-apicurio-registry-not-secured-sink + annotations: + camel.apache.org/kamelet.support.level: "Stable" + camel.apache.org/catalog.version: "4.1.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + labels: + camel.apache.org/kamelet.type: "sink" +spec: + definition: + title: "Kafka Not Secured with Apicurio Registry Sink" + description: |- + Send data to Kafka topics on an insecure broker with Apicurio Registry. + + The Kamelet is able to understand the following headers to be set: + + - `key` / `ce-key`: as message key + + - `partition-key` / `ce-partitionkey`: as message partition key + + Both the headers are optional. + + Three headers, if specified, will be deduplicated with different names, kafka.key will be duplicated into kafka.KEY, kafka.topic into kafka.TOPIC and kafka.override_topic into kafka.OVERRIDE_TOPIC + required: + - topic + - bootstrapServers + - apicurioRegistryUrl + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + valueSerializer: + title: Value Serializer + description: Serliazer class for value that implements the Serializer interface. + type: string + default: "io.apicurio.registry.serde.avro.AvroKafkaSerializer" + apicurioRegistryUrl: + title: Apicurio Registry URL + description: The Apicurio Schema Registry URL + type: string + x-descriptors: + - urn:keda:metadata:apicurioRegistryUrl + - urn:keda:required + avroDatumProvider: + title: Avro Datum Provider + description: How to write data with Avro + type: string + default: "io.apicurio.registry.serde.avro.ReflectAvroDatumProvider" + dependencies: + - "camel:core" + - "camel:kamelet" + - "camel:kafka" + - "mvn:io.apicurio:apicurio-registry-serdes-avro-serde:2.4.12.Final" + template: + from: + uri: "kamelet:source" + steps: + - choice: + when: + - simple: "${header[key]}" + steps: + - set-header: + name: kafka.KEY + simple: "${header[key]}" + - simple: "${header[ce-key]}" + steps: + - set-header: + name: kafka.KEY + simple: "${header[ce-key]}" + - choice: + when: + - simple: "${header[partition-key]}" + steps: + - set-header: + name: kafka.PARTITION_KEY + simple: "${header[partition-key]}" + - simple: "${header[ce-partitionkey]}" + steps: + - set-header: + name: kafka.PARTITION_KEY + simple: "${header[ce-partitionkey]}" + - to: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + valueSerializer: "{{valueSerializer}}" + additionalProperties.apicurio.registry.url: "{{apicurioRegistryUrl}}" + additionalProperties.apicurio.registry.avro-datum-provider: "{{avroDatumProvider}}"