Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Camel Rest Kafka Connector - Issue with Kubernetes Rest connector. #1171

Closed
qafro1 opened this issue May 13, 2021 · 73 comments
Closed

Camel Rest Kafka Connector - Issue with Kubernetes Rest connector. #1171

qafro1 opened this issue May 13, 2021 · 73 comments

Comments

@qafro1
Copy link

qafro1 commented May 13, 2021

I have an issue creating a CAMEL REST KAFKA-CONNECTOR in k8s and I just want to ingress REST API payload in Kafka strimzi.
Any ideas of the resolve and I'll be able to test the Kafka connector.

kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1alpha1
metadata:
  name: rest-source
  namespace: kafka
  labels:
    # must match connect cluster name
    strimzi.io/cluster: my-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.rest.CamelRestSourceConnector
  config:
    topics: my-topic
    camel.source.path.method: get
    camel.source.path.path: http://api.open-notify.org/astros.json
    camel.source.endpoint.produces: application/json
    camel.source.endpoint.description: Calls API open notify

Error msg:

Connector Status:
    Connector:
      State:      RUNNING
      worker_id:  xxx.xx.x.x:8083
    Name:         rest-source
    Tasks:
      Id:     0
      State:  FAILED
      Trace:  org.apache.kafka.connect.errors.ConnectException: Failed to create and start Camel context
              at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:144)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.camel.FailedToStartRouteException: Failed to start route route3 because of null
  at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:122)
  at org.apache.camel.impl.engine.InternalRouteStartupManager.doWarmUpRoutes(InternalRouteStartupManager.java:270)
  at org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:157)
  at org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:115)
  at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2889)
  at org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2702)
  at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2665)
  at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
  at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2431)
  at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
  at org.apache.camel.support.service.BaseService.start(BaseService.java:115)
  at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:141)
  ... 8 more
Caused by: java.lang.IllegalStateException: Cannot find RestConsumerFactory in Registry or as a Component to use
  at org.apache.camel.component.rest.RestEndpoint.createConsumer(RestEndpoint.java:602)
  at org.apache.camel.impl.engine.DefaultRoute.addServices(DefaultRoute.java:575)
  at org.apache.camel.impl.engine.DefaultRoute.onStartingServices(DefaultRoute.java:160)
  at org.apache.camel.impl.engine.RouteService.doWarmUp(RouteService.java:150)
  at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:120)
@oscerd oscerd changed the title CAMEL REST KAFKA-CONNECTOR SOURCE CONFIGURATION - Issue with Kubernetes Rest connector. Camel Rest Kafka Connector - Issue with Kubernetes Rest connector. May 14, 2021
@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

You need to add one of the following components in the connector unzipped folder:
https://camel.apache.org/manual/latest/rest-dsl.html#_components_supporting_rest_dsl

This should solve the 'Cannot find RestConsumerFactory in Registry or as a Component to use'

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Please wherein the kind: KafkaConnector script do I add the components in the connector unzipped and can you show me a sample in the k8s script above?

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

How are you deploying the connector?

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

After deploying strimzi Kafka connect cluster then deploy kafka connector
kubectl apply -f kafka-connector-rest-source.yaml

Please how do I add does "..._components_supporting_rest_dsl " into the script?

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

How do you include the connector in your strimzi Kafka connect instance? You should have added it somewhere.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

I followed the doc from strimzi for the sample than for my usage I just need to ingress REST API payload via Camel Rest Kafka Connector.
https://strimzi.io/blog/2021/03/29/connector-build/

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

you need to add one of the component listed in the documentation in the zip declared in the Strimzi example. It will be unzipped with the connector. This is for making it start

I don't think by the way that this is component you're looking for. The Rest component will expose the Rest service, but to consume from it you'll need to invoke the service through a different component, for example a timer or http.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Argh - Please which Kafka connector will do the REST API source into Kafka connect?

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

I would not use any connector for doing this. I would do a plain camel route

from(timer).to("http://rest-api").to("kafka")

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Therefore, I should use camel-timer-kafka-connector with from(timer).to("http://rest-api").to("kafka")

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

No, you should use camel main project.

Https://github.com/apache/camel

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

What do you mean with ingress? So you mean ingress kubernetes resource? It could be it is just me, but i don't quite get your use case from your description. Maybe elaborate a bit.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

I'll explain the project to data input from REST API into Kafka (strimzi) in k8s.
Producer API > Kafka connector > Kafka cluster > customers (Spark)
I've created the Kafka cluster & Kafka connect but I just need the right Kafka connector to input the REST API payload. using k8s.
Sample : https://youtu.be/CemgJcJMufI
https://youtu.be/QqF2bp30VZk

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

You want to invoke a REST endpoint and send the response to Kafka cluster right?

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Yes - REST endpoint http://api.open-notify.org/astros.json into Kafka cluster

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

So, I would do it in plain camel or through camel-k with a kamelet, I wouldn't use the camel-kafka-connector in this case.

https://camel.apache.org/docs/

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

So I'm confused here, so I only need Apache Camel K as a producer into Kafka?

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

Yes, because in this moment, it's not supported to invoke a timer, then another endpoint and then ingest to kafka, through camel-kafka-connector. It will be supported when we'll support kamelet even in camel-kafka-connector (work in progress).

So, yes, at this point I would do that through camel-k.

I can also create a sample if needed.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Please can you create a sample REST endpoint http://api.open-notify.org/astros.json into kafka bootstrapServers: my-cluster-kafka-bootstrap:9092

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

I can create a kamelet invoking that endpoint every 5 seconds or whatever and write the response to a kafka topic in your cluster.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Yes Please

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

I'll try to do this next week.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

Thank you and also please can you include API endpoint that requires username and password or tokens

@oscerd
Copy link
Contributor

oscerd commented May 14, 2021

Not sure about this, you'll need something that works, for the auth part or other part, you can do by yourself after the example.

@qafro1
Copy link
Author

qafro1 commented May 14, 2021

no problem .. just the general task

@qafro1
Copy link
Author

qafro1 commented May 16, 2021

Please update me on the progress?

@oscerd
Copy link
Contributor

oscerd commented May 16, 2021

I'll work on this when I have time. Probably next week. Not sure.

@oscerd
Copy link
Contributor

oscerd commented May 17, 2021

@oscerd
Copy link
Contributor

oscerd commented May 17, 2021

I'm closing this, because I think it's not related to ckc anymore.

@qafro1
Copy link
Author

qafro1 commented May 18, 2021

last part

% kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.23.0-kafka-2.8.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic test-topic --from-beginning
If you don't see a command prompt, try pressing enter.
[2021-05-18 10:18:37,218] WARN [Consumer clientId=consumer-console-consumer-1815-1, groupId=console-consumer-1815] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

@oscerd
Copy link
Contributor

oscerd commented May 18, 2021

Please follow the instructions. You need to install the Kamelet files before the flow binding file and all must be done in the same namespace of strimzi. So Kafka.

It's in the readme.

@oscerd
Copy link
Contributor

oscerd commented May 18, 2021

I updated the readme for clarity

@qafro1
Copy link
Author

qafro1 commented May 18, 2021

It worked, so the instructions it to install in namespace : kafka

  1. kamel install -n kafka
  2. install kafka-not-secured-sink.yaml
  3. install flow-binding.yaml
  4. Check logs kamel logs timer-http-kafka -n kafka

Therefore, I need to install them both in order?

@oscerd
Copy link
Contributor

oscerd commented May 18, 2021

point 2 and 3 are in the kafka namespace. To run the binding you'll need the kafka source, the order is not important, but you need both.

@qafro1
Copy link
Author

qafro1 commented May 18, 2021

Thank you - Is there anything I need to config to make it in production to a real REST API endpoint?

@oscerd
Copy link
Contributor

oscerd commented May 18, 2021

This will work with any non-authenticated http url

@qafro1
Copy link
Author

qafro1 commented May 18, 2021

Am trying to get this authentication HTTPS URL to work, but am unsure if am using the correct properties for username and password.
Sample fake REST API endpoint: HTTPS://.....
username: fake test
password: fake test
The Real API works in Postman with the authentication method but how do I config username and password for the REST API endpoint?
SAMPLE config

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: timer-http-kafka
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: "5000"
      message: '"Invocation"'
  steps:
  - ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: http-sink
    properties:
      url: "https://earthquake.usgs.gov/earthquakes/eventpage/ci39808832"
      #username: "test"
      #password: "test" 
      method: "get"

  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-sink
    properties:
      brokers: 'my-cluster-kafka-bootstrap:9092'
      topic: 'test-topic'

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

You may need to modify this: https://github.com/apache/camel-kamelets/blob/main/http-sink.kamelet.yaml

and add the required parameters listed here: https://camel.apache.org/components/3.9.x/http-component.html

authMethod, authUsername and authPassword and modify the flow-binding accordingly.

In the installation you'll need to do kubectl apply -f http-sink.kamelet.yaml -n kafka too.

@qafro1
Copy link
Author

qafro1 commented May 19, 2021

Am I on the right track? - WIP

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: http-sink
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MjAiCmhlaWdodD0iNDIwIiBzdHJva2U9IiMwMDAiIGZpbGw9Im5vbmUiPgo8cGF0aCBzdHJva2Utd2lkdGg9IjI2IgpkPSJNMjA5LDE1YTE5NSwxOTUgMCAxLDAgMiwweiIvPgo8cGF0aCBzdHJva2Utd2lkdGg9IjE4IgpkPSJtMjEwLDE1djM5MG0xOTUtMTk1SDE1TTU5LDkwYTI2MCwyNjAgMCAwLDAgMzAyLDAgbTAsMjQwIGEyNjAsMjYwIDAgMCwwLTMwMiwwTTE5NSwyMGEyNTAsMjUwIDAgMCwwIDAsMzgyIG0zMCwwIGEyNTAsMjUwIDAgMCwwIDAtMzgyIi8+Cjwvc3ZnPg=="
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "HTTP"
  labels:
    camel.apache.org/kamelet.type: sink
spec:
  definition:
    title: HTTP Sink
    description: Forwards an event to a HTTP endpoint
    required:
    - url
    type: object
    properties:
      url:
        title: URL
        description: The URL to send data to
        type: string
        example: "https://zzzz/api/api_journey"
        pattern: "^(http|https)://.*"
      method:
        title: Method
        description: The HTTP method to use
        type: string
        default: GET
      authMethod:
        title:
        description: Authentication methods allowed to use as a comma separated list of values Basic, Digest or NTLM.
        type: string
        default:
      authUsername:
        title:
        description: Authentication username
        type: String
        default:
      authPassword:
        title:
        description: Authentication password
        type: String
        default:
  dependencies:
  - "camel:http"
  - "camel:kamelet"
  - "camel:core"
  flow:
    from:
      uri: kamelet:source
      steps:
      - set-header:
          name: CamelHttpMethod
          constant: "{{method}}"
      - to: "{{url}}"``

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

yes but in the

  • to: "{{url}}"``

You need to specify also the new parameters

  • to: "{{url}}"``
    parameters:
    authMethod: "{{authMethod}}"
    authUsername: "{{authUsername}}"
    authPassword: "{{authPassword}}"

Then you'll need to specify this parameters into the flow-binding in http-sink part

@qafro1
Copy link
Author

qafro1 commented May 19, 2021

ERROR [org.apa.cam.qua.mai.CamelMainRuntime] (main) Failed to start application

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: timer-http-kafka
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: "5000"
      message: '"Invocation"'
  steps:
  - ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: http-sink
    properties:
      url: "https://zzzzz/api/api_journey"
      authMethod: GET 
      authUsername: "test"
      authPassword: "test"

  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-sink
    properties:
      brokers: 'my-cluster-kafka-bootstrap:9092'
      topic: 'test-topic'
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: http-sink
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MjAiCmhlaWdodD0iNDIwIiBzdHJva2U9IiMwMDAiIGZpbGw9Im5vbmUiPgo8cGF0aCBzdHJva2Utd2lkdGg9IjI2IgpkPSJNMjA5LDE1YTE5NSwxOTUgMCAxLDAgMiwweiIvPgo8cGF0aCBzdHJva2Utd2lkdGg9IjE4IgpkPSJtMjEwLDE1djM5MG0xOTUtMTk1SDE1TTU5LDkwYTI2MCwyNjAgMCAwLDAgMzAyLDAgbTAsMjQwIGEyNjAsMjYwIDAgMCwwLTMwMiwwTTE5NSwyMGEyNTAsMjUwIDAgMCwwIDAsMzgyIG0zMCwwIGEyNTAsMjUwIDAgMCwwIDAtMzgyIi8+Cjwvc3ZnPg=="
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "HTTP"
  labels:
    camel.apache.org/kamelet.type: sink
spec:
  definition:
    title: HTTP Sink
    description: Forwards an event to a HTTP endpoint
    required:
      - url
    type: object
    properties:
      url:
        title: URL
        description: The URL to send data to
        type: string
        example: "https://zzzzz/api/api_journey"
        pattern: "^(http|https)://.*"
      authMethod:
        title: The HTTP method to use
        description: Authentication methods allowed to use as a comma separated list of values Basic, Digest or NTLM.
        type: string
      authUsername:
        title: The username to use
        description: Authentication username
        type: String
      authPassword:
        title: The password to use
        description: Authentication password
        type: String
  dependencies:
    - "camel:http"
    - "camel:kamelet"
    - "camel:core"
  flow:
    from:
      uri: kamelet:source
      steps:
        - set-header:
            name: CamelHttpMethod
            constant: "{{method}}"
        - to: "{{url}}"
          parameters:
            authMethod: "{{authMethod}}"
            authUsername: "{{authUsername}}"
            authPassword: "{{authPassword}}"
kamel logs timer-http-kafka -n kafka
Integration 'timer-http-kafka' is now running. Showing log ...
[1] Monitoring pod timer-http-kafka-64f7dfb684-xphp8
[1] exec java -cp ./resources:/etc/camel/application.properties:/etc/camel/conf.d/user.properties:/etc/camel/resources:/etc/camel/sources/camel-k-embedded-flow.yaml:/etc/camel/sources/http-sink.yaml:/etc/camel/sources/kafka-not-secured-sink.yaml:/etc/camel/sources/timer-source.yaml:dependencies/app/camel-k-integration-1.4.0.jar:dependencies/lib/boot/io.quarkus.quarkus-bootstrap-runner-1.13.0.Final.jar:dependencies/lib/boot/io.quarkus.quarkus-development-mode-spi-1.13.0.Final.jar:dependencies/lib/boot/io.smallrye.common.smallrye-common-io-1.5.0.jar:dependencies/lib/boot/org.graalvm.sdk.graal-sdk-21.0.0.jar:dependencies/lib/boot/org.jboss.logging.jboss-logging-3.4.1.Final.jar:dependencies/lib/boot/org.jboss.logmanager.jboss-logmanager-embedded-1.0.9.jar:dependencies/lib/boot/org.wildfly.common.wildfly-common-1.5.4.Final-format-001.jar:dependencies/lib/main/com.fasterxml.jackson.core.jackson-annotations-2.12.1.jar:dependencies/lib/main/com.fasterxml.jackson.core.jackson-core-2.12.1.jar:dependencies/lib/main/com.fasterxml.jackson.core.jackson-databind-2.12.1.jar:dependencies/lib/main/com.github.ben-manes.caffeine.caffeine-2.9.0.jar:dependencies/lib/main/com.github.luben.zstd-jni-1.4.5-6.jar:dependencies/lib/main/com.google.errorprone.error_prone_annotations-2.2.0.jar:dependencies/lib/main/com.sun.activation.javax.activation-1.2.0.jar:dependencies/lib/main/commons-codec.commons-codec-1.14.jar:dependencies/lib/main/io.quarkus.arc.arc-1.13.0.Final.jar:dependencies/lib/main/io.quarkus.quarkus-arc-1.13.0.Final.jar:dependencies/lib/main/io.quarkus.quarkus-caffeine-1.13.0.Final.jar:dependencies/lib/main/io.quarkus.quarkus-core-1.13.0.Final.jar:dependencies/lib/main/io.quarkus.quarkus-kafka-client-1.13.0.Final.jar:dependencies/lib/main/io.smallrye.common.smallrye-common-annotation-1.5.0.jar:dependencies/lib/main/io.smallrye.common.smallrye-common-classloader-1.5.0.jar:dependencies/lib/main/io.smallrye.common.smallrye-common-constraint-1.5.0.jar:dependencies/lib/main/io.smallrye.common.smallrye-common-expression-1.5.0.jar:dependencies/lib/main/io.smallrye.common.smallrye-common-function-1.5.0.jar:dependencies/lib/main/io.smallrye.config.smallrye-config-1.11.1.jar:dependencies/lib/main/io.smallrye.config.smallrye-config-common-1.11.1.jar:dependencies/lib/main/jakarta.annotation.jakarta.annotation-api-1.3.5.jar:dependencies/lib/main/jakarta.el.jakarta.el-api-3.0.3.jar:dependencies/lib/main/jakarta.enterprise.jakarta.enterprise.cdi-api-2.0.2.jar:dependencies/lib/main/jakarta.inject.jakarta.inject-api-1.0.jar:dependencies/lib/main/jakarta.interceptor.jakarta.interceptor-api-1.2.5.jar:dependencies/lib/main/jakarta.servlet.jakarta.servlet-api-4.0.3.jar:dependencies/lib/main/jakarta.transaction.jakarta.transaction-api-1.3.3.jar:dependencies/lib/main/org.apache.camel.camel-api-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-attachments-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-base-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-base-engine-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-bean-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-cloud-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-cluster-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-componentdsl-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-core-catalog-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-core-engine-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-core-languages-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-core-model-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-core-processor-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-core-reifier-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-endpointdsl-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-file-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-http-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-http-base-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-http-common-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-kafka-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-kamelet-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-main-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-management-api-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-microprofile-config-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-support-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-timer-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-tooling-model-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-util-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-util-json-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-yaml-dsl-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-yaml-dsl-common-3.9.0.jar:dependencies/lib/main/org.apache.camel.camel-yaml-dsl-deserializers-3.9.0.jar:dependencies/lib/main/org.apache.camel.k.camel-k-core-1.7.0.jar:dependencies/lib/main/org.apache.camel.k.camel-k-core-api-1.7.0.jar:dependencies/lib/main/org.apache.camel.k.camel-k-core-support-1.7.0.jar:dependencies/lib/main/org.apache.camel.k.camel-k-loader-yaml-1.7.0.jar:dependencies/lib/main/org.apache.camel.k.camel-k-loader-yaml-impl-1.7.0.jar:dependencies/lib/main/org.apache.camel.k.camel-k-runtime-1.7.0.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-bean-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-core-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-http-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-http-common-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-kafka-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-kamelet-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-support-common-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-support-commons-logging-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-support-httpclient-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-timer-1.8.1.jar:dependencies/lib/main/org.apache.camel.quarkus.camel-quarkus-yaml-dsl-1.8.1.jar:dependencies/lib/main/org.apache.httpcomponents.httpclient-4.5.13.jar:dependencies/lib/main/org.apache.httpcomponents.httpcore-4.4.14.jar:dependencies/lib/main/org.apache.kafka.kafka-clients-2.7.0.jar:dependencies/lib/main/org.eclipse.microprofile.config.microprofile-config-api-1.4.jar:dependencies/lib/main/org.eclipse.microprofile.context-propagation.microprofile-context-propagation-api-1.0.1.jar:dependencies/lib/main/org.jboss.logging.commons-logging-jboss-logging-1.0.0.Final.jar:dependencies/lib/main/org.jboss.logging.jboss-logging-annotations-2.2.0.Final.jar:dependencies/lib/main/org.jboss.slf4j.slf4j-jboss-logmanager-1.1.0.Final.jar:dependencies/lib/main/org.jboss.threads.jboss-threads-3.2.0.Final.jar:dependencies/lib/main/org.lz4.lz4-java-1.7.1.jar:dependencies/lib/main/org.slf4j.slf4j-api-1.7.30.jar:dependencies/lib/main/org.snakeyaml.snakeyaml-engine-2.2.1.jar:dependencies/lib/main/org.xerial.snappy.snappy-java-1.1.7.7.jar:dependencies/quarkus-app-dependencies.txt:dependencies/quarkus-run.jar:dependencies/quarkus/generated-bytecode.jar:dependencies/quarkus/quarkus-application.dat io.quarkus.bootstrap.runner.QuarkusEntryPoint
[1] 2021-05-19 14:36:20,808 INFO  [org.apa.cam.k.Runtime] (main) Apache Camel K Runtime 1.7.0
[1] 2021-05-19 14:36:20,849 INFO  [org.apa.cam.qua.cor.CamelBootstrapRecorder] (main) bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
[1] 2021-05-19 14:36:20,960 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='http-sink', language='yaml', type='template', propertyNames='[authMethod, authPassword, authUsername, method, url]', location='file:/etc/camel/sources/http-sink.yaml', }
[1] 2021-05-19 14:36:21,006 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='kafka-not-secured-sink', language='yaml', type='template', propertyNames='[brokers, topic]', location='file:/etc/camel/sources/kafka-not-secured-sink.yaml', }
[1] 2021-05-19 14:36:21,006 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='timer-source', language='yaml', type='template', propertyNames='[message, period]', location='file:/etc/camel/sources/timer-source.yaml', }
[1] 2021-05-19 14:36:21,007 INFO  [org.apa.cam.k.lis.SourcesConfigurer] (main) Loading routes from: SourceDefinition{name='camel-k-embedded-flow', language='yaml', location='file:/etc/camel/sources/camel-k-embedded-flow.yaml', }
[1] 2021-05-19 14:36:21,210 ERROR [org.apa.cam.qua.mai.CamelMainRuntime] (main) Failed to start application: org.snakeyaml.engine.v2.exceptions.YamlEngineException: org.apache.camel.dsl.yaml.common.exception.YamlDeserializationException: Unable to find constructor for node: <org.snakeyaml.engine.v2.nodes.MappingNode (tag=tag:yaml.org,2002:map, values={ key=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value=parameters)>; value=1658511941 }{ key=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value=to)>; value=<NodeTuple keyNode=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value=to)>; valueNode=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value={{url}})>> })>

Any ideas?

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

Try with

      - to:
          uri: "{{url}}"
          parameters: 
            authMethod: "{{authMethod}}"
            authUsername: "{{authUsername}}"
            authPassword: "{{authPassword}}"

@qafro1
Copy link
Author

qafro1 commented May 19, 2021

1] 2021-05-19 15:01:13,992 ERROR [org.apa.cam.qua.mai.CamelMainRuntime] (main) Failed to start application: org.snakeyaml.engine.v2.exceptions.YamlEngineException: org.apache.camel.dsl.yaml.common.exception.YamlDeserializationException: Unable to find constructor for node: <org.snakeyaml.engine.v2.nodes.MappingNode (tag=tag:yaml.org,2002:map, values={ key=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value=parameters)>; value=1264941544 }{ key=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value=to)>; value=<NodeTuple keyNode=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value=to)>; valueNode=<org.snakeyaml.engine.v2.nodes.ScalarNode (tag=tag:yaml.org,2002:str, value={{url}})>> })>

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

It seems is not indented correctly.

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

Also the type for username and password should be "string"

@qafro1
Copy link
Author

qafro1 commented May 19, 2021

Here's the code..

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: http-sink
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MjAiCmhlaWdodD0iNDIwIiBzdHJva2U9IiMwMDAiIGZpbGw9Im5vbmUiPgo8cGF0aCBzdHJva2Utd2lkdGg9IjI2IgpkPSJNMjA5LDE1YTE5NSwxOTUgMCAxLDAgMiwweiIvPgo8cGF0aCBzdHJva2Utd2lkdGg9IjE4IgpkPSJtMjEwLDE1djM5MG0xOTUtMTk1SDE1TTU5LDkwYTI2MCwyNjAgMCAwLDAgMzAyLDAgbTAsMjQwIGEyNjAsMjYwIDAgMCwwLTMwMiwwTTE5NSwyMGEyNTAsMjUwIDAgMCwwIDAsMzgyIG0zMCwwIGEyNTAsMjUwIDAgMCwwIDAtMzgyIi8+Cjwvc3ZnPg=="
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "HTTP"
  labels:
    camel.apache.org/kamelet.type: sink
spec:
  definition:
    title: HTTP Sink
    description: Forwards an event to a HTTP endpoint
    required:
      - url
    type: object
    properties:
      url:
        title: URL
        description: The URL to send data to
        type: string
        example: "https://zzzzzz/api/api_journey"
        pattern: "^(http|https)://.*"
      authMethod:
        title: The HTTP method to use
        description: Authentication methods allowed to use as a comma separated list of values Basic, Digest or NTLM.
        type: string
      authUsername:
        title: The username to use
        description: Authentication username
        type: String
      authPassword:
        title: The password to use
        description: Authentication password
        type: String
  dependencies:
    - "camel:http"
    - "camel:kamelet"
    - "camel:core"
  flow:
    from:
      uri: kamelet:source
      steps:
        - set-header:
            name: CamelHttpMethod
            constant: "{{method}}"
        - to:
            uri: "{{url}}"
            parameters:
              authMethod: "{{authMethod}}"
              authUsername: "{{authUsername}}"
              authPassword: "{{authPassword}"

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

Did you modified the flow binding too?

@qafro1
Copy link
Author

qafro1 commented May 19, 2021

Modified code


apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: timer-http-kafka
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: "5000"
      message: '"Invocation"'
  steps:
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: http-sink
      properties:
        url: "https://zzzzz/api/api_journey"
        authMethod: GET
        authUsername: "test"
        authPassword: "test"

  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-sink
    properties:
      brokers: "my-cluster-kafka-bootstrap:9092"
      topic: "test-topic"

@oscerd
Copy link
Contributor

oscerd commented May 19, 2021

I'll try to reproduce, but i think is on your side

@oscerd
Copy link
Contributor

oscerd commented May 20, 2021

The http sink seems to be wrong:

This is the correct one. Method was missing and you were passing GET as authentication method.

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: http-sink-kamelet
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MjAiCmhlaWdodD0iNDIwIiBzdHJva2U9IiMwMDAiIGZpbGw9Im5vbmUiPgo8cGF0aCBzdHJva2Utd2lkdGg9IjI2IgpkPSJNMjA5LDE1YTE5NSwxOTUgMCAxLDAgMiwweiIvPgo8cGF0aCBzdHJva2Utd2lkdGg9IjE4IgpkPSJtMjEwLDE1djM5MG0xOTUtMTk1SDE1TTU5LDkwYTI2MCwyNjAgMCAwLDAgMzAyLDAgbTAsMjQwIGEyNjAsMjYwIDAgMCwwLTMwMiwwTTE5NSwyMGEyNTAsMjUwIDAgMCwwIDAsMzgyIG0zMCwwIGEyNTAsMjUwIDAgMCwwIDAtMzgyIi8+Cjwvc3ZnPg=="
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "HTTP"
  labels:
    camel.apache.org/kamelet.type: sink
spec:
  definition:
    title: HTTP Sink
    description: Forwards an event to a HTTP endpoint
    required:
      - url
    type: object
    properties:
      url:
        title: URL
        description: The URL to send data to
        type: string
        example: "https://zzzzzz/api/api_journey"
        pattern: "^(http|https)://.*"
      method:
        title: Method
        description: The HTTP method to use
        type: string
        default: POST
      authMethod:
        title: The HTTP method to use
        description: Authentication methods allowed to use as a comma separated list of values Basic, Digest or NTLM.
        type: string
      authUsername:
        title: The username to use
        description: Authentication username
        type: String
      authPassword:
        title: The password to use
        description: Authentication password
        type: String
  dependencies:
    - "camel:http"
    - "camel:kamelet"
    - "camel:core"
  flow:
    from:
      uri: kamelet:source
      steps:
        - set-header:
            name: CamelHttpMethod
            constant: "{{method}}"
        - to:
            uri: "{{url}}"
            parameters:
              authMethod: "{{authMethod}}"
              authUsername: "{{authUsername}}"
              authPassword: "{{authPassword}}"

This is the flow binding

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: timer-http-kafka
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: "5000"
      message: '"Invocation "'
  steps:
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: http-sink-kamelet
      properties:
        url: "http://httpbin.org/basic-auth/pippo/pippo"
        method: GET
        authMethod: BASIC
        authUsername: "pippo"
        authPassword: "pippo"

  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-sink
    properties:
      brokers: "my-cluster-kafka-bootstrap:9092"
      topic: "test-topic"

@qafro1
Copy link
Author

qafro1 commented May 20, 2021

After installing :
Install camel-k on the kafka namespace

kamel install -n kafka

Run the following commands

kubectl apply -f kafka-not-secured-sink.yaml -n kafka kubectl apply -f http-sink.kamelet.yaml -n kafka kubectl apply -f flow-binding.yaml -n kafka

Check logs

kamel logs timer-http-kafka -n kafka

camel-support-commons-logging, camel-support-httpclient, camel-timer, camel-yaml-dsl, cdi]
[1] 2021-05-20 10:29:02,154 INFO  [org.apa.kaf.cli.Metadata] (kafka-producer-network-thread | producer-1) [Producer clientId=producer-1] Cluster ID: KhYIVrx9S3CaKCOtONrWdg
[1] 2021-05-20 10:29:03,980 ERROR [org.apa.cam.pro.err.DefaultErrorHandler] (Camel (camel-1) thread #0 - timer://tick) Failed delivery for (MessageId: AAB47C0F94531D6-0000000000000000 on ExchangeId: AAB47C0F94531D6-0000000000000000). Exhausted after delivery attempt: 1 caught: org.apache.camel.http.common.HttpOperationFailedException: HTTP operation failed invoking http://httpbin.org/basic-auth/pippo/pippo with statusCode: 401
[1] 
[1] Message History (complete message history is disabled)
[1] ---------------------------------------------------------------------------------------------------------------------------------------
[1] RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[1] [source            ] [source            ] [from[timer://tick?period=5000]                                                ] [      1314]
[1]     ...
[1] [action-0          ] [to4               ] [{{url}}                                                                       ] [         0]
[1] 
[1] Stacktrace
[1] ---------------------------------------------------------------------------------------------------------------------------------------
[1] : org.apache.camel.http.common.HttpOperationFailedException: HTTP operation failed invoking http://httpbin.org/basic-auth/pippo/pippo with statusCode: 401
[1]     at org.apache.camel.component.http.HttpProducer.populateHttpOperationFailedException(HttpProducer.java:419)
[1]     at org.apache.camel.component.http.HttpProducer.process(HttpProducer.java:291)
[1]     at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
[1]     at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
[1]     at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:439)
[1]     at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
[1]     at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:62)
[1]     at org.apache.camel.processor.Pipeline.process(Pipeline.java:167)
[1]     at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:388)
[1]     at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:209)
[1]     at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:76)
[1]     at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
[1]     at java.base/java.util.TimerThread.run(Timer.java:506)
[1] 

@oscerd
Copy link
Contributor

oscerd commented May 20, 2021

It doesn't seems you changed the flow binding. Please paste what you're deploying. But also, try to check why something is failing. If we're going ahead in this way, I'll solve the problem for you, but you won't be able to use this approach for anything else. So maybe read a bit of documentation. Don't copy/paste stuff just to make things work.

@qafro1
Copy link
Author

qafro1 commented May 20, 2021

I understand I'll study more on the doc, but is the kafka-not-secured-sink file a generic kemel for kafka, it's function?

here's flow-binding.yaml

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: timer-http-kafka
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: "5000"
      message: '"Invocation "'
  steps:
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: http-sink
      properties:
        url: "http://httpbin.org/basic-auth/pippo/pippo"
        method: GET
        authMethod: BASIC
        authUsername: "pippo"
        authPassword: "pippo"

  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-sink
    properties:
      brokers: "my-cluster-kafka-bootstrap:9092"
      topic: "test-topic"

@qafro1
Copy link
Author

qafro1 commented May 20, 2021

http-sink.kamelet.yaml

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: http-sink-kamelet
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHdpZHRoPSI0MjAiCmhlaWdodD0iNDIwIiBzdHJva2U9IiMwMDAiIGZpbGw9Im5vbmUiPgo8cGF0aCBzdHJva2Utd2lkdGg9IjI2IgpkPSJNMjA5LDE1YTE5NSwxOTUgMCAxLDAgMiwweiIvPgo8cGF0aCBzdHJva2Utd2lkdGg9IjE4IgpkPSJtMjEwLDE1djM5MG0xOTUtMTk1SDE1TTU5LDkwYTI2MCwyNjAgMCAwLDAgMzAyLDAgbTAsMjQwIGEyNjAsMjYwIDAgMCwwLTMwMiwwTTE5NSwyMGEyNTAsMjUwIDAgMCwwIDAsMzgyIG0zMCwwIGEyNTAsMjUwIDAgMCwwIDAtMzgyIi8+Cjwvc3ZnPg=="
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "HTTP"
  labels:
    camel.apache.org/kamelet.type: sink
spec:
  definition:
    title: HTTP Sink
    description: Forwards an event to a HTTP endpoint
    required:
      - url
    type: object
    properties:
      url:
        title: URL
        description: The URL to send data to
        type: string
        example: "https://zzzzzz/api/api_journey"
        pattern: "^(http|https)://.*"
      method:
        title: Method
        description: The HTTP method to use
        type: string
        default: POST
      authMethod:
        title: The HTTP method to use
        description: Authentication methods allowed to use as a comma separated list of values Basic, Digest or NTLM.
        type: string
      authUsername:
        title: The username to use
        description: Authentication username
        type: String
      authPassword:
        title: The password to use
        description: Authentication password
        type: String
  dependencies:
    - "camel:http"
    - "camel:kamelet"
    - "camel:core"
  flow:
    from:
      uri: kamelet:source
      steps:
        - set-header:
            name: CamelHttpMethod
            constant: "{{method}}"
        - to:
            uri: "{{url}}"
            parameters:
              authMethod: "{{authMethod}}"
              authUsername: "{{authUsername}}"
              authPassword: "{{authPassword}}"

@qafro1
Copy link
Author

qafro1 commented May 20, 2021

kafka-not-secured-sink.kamelet.yaml


apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: kafka-not-secured-sink
  annotations:
    camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0idXRmLTgiPz4NCjwhLS0gR2VuZXJhdG9yOiBBZG9iZSBJbGx1c3RyYXRvciAxOS4wLjAsIFNWRyBFeHBvcnQgUGx1Zy1JbiAuIFNWRyBWZXJzaW9uOiA2LjAwIEJ1aWxkIDApICAtLT4NCjxzdmcgdmVyc2lvbj0iMS4xIiBpZD0iTGF5ZXJfMSIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIiB4bWxuczp4bGluaz0iaHR0cDovL3d3dy53My5vcmcvMTk5OS94bGluayIgeD0iMHB4IiB5PSIwcHgiDQoJIHZpZXdCb3g9IjAgMCA1MDAgNTAwIiBzdHlsZT0iZW5hYmxlLWJhY2tncm91bmQ6bmV3IDAgMCA1MDAgNTAwOyIgeG1sOnNwYWNlPSJwcmVzZXJ2ZSI+DQo8ZyBpZD0iWE1MSURfMV8iPg0KCTxwYXRoIGlkPSJYTUxJRF85XyIgZD0iTTMxNC44LDI2OS43Yy0xNC4yLDAtMjcsNi4zLTM1LjcsMTYuMkwyNTYuOCwyNzBjMi40LTYuNSwzLjctMTMuNiwzLjctMjAuOWMwLTcuMi0xLjMtMTQuMS0zLjYtMjAuNg0KCQlsMjIuMy0xNS43YzguNyw5LjksMjEuNCwxNi4xLDM1LjYsMTYuMWMyNi4yLDAsNDcuNi0yMS4zLDQ3LjYtNDcuNnMtMjEuMy00Ny42LTQ3LjYtNDcuNnMtNDcuNiwyMS4zLTQ3LjYsNDcuNg0KCQljMCw0LjcsMC43LDkuMiwyLDEzLjVsLTIyLjMsMTUuN2MtOS4zLTExLjYtMjIuOC0xOS42LTM4LjEtMjIuMXYtMjYuOWMyMS42LTQuNSwzNy44LTIzLjcsMzcuOC00Ni42YzAtMjYuMi0yMS4zLTQ3LjYtNDcuNi00Ny42DQoJCWMtMjYuMiwwLTQ3LjYsMjEuMy00Ny42LDQ3LjZjMCwyMi42LDE1LjgsNDEuNSwzNi45LDQ2LjN2MjcuM2MtMjguOCw1LjEtNTAuOCwzMC4yLTUwLjgsNjAuNWMwLDMwLjQsMjIuMiw1NS43LDUxLjIsNjAuNXYyOC44DQoJCWMtMjEuMyw0LjctMzcuNCwyMy43LTM3LjQsNDYuNGMwLDI2LjIsMjEuMyw0Ny42LDQ3LjYsNDcuNmMyNi4yLDAsNDcuNi0yMS4zLDQ3LjYtNDcuNmMwLTIyLjctMTYtNDEuOC0zNy40LTQ2LjR2LTI4LjgNCgkJYzE1LTIuNSwyOC4yLTEwLjQsMzcuNC0yMS44bDIyLjUsMTUuOWMtMS4yLDQuMy0xLjksOC43LTEuOSwxMy40YzAsMjYuMiwyMS4zLDQ3LjYsNDcuNiw0Ny42czQ3LjYtMjEuMyw0Ny42LTQ3LjYNCgkJQzM2Mi40LDI5MSwzNDEuMSwyNjkuNywzMTQuOCwyNjkuN3ogTTMxNC44LDE1OC40YzEyLjcsMCwyMy4xLDEwLjQsMjMuMSwyMy4xYzAsMTIuNy0xMC4zLDIzLjEtMjMuMSwyMy4xcy0yMy4xLTEwLjQtMjMuMS0yMy4xDQoJCUMyOTEuOCwxNjguOCwzMDIuMSwxNTguNCwzMTQuOCwxNTguNHogTTE3NiwxMTUuMWMwLTEyLjcsMTAuMy0yMy4xLDIzLjEtMjMuMWMxMi43LDAsMjMuMSwxMC40LDIzLjEsMjMuMQ0KCQljMCwxMi43LTEwLjMsMjMuMS0yMy4xLDIzLjFDMTg2LjMsMTM4LjIsMTc2LDEyNy44LDE3NiwxMTUuMXogTTIyMi4xLDM4NC45YzAsMTIuNy0xMC4zLDIzLjEtMjMuMSwyMy4xDQoJCWMtMTIuNywwLTIzLjEtMTAuNC0yMy4xLTIzLjFjMC0xMi43LDEwLjMtMjMuMSwyMy4xLTIzLjFDMjExLjgsMzYxLjgsMjIyLjEsMzcyLjIsMjIyLjEsMzg0Ljl6IE0xOTkuMSwyODEuMw0KCQljLTE3LjcsMC0zMi4yLTE0LjQtMzIuMi0zMi4yYzAtMTcuNywxNC40LTMyLjIsMzIuMi0zMi4yYzE3LjcsMCwzMi4yLDE0LjQsMzIuMiwzMi4yQzIzMS4yLDI2Ni45LDIxNi44LDI4MS4zLDE5OS4xLDI4MS4zeg0KCQkgTTMxNC44LDM0MC4zYy0xMi43LDAtMjMuMS0xMC40LTIzLjEtMjMuMWMwLTEyLjcsMTAuMy0yMy4xLDIzLjEtMjMuMXMyMy4xLDEwLjQsMjMuMSwyMy4xQzMzNy45LDMzMCwzMjcuNSwzNDAuMywzMTQuOCwzNDAuM3oiLz4NCjwvZz4NCjwvc3ZnPg0K"
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "Kafka"
  labels:
    camel.apache.org/kamelet.type: "sink"
spec:
  definition:
    title: "Kafka Not Secured Sink"
    description: |-
      Send data to Kafka topics on an insecure broker.

      The Kamelet is able to understand the following headers to be set:

      - `key` / `ce-key`: as message key
    
      - `partition-key` / `ce-partition-key`: as message partition key

      Both the headers are optional.
    required:
      - topic
      - brokers
    type: object
    properties:
      topic:
        title: Topic Names
        description: Comma separated list of Kafka topic names
        type: string
      brokers:
        title: Brokers
        description: Comma separated list of Kafka Broker URLs
        type: string
  dependencies:
    - "camel:kafka"
    - "camel:kamelet"
  flow:
    from:
      uri: "kamelet:source"
      steps:
      - choice:
          when:
          - simple: "${header[key]}"
            steps:
            - set-header:
                name: kafka.KEY
                simple: "${header[key]}"
          - simple: "${header[ce-key]}"
            steps:
            - set-header:
                name: kafka.KEY
                simple: "${header[ce-key]}"
      - choice:
          when:
          - simple: "${header[partition-key]}"
            steps:
            - set-header:
                name: kafka.PARTITION_KEY
                simple: "${header[partition-key]}"
          - simple: "${header[ce-partition-key]}"
            steps:
            - set-header:
                name: kafka.PARTITION_KEY
                simple: "${header[ce-partition-key]}"
      - to:
          uri: "kafka:{{topic}}"
          parameters:
            brokers: "{{brokers}}"

@oscerd
Copy link
Contributor

oscerd commented May 20, 2021

If you noticed in the http sink I posted the name is "http-sink-kamelet"

So the correct flow-binding is

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: timer-http-kafka
spec:
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: timer-source
    properties:
      period: "5000"
      message: '"Invocation "'
  steps:
    - ref:
        kind: Kamelet
        apiVersion: camel.apache.org/v1alpha1
        name: http-sink-kamelet
      properties:
        url: "http://httpbin.org/basic-auth/pippo/pippo"
        method: GET
        authMethod: BASIC
        authUsername: "pippo"
        authPassword: "pippo"

  sink:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: kafka-not-secured-sink
    properties:
      brokers: "my-cluster-kafka-bootstrap:9092"
      topic: "test-topic"

@oscerd
Copy link
Contributor

oscerd commented May 20, 2021

This will work if you installed the http-sink yaml I posted.

@qafro1
Copy link
Author

qafro1 commented May 20, 2021

Argh - I see and this is training on camel k for me..lol

@qafro1
Copy link
Author

qafro1 commented May 20, 2021

Awesome it works - Thank you and now flowing the structure of camel-k.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants