From a8ad1ab09928b38b725019a523f7e7e00f3fbb22 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Thu, 1 Feb 2024 12:08:24 +0100 Subject: [PATCH 1/4] Support Kafka Batch as Kamelet source - Kafka Azure Schema Registry Source as batch Signed-off-by: Andrea Cosentino --- ...-azure-schema-registry-source.kamelet.yaml | 182 ++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml diff --git a/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml b/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml new file mode 100644 index 000000000..a83ad6ed3 --- /dev/null +++ b/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml @@ -0,0 +1,182 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-batch-azure-schema-registry-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Azure Kafka Batch through Eventhubs with Azure Schema Registry Source" + description: |- + Receive data from Kafka topics in batch on Azure Eventhubs combined with Azure Schema Registry and commit them manually through KafkaManualCommit or auto commit. + required: + - topic + - bootstrapServers + - azureRegistryUrl + - password + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + securityProtocol: + title: Security Protocol + description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported + type: string + default: SASL_SSL + saslMechanism: + title: SASL Mechanism + description: The Simple Authentication and Security Layer (SASL) Mechanism used. + type: string + default: PLAIN + password: + title: Password + description: Password to authenticate to kafka + type: string + format: password + x-descriptors: + - urn:camel:group:credentials + - urn:keda:authentication:password + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer + type: boolean + default: true + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + pollOnError: + title: Poll On Error Behavior + description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + default: true + valueDeserializer: + title: Value Deserializer + description: Deserializer class for value that implements the Deserializer interface. + type: string + default: "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer" + azureRegistryUrl: + title: Azure Schema Registry URL + description: The Apicurio Schema Registry URL + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + specificAvroValueType: + title: Specific Avro Value Type + description: The Specific Type Avro will have to deal with + type: string + example: "com.example.Order" + batchSize: + title: Batch Dimension + description: The maximum number of records returned in a single call to poll() + type: int + default: 500 + pollTimeout: + title: Poll Timeout Interval + description: The timeout used when polling the KafkaConsumer + type: int + default: 5000 + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.0-SNAPSHOT" + - "camel:kafka" + - "camel:core" + - "camel:kamelet" + - "camel:azure-schema-registry" + - "mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1" + - "mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.11" + - "mvn:com.azure:azure-identity:1.10.4" + template: + beans: + - name: defaultAzureCredential + type: "#class:org.apache.camel.component.azure.schema.registry.DefaultAzureCredentialWrapper" + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + - name: manualCommitFactory + type: "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + securityProtocol: "{{securityProtocol}}" + saslMechanism: "{{saslMechanism}}" + saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{password}};' + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + valueDeserializer: "{{valueDeserializer}}" + maxPollRecords: "{{batchSize}}" + pollTimeoutMs: "{{pollTimeout}}" + batching: true + kafkaManualCommitFactory: "#bean:{{manualCommitFactory}}" + additionalProperties.schema.registry.url: "{{azureRegistryUrl}}" + additionalProperties.schema.group: avro + additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential' + additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):{{specificAvroValueType}}' + additionalProperties.specific.avro.reader: '#valueAs(boolean):true' + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink" From f63e43c0884599ce4e837737ffc2f2768700c21b Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Thu, 1 Feb 2024 12:10:20 +0100 Subject: [PATCH 2/4] Support Kafka Batch as Kamelet source - Kafka Azure Schema Registry Source as batch Signed-off-by: Andrea Cosentino --- ...-azure-schema-registry-source.kamelet.yaml | 182 ++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 library/camel-kamelets/src/main/resources/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml new file mode 100644 index 000000000..a83ad6ed3 --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml @@ -0,0 +1,182 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-batch-azure-schema-registry-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Azure Kafka Batch through Eventhubs with Azure Schema Registry Source" + description: |- + Receive data from Kafka topics in batch on Azure Eventhubs combined with Azure Schema Registry and commit them manually through KafkaManualCommit or auto commit. + required: + - topic + - bootstrapServers + - azureRegistryUrl + - password + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + securityProtocol: + title: Security Protocol + description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported + type: string + default: SASL_SSL + saslMechanism: + title: SASL Mechanism + description: The Simple Authentication and Security Layer (SASL) Mechanism used. + type: string + default: PLAIN + password: + title: Password + description: Password to authenticate to kafka + type: string + format: password + x-descriptors: + - urn:camel:group:credentials + - urn:keda:authentication:password + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer + type: boolean + default: true + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + pollOnError: + title: Poll On Error Behavior + description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + default: true + valueDeserializer: + title: Value Deserializer + description: Deserializer class for value that implements the Deserializer interface. + type: string + default: "com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer" + azureRegistryUrl: + title: Azure Schema Registry URL + description: The Apicurio Schema Registry URL + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + specificAvroValueType: + title: Specific Avro Value Type + description: The Specific Type Avro will have to deal with + type: string + example: "com.example.Order" + batchSize: + title: Batch Dimension + description: The maximum number of records returned in a single call to poll() + type: int + default: 500 + pollTimeout: + title: Poll Timeout Interval + description: The timeout used when polling the KafkaConsumer + type: int + default: 5000 + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.0-SNAPSHOT" + - "camel:kafka" + - "camel:core" + - "camel:kamelet" + - "camel:azure-schema-registry" + - "mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1" + - "mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.11" + - "mvn:com.azure:azure-identity:1.10.4" + template: + beans: + - name: defaultAzureCredential + type: "#class:org.apache.camel.component.azure.schema.registry.DefaultAzureCredentialWrapper" + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + - name: manualCommitFactory + type: "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{bootstrapServers}}" + securityProtocol: "{{securityProtocol}}" + saslMechanism: "{{saslMechanism}}" + saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password={{password}};' + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + valueDeserializer: "{{valueDeserializer}}" + maxPollRecords: "{{batchSize}}" + pollTimeoutMs: "{{pollTimeout}}" + batching: true + kafkaManualCommitFactory: "#bean:{{manualCommitFactory}}" + additionalProperties.schema.registry.url: "{{azureRegistryUrl}}" + additionalProperties.schema.group: avro + additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential' + additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):{{specificAvroValueType}}' + additionalProperties.specific.avro.reader: '#valueAs(boolean):true' + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink" From 792f524ab24ae124826612df02a60e00324e5191 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Thu, 1 Feb 2024 12:11:15 +0100 Subject: [PATCH 3/4] Support Kafka Batch as Kamelet source - Kafka Azure Schema Registry Source as batch Signed-off-by: Andrea Cosentino --- docs/modules/ROOT/nav.adoc | 1 + script/validator/validator.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index e62ee0dfb..32c52e343 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -131,6 +131,7 @@ * xref:kafka-apicurio-registry-not-secured-source.adoc[] * xref:kafka-azure-schema-registry-sink.adoc[] * xref:kafka-azure-schema-registry-source.adoc[] +* xref:kafka-batch-azure-schema-registry-source.adoc[] * xref:kafka-batch-manual-commit-action.adoc[] * xref:kafka-batch-not-secured-source.adoc[] * xref:kafka-batch-scram-source.adoc[] diff --git a/script/validator/validator.go b/script/validator/validator.go index cf62cf5fe..e2da1d592 100644 --- a/script/validator/validator.go +++ b/script/validator/validator.go @@ -378,7 +378,7 @@ func listKamelets(dir string) []KameletInfo { func verifyUsedParams(kamelets []KameletInfo) (errors []error) { for _, k := range kamelets { - if k.FileName != "../../kamelets/azure-storage-blob-source.kamelet.yaml" && k.FileName != "../../kamelets/aws-s3-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/set-kafka-key-action.kamelet.yaml" && k.FileName != "../../kamelets/azure-storage-blob-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/google-storage-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/elasticsearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/opensearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-sink.kamelet.yaml" { + if k.FileName != "../../kamelets/azure-storage-blob-source.kamelet.yaml" && k.FileName != "../../kamelets/aws-s3-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/set-kafka-key-action.kamelet.yaml" && k.FileName != "../../kamelets/azure-storage-blob-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/google-storage-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/elasticsearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/opensearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-sink.kamelet.yaml" && k.FileName != "../../kamelets/kafka-batch-azure-schema-registry-sink.kamelet.yaml" { used := getUsedParams(k.Kamelet) declared := getDeclaredParams(k.Kamelet) for p := range used { From 97d97c9e32d9de8ec1a4aab042fe85aa3a4e2f89 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Thu, 1 Feb 2024 12:11:52 +0100 Subject: [PATCH 4/4] Support Kafka Batch as Kamelet source - Kafka Azure Schema Registry Source as batch Signed-off-by: Andrea Cosentino --- script/validator/validator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/script/validator/validator.go b/script/validator/validator.go index e2da1d592..ef8ec537c 100644 --- a/script/validator/validator.go +++ b/script/validator/validator.go @@ -378,7 +378,7 @@ func listKamelets(dir string) []KameletInfo { func verifyUsedParams(kamelets []KameletInfo) (errors []error) { for _, k := range kamelets { - if k.FileName != "../../kamelets/azure-storage-blob-source.kamelet.yaml" && k.FileName != "../../kamelets/aws-s3-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/set-kafka-key-action.kamelet.yaml" && k.FileName != "../../kamelets/azure-storage-blob-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/google-storage-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/elasticsearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/opensearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-sink.kamelet.yaml" && k.FileName != "../../kamelets/kafka-batch-azure-schema-registry-sink.kamelet.yaml" { + if k.FileName != "../../kamelets/azure-storage-blob-source.kamelet.yaml" && k.FileName != "../../kamelets/aws-s3-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/set-kafka-key-action.kamelet.yaml" && k.FileName != "../../kamelets/azure-storage-blob-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/google-storage-cdc-source.kamelet.yaml" && k.FileName != "../../kamelets/elasticsearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/opensearch-search-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-source.kamelet.yaml" && k.FileName != "../../kamelets/kafka-azure-schema-registry-sink.kamelet.yaml" && k.FileName != "../../kamelets/kafka-batch-azure-schema-registry-source.kamelet.yaml" { used := getUsedParams(k.Kamelet) declared := getDeclaredParams(k.Kamelet) for p := range used {