From 65c77d1c5f6875b369c133d24bb17c994ae09d4a Mon Sep 17 00:00:00 2001 From: Luigi De Masi Date: Tue, 7 Dec 2021 19:18:05 +0100 Subject: [PATCH] #76: Add Pulsar source Kamelet --- .../assets/images/kamelets/pulsar-source.svg | 100 ++++++++++ docs/modules/ROOT/nav.adoc | 1 + docs/modules/ROOT/pages/pulsar-source.adoc | 175 ++++++++++++++++++ kamelets/pulsar-source.kamelet.yaml | 169 +++++++++++++++++ .../kamelets/pulsar-source.kamelet.yaml | 169 +++++++++++++++++ .../camel-k/pulsar-source-binding.yaml | 22 +++ .../bindings/core/pulsar-source-binding.yaml | 13 ++ 7 files changed, 649 insertions(+) create mode 100644 docs/modules/ROOT/assets/images/kamelets/pulsar-source.svg create mode 100644 docs/modules/ROOT/pages/pulsar-source.adoc create mode 100755 kamelets/pulsar-source.kamelet.yaml create mode 100644 library/camel-kamelets/src/main/resources/kamelets/pulsar-source.kamelet.yaml create mode 100644 templates/bindings/camel-k/pulsar-source-binding.yaml create mode 100644 templates/bindings/core/pulsar-source-binding.yaml diff --git a/docs/modules/ROOT/assets/images/kamelets/pulsar-source.svg b/docs/modules/ROOT/assets/images/kamelets/pulsar-source.svg new file mode 100644 index 000000000..3fc97fc65 --- /dev/null +++ b/docs/modules/ROOT/assets/images/kamelets/pulsar-source.svg @@ -0,0 +1,100 @@ + + + + + + + + + + Asset 2 + + + + + + + + + + + + + + + Asset 2 + + + + diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index c47dc770e..abd079192 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -113,6 +113,7 @@ * xref:ROOT:predicate-filter-action.adoc[image:kamelets/predicate-filter-action.svg[] Predicate Filter Action] * xref:ROOT:protobuf-deserialize-action.adoc[image:kamelets/protobuf-deserialize-action.svg[] Protobuf Deserialize Action] * xref:ROOT:protobuf-serialize-action.adoc[image:kamelets/protobuf-serialize-action.svg[] Protobuf Serialize Action] +* xref:ROOT:pulsar-source.adoc[image:kamelets/pulsar-source.svg[] Pulsar Source] * xref:ROOT:rabbitmq-source.adoc[image:kamelets/rabbitmq-source.svg[] RabbitMQ Source] * xref:ROOT:redis-sink.adoc[image:kamelets/redis-sink.svg[] Redis Sink] * xref:ROOT:redis-source.adoc[image:kamelets/redis-source.svg[] Redis Source] diff --git a/docs/modules/ROOT/pages/pulsar-source.adoc b/docs/modules/ROOT/pages/pulsar-source.adoc new file mode 100644 index 000000000..4aced930b --- /dev/null +++ b/docs/modules/ROOT/pages/pulsar-source.adoc @@ -0,0 +1,175 @@ +// THIS FILE IS AUTOMATICALLY GENERATED: DO NOT EDIT + += image:kamelets/pulsar-source.svg[] Pulsar Source + +*Provided by: "Apache Software Foundation"* + +*Support Level for this Kamelet is: "Preview"* + +Receive data from Pulsar topics. + +== Configuration Options + +The following table summarizes the configuration options available for the `pulsar-source` Kamelet: +[width="100%",cols="2,^2,3,^2,^2,^3",options="header"] +|=== +| Property| Name| Description| Type| Default| Example +| *namespaceName {empty}* *| Pulsar Namespace Name| The Pulsar Namespace Name| string| | +| *serviceUrl {empty}* *| Service URL| The Pulsar Service URL to point while creating the client from URI.| string| | +| *tenant {empty}* *| Tenant Name| The Tenant Name| string| | +| *topic {empty}* *| Topic Name| The topic name or regexp| string| | +| *topicType {empty}* *| Topic Type| The topic type. Possible values are: persistent or non-persistent| string| | +| authenticationClass| Authentication Class| The Authentication FQCN to be used while creating the client from URI.| string| | +| authenticationParams| Authentication Params| The Authentication Parameters to be used while creating the client from URI.| string| | +| consumerNamePrefix| Consumer Name Prefix| Prefix to add to consumer names when a SHARED or FAILOVER subscription is used| string| `"cons"`| +| consumerQueueSize| Consumer Queue Size| Size of the consumer queue| int| `10`| +| deadLetterTopic| Dead Letter Topic| Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, default topic name will be topicName-subscriptionName-DLQ.| int| | +| maxRedeliverCount| Maximum Redelivery Count| Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created.| int| | +| messageListener| Message Listener| Whether to use the messageListener interface, or to receive messages using a separate thread pool.| boolean| `true`| +| negativeAckRedeliveryDelayMicros| Negative Ack Redelivery Delay in Microseconds| Set the negative acknowledgement delay.| long| `60000000`| +| numberOfConsumerThreads| Number Of Consumer Threads| Number of threads to receive and handle messages when using a separate thread pool.| int| `1`| +| numberOfConsumers| Number Of Consumers| Number of consumers.| int| `1`| +| readCompacted| Read Compacted| Enable compacted topic reading.| boolean| `false`| +| subscriptionInitialPosition| Subscription Initial Position| Control the initial position in the topic of a newly created subscription. Default is latest message.Possible values: EARLIEST or LATEST| string| `"LATEST"`| +| subscriptionName| Subscription Name| Name of the subscription to use.| string| `"subs"`| +| subscriptionTopicsMode| Subscription Topics Mode| Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions.Possible values: PersistentOnly, NonPersistentOnly, AllTopics| string| `"PersistentOnly"`| +| subscriptionType| Subscription Type| Type of the subscription. Possible values: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED| string| `"EXCLUSIVE"`| +| topicsPattern| Topic Pattern| Whether the topic is a pattern (regular expression) that allows the consumer to subscribe to all matching topics in the namespace.| boolean| `false`| +|=== + +NOTE: Fields marked with an asterisk ({empty}*) are mandatory. + + +== Dependencies + +At runtime, the `pulsar-source` Kamelet relies upon the presence of the following dependencies: + +- camel:pulsar +- camel:kamelet +- camel:core + +== Usage + +This section describes how you can use the `pulsar-source`. + +=== Knative Source + +You can use the `pulsar-source` Kamelet as a Knative source by binding it to a Knative object. + +.pulsar-source-binding.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: pulsar-source-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: pulsar-source + properties: + namespaceName: "The Pulsar Namespace Name" + serviceUrl: "The Service URL" + tenant: "The Tenant Name" + topic: "The Topic Name" + topicType: "The Topic Type" + sink: + ref: + kind: Channel + apiVersion: messaging.knative.dev/v1 + name: mychannel + +---- + +==== *Prerequisite* + +You have xref:{camel-k-version}@camel-k::installation/installation.adoc[Camel K installed] on the cluster. + +==== *Procedure for using the cluster CLI* + +. Save the `pulsar-source-binding.yaml` file to your local drive, and then edit it as needed for your configuration. + +. Run the source by using the following command: ++ +[source,shell] +---- +kubectl apply -f pulsar-source-binding.yaml +---- + +==== *Procedure for using the Kamel CLI* + +Configure and run the source by using the following command: + +[source,shell] +---- +kamel bind pulsar-source -p "source.namespaceName=The Pulsar Namespace Name" -p "source.serviceUrl=The Service URL" -p "source.tenant=The Tenant Name" -p "source.topic=The Topic Name" -p "source.topicType=The Topic Type" channel:mychannel +---- + +This command creates the KameletBinding in the current namespace on the cluster. + +=== Kafka Source + +You can use the `pulsar-source` Kamelet as a Kafka source by binding it to a Kafka topic. + +.pulsar-source-binding.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: pulsar-source-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: pulsar-source + properties: + namespaceName: "The Pulsar Namespace Name" + serviceUrl: "The Service URL" + tenant: "The Tenant Name" + topic: "The Topic Name" + topicType: "The Topic Type" + sink: + ref: + kind: KafkaTopic + apiVersion: kafka.strimzi.io/v1beta1 + name: my-topic + +---- + +==== *Prerequisites* + +* You've installed https://strimzi.io/[Strimzi]. +* You've created a topic named `my-topic` in the current namespace. +* You have xref:{camel-k-version}@camel-k::installation/installation.adoc[Camel K installed] on the cluster. + +==== *Procedure for using the cluster CLI* + +. Save the `pulsar-source-binding.yaml` file to your local drive, and then edit it as needed for your configuration. + +. Run the source by using the following command: ++ +[source,shell] +---- +kubectl apply -f pulsar-source-binding.yaml +---- + +==== *Procedure for using the Kamel CLI* + +Configure and run the source by using the following command: + +[source,shell] +---- +kamel bind pulsar-source -p "source.namespaceName=The Pulsar Namespace Name" -p "source.serviceUrl=The Service URL" -p "source.tenant=The Tenant Name" -p "source.topic=The Topic Name" -p "source.topicType=The Topic Type" kafka.strimzi.io/v1beta1:KafkaTopic:my-topic +---- + +This command creates the KameletBinding in the current namespace on the cluster. + +== Kamelet source file + +https://github.com/apache/camel-kamelets/blob/main/kamelets/pulsar-source.kamelet.yaml + +// THIS FILE IS AUTOMATICALLY GENERATED: DO NOT EDIT diff --git a/kamelets/pulsar-source.kamelet.yaml b/kamelets/pulsar-source.kamelet.yaml new file mode 100755 index 000000000..c54f58f0a --- /dev/null +++ b/kamelets/pulsar-source.kamelet.yaml @@ -0,0 +1,169 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: pulsar-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "main-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Pulsar" + labels: + camel.apache.org/kamelet.type: "source" +spec: + dependencies: + - "camel:pulsar" + - "camel:kamelet" + - "camel:core" + definition: + title: "Pulsar Source" + description: "Receive data from Pulsar topics." + required: + - topicType + - topic + - namespaceName + - tenant + - serviceUrl + properties: + topic: + title: Topic Name + description: The topic name or regexp + type: string + tenant: + title: Tenant Name + description: The Tenant Name + type: string + topicType: + title: Topic Type + description: "The topic type. Possible values are: persistent or non-persistent" + type: string + namespaceName: + title: Pulsar Namespace Name + description: The Pulsar Namespace Name + type: string + serviceUrl: + title: Service URL + description: The Pulsar Service URL to point while creating the client from URI. + type: string + authenticationClass: + title: Authentication Class + description: The Authentication FQCN to be used while creating the client from URI. + type: string + authenticationParams: + title: Authentication Params + description: The Authentication Parameters to be used while creating the client from URI. + type: string + consumerNamePrefix: + title: Consumer Name Prefix + description: Prefix to add to consumer names when a SHARED or FAILOVER subscription is used + type: string + default: cons + consumerQueueSize: + title: Consumer Queue Size + description: Size of the consumer queue + type: int + default: 10 + deadLetterTopic: + title: Dead Letter Topic + description: "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, default topic name will be topicName-subscriptionName-DLQ." + type: int + maxRedeliverCount: + title: Maximum Redelivery Count + description: "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created." + type: int + negativeAckRedeliveryDelayMicros: + title: Negative Ack Redelivery Delay in Microseconds + description: "Set the negative acknowledgement delay." + type: long + default: 60000000 + messageListener: + title: Message Listener + description: "Whether to use the messageListener interface, or to receive messages using a separate thread pool." + type: boolean + default: true + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + numberOfConsumers: + title: Number Of Consumers + description: "Number of consumers." + type: int + default: 1 + numberOfConsumerThreads: + title: Number Of Consumer Threads + description: "Number of threads to receive and handle messages when using a separate thread pool." + type: int + default: 1 + readCompacted: + title: Read Compacted + description: "Enable compacted topic reading." + type: boolean + default: false + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + subscriptionInitialPosition: + title: Subscription Initial Position + description: "Control the initial position in the topic of a newly created subscription. Default is latest message.Possible values: EARLIEST or LATEST" + type: string + default: LATEST + subscriptionName: + title: Subscription Name + description: "Name of the subscription to use." + type: string + default: subs + subscriptionTopicsMode: + title: Subscription Topics Mode + description: "Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions.Possible values: PersistentOnly, NonPersistentOnly, AllTopics" + type: string + default: PersistentOnly + subscriptionType: + title: Subscription Type + description: "Type of the subscription. Possible values: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED" + type: string + default: EXCLUSIVE + topicsPattern: + title: Topic Pattern + description: "Whether the topic is a pattern (regular expression) that allows the consumer to subscribe to all matching topics in the namespace." + type: boolean + default: false + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + type: object + flow: + from: + uri: pulsar:{{topicType}}/{{tenant}}/{{namespaceName}}/{{topic}} + parameters: + serviceUrl: "{{serviceUrl}}" + authenticationClass: "{{?authenticationClass}}" + authenticationParams: "{{?authenticationParams}}" + consumerNamePrefix: "{{?consumerNamePrefix}}" + consumerQueueSize: "{{?consumerQueueSize}}" + deadLetterTopic: "{{?deadLetterTopic}}" + maxRedeliverCount: "{{?maxRedeliverCount}}" + negativeAckRedeliveryDelayMicros: "{{?negativeAckRedeliveryDelayMicros}}" + messageListener: "{{?messageListener}}" + numberOfConsumers: "{{?numberOfConsumers}}" + numberOfConsumerThreads: "{{?numberOfConsumerThreads}}" + readCompacted: "{{?readCompacted}}" + subscriptionInitialPosition: "{{?subscriptionInitialPosition}}" + subscriptionName: "{{?subscriptionName}}" + subscriptionTopicsMode: "{{?subscriptionTopicsMode}}" + subscriptionType: "{{?subscriptionType}}" + topicsPattern: "{{?topicsPattern}}" + steps: + - to: "kamelet:sink" diff --git a/library/camel-kamelets/src/main/resources/kamelets/pulsar-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/pulsar-source.kamelet.yaml new file mode 100644 index 000000000..c54f58f0a --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/pulsar-source.kamelet.yaml @@ -0,0 +1,169 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: pulsar-source + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "main-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Pulsar" + labels: + camel.apache.org/kamelet.type: "source" +spec: + dependencies: + - "camel:pulsar" + - "camel:kamelet" + - "camel:core" + definition: + title: "Pulsar Source" + description: "Receive data from Pulsar topics." + required: + - topicType + - topic + - namespaceName + - tenant + - serviceUrl + properties: + topic: + title: Topic Name + description: The topic name or regexp + type: string + tenant: + title: Tenant Name + description: The Tenant Name + type: string + topicType: + title: Topic Type + description: "The topic type. Possible values are: persistent or non-persistent" + type: string + namespaceName: + title: Pulsar Namespace Name + description: The Pulsar Namespace Name + type: string + serviceUrl: + title: Service URL + description: The Pulsar Service URL to point while creating the client from URI. + type: string + authenticationClass: + title: Authentication Class + description: The Authentication FQCN to be used while creating the client from URI. + type: string + authenticationParams: + title: Authentication Params + description: The Authentication Parameters to be used while creating the client from URI. + type: string + consumerNamePrefix: + title: Consumer Name Prefix + description: Prefix to add to consumer names when a SHARED or FAILOVER subscription is used + type: string + default: cons + consumerQueueSize: + title: Consumer Queue Size + description: Size of the consumer queue + type: int + default: 10 + deadLetterTopic: + title: Dead Letter Topic + description: "Name of the topic where the messages which fail maxRedeliverCount times will be sent. Note: if not set, default topic name will be topicName-subscriptionName-DLQ." + type: int + maxRedeliverCount: + title: Maximum Redelivery Count + description: "Maximum number of times that a message will be redelivered before being sent to the dead letter queue. If this value is not set, no Dead Letter Policy will be created." + type: int + negativeAckRedeliveryDelayMicros: + title: Negative Ack Redelivery Delay in Microseconds + description: "Set the negative acknowledgement delay." + type: long + default: 60000000 + messageListener: + title: Message Listener + description: "Whether to use the messageListener interface, or to receive messages using a separate thread pool." + type: boolean + default: true + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + numberOfConsumers: + title: Number Of Consumers + description: "Number of consumers." + type: int + default: 1 + numberOfConsumerThreads: + title: Number Of Consumer Threads + description: "Number of threads to receive and handle messages when using a separate thread pool." + type: int + default: 1 + readCompacted: + title: Read Compacted + description: "Enable compacted topic reading." + type: boolean + default: false + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + subscriptionInitialPosition: + title: Subscription Initial Position + description: "Control the initial position in the topic of a newly created subscription. Default is latest message.Possible values: EARLIEST or LATEST" + type: string + default: LATEST + subscriptionName: + title: Subscription Name + description: "Name of the subscription to use." + type: string + default: subs + subscriptionTopicsMode: + title: Subscription Topics Mode + description: "Determines to which topics this consumer should be subscribed to - Persistent, Non-Persistent, or both. Only used with pattern subscriptions.Possible values: PersistentOnly, NonPersistentOnly, AllTopics" + type: string + default: PersistentOnly + subscriptionType: + title: Subscription Type + description: "Type of the subscription. Possible values: EXCLUSIVE, SHARED, FAILOVER, KEY_SHARED" + type: string + default: EXCLUSIVE + topicsPattern: + title: Topic Pattern + description: "Whether the topic is a pattern (regular expression) that allows the consumer to subscribe to all matching topics in the namespace." + type: boolean + default: false + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + type: object + flow: + from: + uri: pulsar:{{topicType}}/{{tenant}}/{{namespaceName}}/{{topic}} + parameters: + serviceUrl: "{{serviceUrl}}" + authenticationClass: "{{?authenticationClass}}" + authenticationParams: "{{?authenticationParams}}" + consumerNamePrefix: "{{?consumerNamePrefix}}" + consumerQueueSize: "{{?consumerQueueSize}}" + deadLetterTopic: "{{?deadLetterTopic}}" + maxRedeliverCount: "{{?maxRedeliverCount}}" + negativeAckRedeliveryDelayMicros: "{{?negativeAckRedeliveryDelayMicros}}" + messageListener: "{{?messageListener}}" + numberOfConsumers: "{{?numberOfConsumers}}" + numberOfConsumerThreads: "{{?numberOfConsumerThreads}}" + readCompacted: "{{?readCompacted}}" + subscriptionInitialPosition: "{{?subscriptionInitialPosition}}" + subscriptionName: "{{?subscriptionName}}" + subscriptionTopicsMode: "{{?subscriptionTopicsMode}}" + subscriptionType: "{{?subscriptionType}}" + topicsPattern: "{{?topicsPattern}}" + steps: + - to: "kamelet:sink" diff --git a/templates/bindings/camel-k/pulsar-source-binding.yaml b/templates/bindings/camel-k/pulsar-source-binding.yaml new file mode 100644 index 000000000..6be279d07 --- /dev/null +++ b/templates/bindings/camel-k/pulsar-source-binding.yaml @@ -0,0 +1,22 @@ +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: pulsar-source-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: pulsar-source + properties: + namespaceName: "The Pulsar Namespace Name" + serviceUrl: "The Service URL" + tenant: "The Tenant Name" + topic: "The Topic Name" + topicType: "The Topic Type" + sink: + ref: + kind: KafkaTopic + apiVersion: kafka.strimzi.io/v1beta1 + name: my-topic + \ No newline at end of file diff --git a/templates/bindings/core/pulsar-source-binding.yaml b/templates/bindings/core/pulsar-source-binding.yaml new file mode 100644 index 000000000..37ce79d5d --- /dev/null +++ b/templates/bindings/core/pulsar-source-binding.yaml @@ -0,0 +1,13 @@ +- route: + from: + uri: "kamelet:pulsar-source" + parameters: + namespaceName: "The Pulsar Namespace Name" + serviceUrl: "The Service URL" + tenant: "The Tenant Name" + topic: "The Topic Name" + topicType: "The Topic Type" + steps: + - to: + uri: "log:info" + \ No newline at end of file