From f5a26c9df51ada0c622ddd85e32f6d10aaf3d55e Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Tue, 30 Jan 2024 15:16:40 +0100 Subject: [PATCH 1/5] Support Kafka Batch as Kamelet source - Kafka Scram Source as Batch Signed-off-by: Andrea Cosentino --- .../kafka-batch-scram-source.kamelet.yaml | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 kamelets/kafka-batch-scram-source.kamelet.yaml diff --git a/kamelets/kafka-batch-scram-source.kamelet.yaml b/kamelets/kafka-batch-scram-source.kamelet.yaml new file mode 100644 index 000000000..2046a3354 --- /dev/null +++ b/kamelets/kafka-batch-scram-source.kamelet.yaml @@ -0,0 +1,163 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-scram-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + camel.apache.org/keda.authentication.sasl: "scram-sha-512" + camel.apache.org/keda.authentication.tls: "enable" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Kafka Scram Source" + description: |- + Receive data from Kafka topics through SCRAM login module. + required: + - topic + - bootstrapServers + - user + - password + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + securityProtocol: + title: Security Protocol + description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported + type: string + default: SASL_SSL + saslMechanism: + title: SASL Mechanism + description: The Simple Authentication and Security Layer (SASL) Mechanism used. + type: string + default: SCRAM-SHA-512 + user: + title: Username + description: Username to authenticate to Kafka + type: string + x-descriptors: + - urn:camel:group:credentials + - urn:keda:authentication:username + - urn:keda:required + password: + title: Password + description: Password to authenticate to kafka + type: string + format: password + x-descriptors: + - urn:camel:group:credentials + - urn:keda:authentication:password + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer + type: boolean + default: true + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + pollOnError: + title: Poll On Error Behavior + description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + default: true + batchSize: + title: Batch Dimension + description: The maximum number of records returned in a single call to poll() + type: int + default: 500 + pollTimeout: + title: Poll Timeout Interval + description: The timeout used when polling the KafkaConsumer + type: int + default: 5000 + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.0-SNAPSHOT" + - "camel:core" + - "camel:kafka" + - "camel:kamelet" + template: + beans: + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + - name: manualCommitFactory + type: "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{?bootstrapServers}}" + securityProtocol: "{{securityProtocol}}" + saslMechanism: "{{saslMechanism}}" + saslJaasConfig: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="{{user}}" password="{{password}}";' + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + maxPollRecords: "{{batchSize}}" + pollTimeoutMs: "{{pollTimeout}}" + batching: true + kafkaManualCommitFactory: "#bean:{{manualCommitFactory}}" + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink" From 32a864693cce9df3cd1eed9552e172734986b654 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Tue, 30 Jan 2024 15:17:24 +0100 Subject: [PATCH 2/5] Support Kafka Batch as Kamelet source - Kafka Source as Batch Signed-off-by: Andrea Cosentino --- .../kafka-batch-scram-source.kamelet.yaml | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml new file mode 100644 index 000000000..2046a3354 --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml @@ -0,0 +1,163 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: kafka-scram-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Kafka" + camel.apache.org/kamelet.namespace: "Kafka" + camel.apache.org/keda.type: "kafka" + camel.apache.org/keda.authentication.sasl: "scram-sha-512" + camel.apache.org/keda.authentication.tls: "enable" + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + title: "Kafka Scram Source" + description: |- + Receive data from Kafka topics through SCRAM login module. + required: + - topic + - bootstrapServers + - user + - password + type: object + properties: + topic: + title: Topic Names + description: Comma separated list of Kafka topic names + type: string + x-descriptors: + - urn:keda:metadata:topic + - urn:keda:required + bootstrapServers: + title: Bootstrap Servers + description: Comma separated list of Kafka Broker URLs + type: string + x-descriptors: + - urn:keda:metadata:bootstrapServers + - urn:keda:required + securityProtocol: + title: Security Protocol + description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported + type: string + default: SASL_SSL + saslMechanism: + title: SASL Mechanism + description: The Simple Authentication and Security Layer (SASL) Mechanism used. + type: string + default: SCRAM-SHA-512 + user: + title: Username + description: Username to authenticate to Kafka + type: string + x-descriptors: + - urn:camel:group:credentials + - urn:keda:authentication:username + - urn:keda:required + password: + title: Password + description: Password to authenticate to kafka + type: string + format: password + x-descriptors: + - urn:camel:group:credentials + - urn:keda:authentication:password + - urn:keda:required + autoCommitEnable: + title: Auto Commit Enable + description: If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer + type: boolean + default: true + allowManualCommit: + title: Allow Manual Commit + description: Whether to allow doing manual commits + type: boolean + default: false + pollOnError: + title: Poll On Error Behavior + description: What to do if kafka threw an exception while polling for new messages. There are 5 enums and the value can be one of DISCARD, ERROR_HANDLER, RECONNECT, RETRY, STOP + type: string + default: "ERROR_HANDLER" + autoOffsetReset: + title: Auto Offset Reset + description: What to do when there is no initial offset. There are 3 enums and the value can be one of latest, earliest, none + type: string + default: "latest" + x-descriptors: + - urn:keda:metadata:offsetResetPolicy + consumerGroup: + title: Consumer Group + description: A string that uniquely identifies the group of consumers to which this source belongs + type: string + example: "my-group-id" + x-descriptors: + - urn:keda:metadata:consumerGroup + - urn:keda:required + deserializeHeaders: + title: Automatically Deserialize Headers + description: When enabled the Kamelet source will deserialize all message headers to String representation. + type: boolean + default: true + batchSize: + title: Batch Dimension + description: The maximum number of records returned in a single call to poll() + type: int + default: 500 + pollTimeout: + title: Poll Timeout Interval + description: The timeout used when polling the KafkaConsumer + type: int + default: 5000 + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.4.0-SNAPSHOT" + - "camel:core" + - "camel:kafka" + - "camel:kamelet" + template: + beans: + - name: kafkaHeaderDeserializer + type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer" + properties: + enabled: '{{deserializeHeaders}}' + - name: manualCommitFactory + type: "#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory" + from: + uri: "kafka:{{topic}}" + parameters: + brokers: "{{?bootstrapServers}}" + securityProtocol: "{{securityProtocol}}" + saslMechanism: "{{saslMechanism}}" + saslJaasConfig: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="{{user}}" password="{{password}}";' + autoCommitEnable: "{{autoCommitEnable}}" + allowManualCommit: "{{allowManualCommit}}" + pollOnError: "{{pollOnError}}" + autoOffsetReset: "{{autoOffsetReset}}" + groupId: "{{?consumerGroup}}" + maxPollRecords: "{{batchSize}}" + pollTimeoutMs: "{{pollTimeout}}" + batching: true + kafkaManualCommitFactory: "#bean:{{manualCommitFactory}}" + steps: + - process: + ref: "{{kafkaHeaderDeserializer}}" + - to: "kamelet:sink" From dc743b6b65a0e1c86a8e913bc2673acfcbdc90b7 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Tue, 30 Jan 2024 15:18:19 +0100 Subject: [PATCH 3/5] Support Kafka Batch as Kamelet source - Kafka Scram Source as Batch Signed-off-by: Andrea Cosentino --- docs/modules/ROOT/nav.adoc | 1 + kamelets/kafka-batch-scram-source.kamelet.yaml | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 320cd5c13..0bced69d1 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -133,6 +133,7 @@ * xref:kafka-azure-schema-registry-source.adoc[] * xref:kafka-batch-manual-commit-action.adoc[] * xref:kafka-batch-not-secured-source.adoc[] +* xref:kafka-scram-source.adoc[] * xref:kafka-batch-source.adoc[] * xref:kafka-manual-commit-action.adoc[] * xref:kafka-not-secured-sink.adoc[] diff --git a/kamelets/kafka-batch-scram-source.kamelet.yaml b/kamelets/kafka-batch-scram-source.kamelet.yaml index 2046a3354..0d5e65809 100644 --- a/kamelets/kafka-batch-scram-source.kamelet.yaml +++ b/kamelets/kafka-batch-scram-source.kamelet.yaml @@ -17,7 +17,7 @@ apiVersion: camel.apache.org/v1 kind: Kamelet metadata: - name: kafka-scram-source + name: kafka-batch-scram-source annotations: camel.apache.org/kamelet.support.level: "Preview" camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" @@ -32,9 +32,9 @@ metadata: camel.apache.org/kamelet.type: "source" spec: definition: - title: "Kafka Scram Source" + title: "Kafka Batch Scram Source" description: |- - Receive data from Kafka topics through SCRAM login module. + Receive data from Kafka topics in batch through SCRAM login module and commit them manually through KafkaManualCommit.. required: - topic - bootstrapServers From 9934bf9584459224b2c00621b0b86ee88d9271e6 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Tue, 30 Jan 2024 15:19:18 +0100 Subject: [PATCH 4/5] Support Kafka Batch as Kamelet source - Kafka Scram Source as Batch Signed-off-by: Andrea Cosentino --- .../kamelets/kafka-batch-scram-source.kamelet.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml index 2046a3354..0d5e65809 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/kafka-batch-scram-source.kamelet.yaml @@ -17,7 +17,7 @@ apiVersion: camel.apache.org/v1 kind: Kamelet metadata: - name: kafka-scram-source + name: kafka-batch-scram-source annotations: camel.apache.org/kamelet.support.level: "Preview" camel.apache.org/catalog.version: "4.4.0-SNAPSHOT" @@ -32,9 +32,9 @@ metadata: camel.apache.org/kamelet.type: "source" spec: definition: - title: "Kafka Scram Source" + title: "Kafka Batch Scram Source" description: |- - Receive data from Kafka topics through SCRAM login module. + Receive data from Kafka topics in batch through SCRAM login module and commit them manually through KafkaManualCommit.. required: - topic - bootstrapServers From 9e6a6ad39961cbc8b9c92046df40b5e8b2caa1b5 Mon Sep 17 00:00:00 2001 From: Andrea Cosentino Date: Tue, 30 Jan 2024 15:19:43 +0100 Subject: [PATCH 5/5] Support Kafka Batch as Kamelet source - Kafka Scram Source as Batch Signed-off-by: Andrea Cosentino --- docs/modules/ROOT/nav.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 0bced69d1..4ac973134 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -133,7 +133,7 @@ * xref:kafka-azure-schema-registry-source.adoc[] * xref:kafka-batch-manual-commit-action.adoc[] * xref:kafka-batch-not-secured-source.adoc[] -* xref:kafka-scram-source.adoc[] +* xref:kafka-batch-scram-source.adoc[] * xref:kafka-batch-source.adoc[] * xref:kafka-manual-commit-action.adoc[] * xref:kafka-not-secured-sink.adoc[]