Add example for Kafka kamelet consuming with Azure Schema Registry in YAML DSL #21

Closed
vedran-kolka-syntio opened this issue May 24, 2023 · 53 comments

@vedran-kolka-syntio
Contributor

vedran-kolka-syntio commented May 24, 2023

Hi and thanks for all the examples with running Camel Routes with JBang!

Still, I'm having trouble using beans to configure a route in YAML DSL, when a property of a Kamelet expects an object (not a string, int, ...).

The bean I am using is a class I implemented as a wrapper around DefaultAzureCredential implementing TokenCredential, because DefaultAzureCredential can only be instantiated using a builder class, which I cannot do in YAML DSL.

Use case: let's say messages on Kafka are in Avro format, and use Azure Schema Registry for schemas. To integrate with a service on which consumers only understand JSON, a Camel Route would need to read a Kafka message, deserialize it using the schema registry, convert to JSON, and send to the sink.
The deserializer expects a property schema.registry.credential of type com.azure.core.credential.TokenCredential, which I define as a bean, but can't seem to reference correctly.

These are the relevant parts of route definition:

# camel-k: dependency=mvn:my.custom.azure.credential:eventhubs:0.1

- beans:
  - name: defaultAzureCredential
    type: "#class:my.custom.azure.credential.SimpleCredential"

- route:
    id: "kafka-to-mq"
    from:
      uri: "kafka:{{kafka.kafka-to-mq-topic}}"
      parameters:
        brokers: "{{kafka.bootstrapServers}}"
        ...
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        additionalProperties.schema.registry.url: '{{kafka.schema-registry-url}}'
        additionalProperties.schema.registry.credential: '{{bean:defaultAzureCredential}}'
        ...

The error when running camel run route.yaml --properties application.properties:

Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.azure.core.credential.TokenCredential (java.lang.String is in module java.base of loader 'bootstrap'; com.azure.core.credential.TokenCredential is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @1fdfafd2)

Referencing the bean like this (which works for the headerDeserializer):

additionalProperties.schema.registry.credential: '#defaultAzureCredential'

results in the same exception.

Basically, my question is: Is it possible to use custom SerDe classes for integration with various schema registries (in this case, Azure Schema Registry), as you would in a normal Kafka consumer/producer microservice, but in Camel's YAML DSL?
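The `ClassCastException` above can be reproduced in miniature without Kafka or Azure. The following is only a sketch under assumptions: `Credential` is a hypothetical stand-in for `com.azure.core.credential.TokenCredential`, and `readCredential` merely imitates a deserializer that reads a config value and casts it. It shows that the cast fails whenever the config still holds the reference text rather than the bean it names.

```java
import java.util.HashMap;
import java.util.Map;

class CastFailureSketch {
    // Hypothetical stand-in for com.azure.core.credential.TokenCredential.
    interface Credential {
        String token();
    }

    // Imitates a deserializer's configure step: look up the value and cast it.
    static Credential readCredential(Map<String, Object> config) {
        return (Credential) config.get("schema.registry.credential");
    }

    // Returns true when the cast fails, i.e. the value is not a Credential.
    static boolean failsWithClassCast(Map<String, Object> config) {
        try {
            readCredential(config);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The reference was never resolved, so the map holds a plain String.
        Map<String, Object> unresolved = new HashMap<>();
        unresolved.put("schema.registry.credential", "#defaultAzureCredential");
        System.out.println(failsWithClassCast(unresolved)); // prints "true"

        // The reference was resolved, so the map holds the bean instance.
        Map<String, Object> resolved = new HashMap<>();
        Credential bean = () -> "dummy-token";
        resolved.put("schema.registry.credential", bean);
        System.out.println(failsWithClassCast(resolved)); // prints "false"
    }
}
```

In other words, the question is less about the SerDe class and more about whether the bean reference is turned into an object before the Kafka client sees it.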

@oscerd
Contributor

oscerd commented May 24, 2023

We haven't really focused on schema registries, and in particular never on the Azure one. I'll try to have a look, but I don't think it is feasible that way. Also, I think AD must be used to set everything up: https://learn.microsoft.com/en-us/azure/event-hubs/schema-registry-json-schema-kafka

@oscerd
Contributor

oscerd commented May 24, 2023

Also it should be
'#bean:{{defaultAzureCredential}}' and not '{{bean:defaultAzureCredential}}'

@vedran-kolka-syntio
Contributor Author

Thank you for the response.
Referencing like that also causes an Exception, but a different one:

Caused by: java.lang.IllegalArgumentException: Property with key [defaultAzureCredential] not found in properties from text: kafka:camel-eh-to-mq?additionalProperties.schema.registry.credential=#bean:{{defaultAzureCredential}}&additionalProperties.schema.registry.url=...

And do you think integration with APICurio would be feasible, if the problem is in authenticating to Azure?
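The "Property with key [defaultAzureCredential] not found" message is consistent with `{{...}}` being treated as a property placeholder, so the name inside the braces is looked up as a property key rather than as a bean. A minimal sketch of that behaviour follows. It is not Camel's implementation; `PlaceholderSketch` and `resolve` are hypothetical names, and the error text only imitates the one quoted above.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderSketch {
    // Matches {{key}} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    // Replaces every {{key}} with its property value; fails if a key is unknown.
    static String resolve(String text, Map<String, String> properties) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String key = m.group(1);
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException(
                        "Property with key [" + key + "] not found in properties from text: " + text);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("kafka.topic", "orders");
        // A real property key resolves to its value.
        System.out.println(resolve("kafka:{{kafka.topic}}", props)); // prints "kafka:orders"
        // A bean name inside {{...}} is not a property key, so resolution fails.
        try {
            resolve("#bean:{{defaultAzureCredential}}", props);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```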

@oscerd
Contributor

oscerd commented May 24, 2023

I think it's just a matter of understanding what parameters should be specified as additional properties even for the Azure one. I need to test the situation a bit.

@oscerd
Contributor

oscerd commented May 24, 2023

@davsclaus I do think the additionalProperties parameter is a map so the syntax in yaml should be slightly different

@davsclaus
Contributor

what version of camel-jbang are you using?

camel version

And would you be able to share the example (the YAML and the JAR; you can email me privately if you prefer)? That would let me investigate this more quickly.

My first hunch is that additionalProperties in camel-kafka may need special support for dealing with # references, as it's a special multi-valued option in camel-kafka, but it may indicate a general problem with options of that kind.
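To make the hunch concrete, here is a rough sketch of what "support for # references" in a multi-valued option could mean: each entry whose value starts with `#bean:` is swapped for the object registered under that name before the map is handed on. This is an illustration only and not Camel's actual code; `BeanRefSketch`, `resolveReferences`, and the plain `Map` used as a registry are all assumptions.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class BeanRefSketch {
    private static final String BEAN_PREFIX = "#bean:";

    // Returns a copy of props in which "#bean:name" values are replaced by registry beans.
    static Map<String, Object> resolveReferences(Map<String, Object> props,
                                                 Map<String, Object> registry) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : props.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof String && ((String) value).startsWith(BEAN_PREFIX)) {
                String name = ((String) value).substring(BEAN_PREFIX.length());
                Object bean = registry.get(name);
                if (bean == null) {
                    throw new IllegalArgumentException("No bean named " + name + " in registry");
                }
                value = bean;
            }
            resolved.put(entry.getKey(), value);
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Hypothetical registry holding one bean under the name used in the route.
        Map<String, Object> registry = new HashMap<>();
        registry.put("defaultAzureCredential", new Object());

        Map<String, Object> props = new LinkedHashMap<>();
        props.put("schema.registry.url", "https://example.invalid");
        props.put("schema.registry.credential", "#bean:defaultAzureCredential");

        Map<String, Object> out = resolveReferences(props, registry);
        // The URL stays a String; the credential entry now holds the bean itself.
        System.out.println(out.get("schema.registry.url") instanceof String);        // prints "true"
        System.out.println(out.get("schema.registry.credential") instanceof String); // prints "false"
    }
}
```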

@oscerd
Contributor

oscerd commented May 24, 2023

We have example of integration between camel kafka quarkus and apicurio, but not through kamelets and not by using beans.

@vedran-kolka-syntio
Contributor Author

My Camel version: Camel JBang version: 3.20.4
I will email you the whole route and JAR in a minute. I really appreciate the support!

I will check out the Quarkus example, but unfortunately I have to stick with kamelets and beans.

@oscerd
Contributor

oscerd commented May 24, 2023

Please add me to the email in cc. I'd like to have something to try.

@davsclaus
Contributor

davsclaus commented May 25, 2023

you can also use jbang to export to a real maven/gradle based project (quarkus, spring boot, or main)

What I did with your yaml was

camel export  --gav=com.foo:acme:1.0 -dir=mycode --runtime=camel-main

and I can load it into IDEA and have classpath with the JARs I need. I had to add

# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1

@davsclaus
Contributor

I have a fix in the works, and created a ticket to fix this in the camel-kafka component
https://issues.apache.org/jira/browse/CAMEL-19387

Your example can now run with the fix and I can use this syntax

# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1

- beans:
  - name: defaultAzureCredential
    #type: "#class:my.custom.azure.credential.SimpleCredential"
    type: "#class:com.foo.acme.MyCredential"
    properties:
      username: "scott"

- route:
    id: "kafka-to-mq"
    from:
      uri: "kafka:cheese"
      parameters:
        brokers: "localhost:1234"
        consumerRequestTimeoutMs: "1234"
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        additionalProperties.schema.registry.url: 'https://mykafka.servicebus.windows.net'
        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
      steps:
        - to: "log:foo"

And I implemented a custom bean

package com.foo.acme;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import reactor.core.publisher.Mono;

public class MyCredential implements TokenCredential {

    private String username;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    @Override
    public Mono<AccessToken> getToken(TokenRequestContext tokenRequestContext) {
        return null;
    }

    @Override
    public AccessToken getTokenSync(TokenRequestContext request) {
        return TokenCredential.super.getTokenSync(request);
    }
}

@davsclaus
Contributor

Notice the syntax is

        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'

@vedran-kolka-syntio
Contributor Author

Thank you for the help!
Is that how beans should always be referenced or only in additionalProperties?

Also, will you close this issue when the fix is available for use?

@davsclaus
Contributor

There is a CI build, and if it completes, it publishes the fix to an ASF snapshot repo
http://repository.apache.org/content/groups/snapshots/

The CI job (it runs daily but I kicked it now - it may fail for odd reasons)
https://ci-builds.apache.org/job/Camel/job/Camel%20Daily%20Snapshot%20Deploy/job/camel-3.20.x/37/

You can try with the fix with jbang

Edit this file (it is usually in this directory): .jbang/cache/urls/f20491f9ebc0e16d2fca028eb3d4f03aafdf8cd62b8af1a9d1730bfe382d8165/CamelJBang.java

And change it to use 3.20.5-SNAPSHOT for camel-bom, and camel-jbang-core as shown below:

//JAVA 11+
//REPOS mavencentral,apache-snapshot=http://repository.apache.org/content/groups/snapshots/
//DEPS org.apache.camel:camel-bom:${camel.jbang.version:3.20.5-SNAPSHOT}@pom
//DEPS org.apache.camel:camel-jbang-core:${camel.jbang.version:3.20.5-SNAPSHOT}
//DEPS org.apache.camel.kamelets:camel-kamelets:${camel-kamelets.version:3.20.4}

Also you can change to use 3.20.5-SNAPSHOT version in maven projects and add that ASF snapshot repo to maven repositories in the pom.xml file
http://repository.apache.org/content/groups/snapshots/

The fix is coming in 3.20.6 / 3.21 / 4.0-RC1 etc

@oscerd oscerd self-assigned this May 25, 2023
@oscerd
Contributor

oscerd commented May 25, 2023

We are going to add an example similar to the eventhubs one with schema registry too

@vedran-kolka-syntio
Contributor Author

That would be perfect

@davsclaus
Contributor

You can run this example locally with jbang today, and download a patch JAR with the fix.

Then it works with the released Camel 3.20.4

  1. Download patched JAR
    https://repository.apache.org/content/groups/snapshots/org/apache/camel/camel-kafka/3.20.5-SNAPSHOT/

Make sure to take the newest date; I am using [camel-kafka-3.20.5-20230525.085449-18.jar]
Download this JAR to the folder where you have your Camel YAML route and rename it camel-kafka-3.20.6-fix.jar

  2. Have a folder with the source. Here is what I have

-rw-r--r-- 1 davsclaus staff 722 May 25 10:24 MyCredential.java
-rw-r--r-- 1 davsclaus staff 743 May 25 16:29 azure.yaml
-rw-r--r--@ 1 davsclaus staff 251101 May 25 16:27 camel-kafka-3.20.6-fix.jar

This source is in a gist at: https://gist.github.com/davsclaus/acce5925c4abce08aff5d41d7747ae44

You can now run camel with the patched JAR

camel run azure.yaml MyCredential.java camel-kafka-3.20.6-fix.jar

If the folder contains no other files that should be left out, you can also run with camel run *

When Camel starts up, it logs that it has added the patched JAR

2023-05-25 16:34:05.944  INFO 32691 --- [           main] org.apache.camel.main.MainSupport   : Apache Camel (JBang) 3.20.4 is starting
2023-05-25 16:34:05.967  INFO 32691 --- [           main] org.apache.camel.main.MainSupport   : Additional jars added to classpath: camel-kafka-3.20.6-fix.jar

@vedran-kolka-syntio
Contributor Author

The additionalProperties.schema.registry.credential works now. Thank you for the support!
Unfortunately now I have a problem with a boolean in the additionalProperties, but I will raise a new issue for that tomorrow.

@davsclaus
Contributor

Camel 3.20.6 is expected to be released this week

@davsclaus
Contributor

Camel 3.20.6 has been released
https://camel.apache.org/blog/2023/06/RELEASE-3.20.6/

@vedran-kolka-syntio
Contributor Author

Thank you for the update! The issue with the credentials has been resolved so do you plan on closing the issue or are you keeping it open until an example is produced?

@davsclaus
Contributor

@vedran-kolka-syntio - Sorry for any delayed reply. We are currently very busy and focused on bringing Apache Camel 4 to GA which is scheduled for this month.

Also, people are on and off PTO these days, so we will have more time to get back to this subject towards the end of September.

I think it's valuable for the Camel community to have examples or snippets showing how to work with Azure Kafka. So any help from you or your team is appreciated.

If you want to help, please ping back later in September. Thanks.

@oscerd
Contributor

oscerd commented Aug 24, 2023

I'll restart working on the example soon, probably next week. Sorry for the delay, but I've been busy.

@oscerd
Contributor

oscerd commented Sep 15, 2023

> The additionalProperties.schema.registry.credential works now. Thank you for the support! Unfortunately now I have a problem with a boolean in the additionalProperties, but I will raise a new issue for that tomorrow.

Is the boolean issue resolved?

@oscerd
Contributor

oscerd commented Sep 15, 2023

Yes it is. apache/camel-kamelets#1492

@oscerd
Contributor

oscerd commented Sep 19, 2023

I was trying to create an example, but I would like to have exactly your same settings @vedran-kolka-syntio, can you please share your full configuration for this integration? Thanks.

@vedran-kolka-syntio
Contributor Author

Sorry for the late response, but I am currently otherwise engaged at the job. I would love to contribute the example myself, as I might have some time for the weekend. Until then, the full configuration for the route is as follows:

# camel-k: dependency=mvn:com.ibm.mq:com.ibm.mq.allclient:9.2.5.0
# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1
# camel-k: dependency=mvn:kolka.camel:eventhubs:0.2
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
# camel-k: dependency=mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4
# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0

- beans:
  - name: wmqConnectionFactory
    type: "#class:com.ibm.mq.jms.MQConnectionFactory"
    properties:
      XMSC_WMQ_HOST_NAME: '{{mq.serverName}}'
      XMSC_WMQ_PORT: '{{mq.serverPort}}'
      XMSC_WMQ_CHANNEL: '{{mq.channel}}'
      XMSC_WMQ_QUEUE_MANAGER: '{{mq.queueManager}}'
      XMSC_WMQ_CONNECTION_MODE: 1
      XMSC_USERID: '{{mq.username}}'
      XMSC_PASSWORD: '{{mq.password}}'
      XMSC_CLIENT_ID: '{{?mq.clientId}}'
      XMSC_WMQ_SSL_CIPHER_SUITE: '*TLS12ORHIGHER'
  - name: kafkaHeaderDeserializer
    type: "#class:kolka.camel.eventhubs.StringKafkaHeaderDeserializer"
  - name: defaultAzureCredential
    type: "#class:kolka.camel.eventhubs.SimpleCredential"

- route:
    id: "kafka-to-mq"
    from:
      uri: "kafka:{{kafka.kafka-to-mq-topic}}"
      parameters:
        autoOffsetReset: earliest
        brokers: "{{kafka.bootstrapServers}}"
        saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka.username}} password={{kafka.password}};'
        saslMechanism: PLAIN
        securityProtocol: SASL_SSL
        headerDeserializer: '#kafkaHeaderDeserializer'
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        groupId: 'kolka-consumer-group'
        additionalProperties.schema.registry.url: '{{kafka.schema-registry-url}}'
        additionalProperties.schema.group: avro
        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
        additionalProperties.specific.avro.value.type: 'kolka.camel.eventhubs.models.Order'
        additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
      steps:
        - set-header:
            name: "JMSCorrelationID"
            header:
              expression: "kafka.KEY"
        - remove-header:
            name: "kafka.KEY"
        - remove-header:
            name: "kafka.HEADERS"
        - to:
            uri: "jms:queue:{{mq.destinationName}}"
            parameters:
              connectionFactory: "{{bean:wmqConnectionFactory}}"
              allowAdditionalHeaders: "*"
              allowSerializedHeaders: true

I would like to try creating the example and contribute it, if that's alright.
For the example, I would leave only Kafka and Schema Registry related things. It would also include the implementation of SimpleCredential, as it is only a thin wrapper around Azure's DefaultAzureCredential.

@oscerd
Contributor

oscerd commented Sep 20, 2023

It would be super. Thanks a lot. It would be great also to have some terraform or infra instructions for creating the azure resources. No rush. Thanks a lot for your time.

@oscerd oscerd assigned vedran-kolka-syntio and unassigned oscerd Sep 21, 2023
@oscerd
Contributor

oscerd commented Sep 21, 2023

I assigned this to you @vedran-kolka-syntio

@vedran-kolka-syntio
Contributor Author

Using camel version 4.0.0, I am having the same problem for which I opened the issue, so I reverted back to using camel-kafka-3.22.0 (this one exactly).
Is it possible that the fix isn't available in 4.0.0?

@oscerd
Contributor

oscerd commented Sep 24, 2023

Can you report the error exactly? Thanks

@oscerd
Contributor

oscerd commented Sep 24, 2023

I don't think we didn't backport the fix

@vedran-kolka-syntio
Contributor Author

This is my current configuration:

# camel-k: dependency=mvn:org.apache.camel.kamelets:camel-kamelets-utils:3.20.1.1
# camel-k: dependency=mvn:org.apache.camel.kamelets:azure-identity:0.1
# camel-k: dependency=mvn:com.microsoft.azure:azure-schemaregistry-kafka-avro:1.1.1
# camel-k: dependency=mvn:com.azure:azure-data-schemaregistry-apacheavro:1.1.4
# camel-k: dependency=mvn:com.azure:azure-identity:1.9.0
# camle-k: dependency=mvn:org.apache.camel:camel-kafka:3.20.6

- beans:
  - name: defaultAzureCredential
    type: "#class:org.apache.camel.kamelets.azure.DefaultAzureCredentialWrapper"

- route:
    id: "kafka-to-log"
    from:
      uri: "kafka:{{kafka.my-topic}}"
      parameters:
        autoOffsetReset: earliest
        brokers: "{{kafka.bootstrapServers}}"
        saslJaasConfig: 'org.apache.kafka.common.security.plain.PlainLoginModule required username={{kafka.username}} password={{kafka.password}};'
        saslMechanism: PLAIN
        securityProtocol: SASL_SSL
        valueDeserializer: 'com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer'
        groupId: 'my-consumer-group'
        additionalProperties.schema.registry.url: '{{kafka.schema-registry-url}}'
        additionalProperties.schema.group: avro
        additionalProperties.schema.registry.credential: '#bean:defaultAzureCredential'
        additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):org.apache.camel.kamelets.eventhubs.models.Order'
        additionalProperties.specific.avro.reader: '#valueAs(boolean):true'
      steps:
        - to:
            uri: "kamelet:log-sink"
            parameters:
              showStreams: true
              showHeaders: true
              multiline: true

And this is the error:

2023-09-24 17:49:53.389  WARN 18716 --- [sumer[my-topic]] l.component.kafka.KafkaFetchRecords : Error creating org.apache.kafka.clients.consumer.KafkaConsumer due to: Failed to construct kafka consumer
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825) ~[kafka-clients-3.2.3.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.2.3.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) ~[kafka-clients-3.2.3.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627) ~[kafka-clients-3.2.3.jar:?]
        at org.apache.camel.component.kafka.DefaultKafkaClientFactory.getConsumer(DefaultKafkaClientFactory.java:34) ~[camel-kafka-3.20.1.jar:3.20.1]
        at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumer(KafkaFetchRecords.java:245) ~[camel-kafka-3.20.1.jar:3.20.1]
        at org.apache.camel.component.kafka.KafkaFetchRecords.createConsumerTask(KafkaFetchRecords.java:205) ~[camel-kafka-3.20.1.jar:3.20.1]
        at org.apache.camel.support.task.ForegroundTask.run(ForegroundTask.java:94) [camel-support-4.0.0.jar:4.0.0]
        at org.apache.camel.component.kafka.KafkaFetchRecords.run(KafkaFetchRecords.java:127) [camel-kafka-3.20.1.jar:3.20.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.azure.core.credential.TokenCredential (java.lang.String is in module java.base of loader 'bootstrap'; com.azure.core.credential.TokenCredential is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @3e8f7922)
        at com.microsoft.azure.schemaregistry.kafka.avro.AbstractKafkaSerdeConfig.getCredential(AbstractKafkaSerdeConfig.java:66) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:?]
        at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializerConfig.getCredential(KafkaAvroDeserializerConfig.java:11) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:?]
        at com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer.configure(KafkaAvroDeserializer.java:55) ~[azure-schemaregistry-kafka-avro-1.1.1.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:718) ~[kafka-clients-3.2.3.jar:?]
        ... 13 more

The error looks exactly the same as in the original error of this issue, and using camel-kafka-3.22.0 I was able to resolve it, like so:

camel run kafka-log.yaml camel-kafka-3.22.0.jar --properties application.properties

@oscerd
Contributor

oscerd commented Sep 24, 2023 via email

@vedran-kolka-syntio
Contributor Author

I don't have instructions for Azure resources yet, but I'll open the PR in a few minutes with everything else.

@oscerd
Contributor

oscerd commented Sep 24, 2023

@davsclaus thoughts?

@davsclaus
Contributor

Is # camel-k: dependency=mvn:org.apache.camel.kamelets:azure-identity:0.1 a JAR you have made yourself internally?
It's not a good idea to use org.apache.camel, as that is reserved for the official ASF project.

@davsclaus
Contributor

Is org.apache.camel.kamelets.eventhubs.models.Order a class from your own JAR ?

@davsclaus
Contributor

And for

additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):org.apache.camel.kamelets.eventhubs.models.Order'

Can you try with

additionalProperties.specific.avro.value.type: '#type:org.apache.camel.kamelets.eventhubs.models.Order'

@vedran-kolka-syntio
Contributor Author

Yes, I have made the JAR for the example, not for the company internally. It also contains Order. I will rename it.

Running what you suggested with camel-kafka-4.0.0 gives the following error upon KafkaConsumer construction:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825) ~[kafka-clients-3.2.3.jar:?]
        ...
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class com.azure.core.credential.TokenCredential (java.lang.String is in module java.base of loader 'bootstrap'; com.azure.core.credential.TokenCredential is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @3e8f7922)

Using camel-kafka-3.22.0 it fails earlier:

Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: kafka://my-topic?additionalProperties.schema.group=avro&additionalProperties.schema.registry.credential=%23bean%3AdefaultAzureCredential&additionalProperties.schema.registry.url=https%3A%2F%2Flabscamel-eventhubs-dev.servicebus.windows.net&additionalProperties.specific.avro.reader=%23valueAs%28boolean%29%3Atrue&additionalProperties.specific.avro.value.type=%23type%3Acom.acme.example.eventhubs.models.Order&autoOffsetReset=earliest&brokers=labscamel-eventhubs-dev.servicebus.windows.net%3A9093&groupId=my-consumer-group&saslJaasConfig=xxxxxx&saslMechanism=PLAIN&securityProtocol=SASL_SSL&valueDeserializer=com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroDeserializer due to: Error binding property (specific.avro.value.type=#type:com.acme.example.eventhubs.models.Order) with name: specific.avro.value.type on bean: {schema.registry.url=https://labscamel-eventhubs-dev.servicebus.windows.net, schema.group=avro, schema.registry.credential=com.acme.example.azure.DefaultAzureCredentialWrapper@322204dc, specific.avro.reader=true} with value: #type:com.acme.example.eventhubs.models.Order
        at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:856)
        at org.apache.camel.impl.engine.AbstractCamelContext.getEndpoint(AbstractCamelContext.java:758)
        at org.apache.camel.support.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:60)
        at org.apache.camel.reifier.AbstractReifier.resolveEndpoint(AbstractReifier.java:195)
        at org.apache.camel.reifier.RouteReifier.doCreateRoute(RouteReifier.java:96)
        at org.apache.camel.reifier.RouteReifier.createRoute(RouteReifier.java:76)
        ... 26 more
Caused by: org.apache.camel.PropertyBindingException: Error binding property (specific.avro.value.type=#type:com.acme.example.eventhubs.models.Order) with name: specific.avro.value.type on bean: {schema.registry.url=https://labscamel-eventhubs-dev.servicebus.windows.net, schema.group=avro, schema.registry.credential=com.acme.example.azure.DefaultAzureCredentialWrapper@322204dc, specific.avro.reader=true} with value: #type:com.acme.example.eventhubs.models.Order
        at org.apache.camel.support.PropertyBindingSupport.doSetPropertyValue(PropertyBindingSupport.java:548)
        at org.apache.camel.support.PropertyBindingSupport.doBuildPropertyOgnlPath(PropertyBindingSupport.java:420)
        at org.apache.camel.support.PropertyBindingSupport.doBindProperties(PropertyBindingSupport.java:297)
        at org.apache.camel.support.PropertyBindingSupport$Builder.bind(PropertyBindingSupport.java:1906)
        at org.apache.camel.support.PropertyBindingSupport.bindProperties(PropertyBindingSupport.java:132)
        at org.apache.camel.component.kafka.KafkaComponent.createEndpoint(KafkaComponent.java:87)
        at org.apache.camel.component.kafka.KafkaComponent.createEndpoint(KafkaComponent.java:34)
        at org.apache.camel.support.DefaultComponent.createEndpoint(DefaultComponent.java:170)
        at org.apache.camel.impl.engine.AbstractCamelContext.doGetEndpoint(AbstractCamelContext.java:822)
        ... 31 more
Caused by: java.lang.IllegalStateException: Cannot select single type: com.acme.example.eventhubs.models.Order as there are no beans in the registry with this type
        at org.apache.camel.support.PropertyBindingSupport.resolveBean(PropertyBindingSupport.java:1597)
        at org.apache.camel.support.PropertyBindingSupport.resolveValue(PropertyBindingSupport.java:902)
        at org.apache.camel.support.PropertyBindingSupport.doSetPropertyValue(PropertyBindingSupport.java:538)
        ... 39 more

After defining an Order as a bean:

- beans:
  - name: order
    type: "#class:com.acme.example.eventhubs.models.Order"

The following error occurs, upon deserializing the message (still using camel-kafka-3.22.0):

org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition my-topic-0 at offset 11. If needed, please seek past the record to continue consumption.
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1435) ~[kafka-clients-3.2.3.jar:?]
        ...
Caused by: java.lang.ClassCastException: class com.acme.example.eventhubs.models.Order cannot be cast to class java.lang.Class (com.acme.example.eventhubs.models.Order is in unnamed module of loader org.apache.camel.main.download.DependencyDownloaderClassLoader @3e8f7922; java.lang.Class is in module java.base of loader 'bootstrap')
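The last error suggests that whatever reads `specific.avro.value.type` wants the type itself (a `java.lang.Class`), while registering `Order` as a bean supplies an instance of that type. The distinction can be sketched with the standard library alone. This is an assumption-laden illustration: `java.lang.StringBuilder` stands in for `Order`, and `readValueType` only imitates a consumer of the setting that casts it to `Class`.

```java
import java.util.HashMap;
import java.util.Map;

class ClassVsInstanceSketch {
    // Imitates code that expects the configured value to be a Class object.
    static Class<?> readValueType(Map<String, Object> config) {
        return (Class<?>) config.get("specific.avro.value.type");
    }

    // Returns true when the configured value can be used as a Class.
    static boolean isUsable(Map<String, Object> config) {
        try {
            readValueType(config);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        Map<String, Object> config = new HashMap<>();

        // An instance of the type: this is what a bean lookup hands over.
        config.put("specific.avro.value.type", new StringBuilder());
        System.out.println(isUsable(config)); // prints "false"

        // The type itself, loaded by name: this is what the cast needs.
        config.put("specific.avro.value.type", Class.forName("java.lang.StringBuilder"));
        System.out.println(isUsable(config)); // prints "true"
    }
}
```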

Could you provide a link to any documentation describing the operations invoked with # (#valueAs, #bean, #type, #class...) ?
I am having trouble using these and cannot seem to find the docs.

@davsclaus
Contributor

https://camel.apache.org/manual/property-binding.html#_property_binding_basics
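As a rough mental model for the `#valueAs(boolean):true` form used in the routes in this thread: the text in parentheses names a target type and the text after the colon is the value to convert. The sketch below is not Camel's converter, and `ValueAsSketch` and `convert` are hypothetical names. It handles only three primitive types and passes everything else through unchanged.

```java
class ValueAsSketch {
    private static final String PREFIX = "#valueAs(";

    // Converts "#valueAs(type):value" into a typed object; other strings are returned as-is.
    static Object convert(String raw) {
        if (!raw.startsWith(PREFIX)) {
            return raw;
        }
        int close = raw.indexOf("):");
        if (close < 0) {
            throw new IllegalArgumentException("Malformed #valueAs expression: " + raw);
        }
        String type = raw.substring(PREFIX.length(), close);
        String value = raw.substring(close + 2);
        switch (type) {
            case "boolean":
                return Boolean.parseBoolean(value);
            case "int":
                return Integer.parseInt(value);
            case "long":
                return Long.parseLong(value);
            default:
                // Other types are out of scope for this sketch.
                throw new IllegalArgumentException("Unsupported type in this sketch: " + type);
        }
    }

    public static void main(String[] args) {
        // The value arrives as a Boolean rather than the String "true".
        System.out.println(convert("#valueAs(boolean):true") instanceof Boolean); // prints "true"
        // A plain value without the prefix is left untouched.
        System.out.println(convert("avro")); // prints "avro"
    }
}
```

The point of the typed form is the same as for bean references: the Kafka client receives an object of the type it expects instead of a string it would have to cast.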

@davsclaus
Contributor

The old example works fine with 3.20.6 and 4.0.0. However I think you have since added more configuration, in particular

additionalProperties.specific.avro.value.type: '#valueAs(java.lang.Class):org.apache.camel.kamelets.eventhubs.models.Order'

I have not had time to dive more into this, but it would be good to have a reproducer. I also wonder if it's the kafka client / avro that does the classloading and cannot load this class.

It may be because dynamic classloading via jbang is complex. I had another use-case today
https://issues.apache.org/jira/browse/CAMEL-19924

In the meantime, you can try to export this to a standard spring-boot / quarkus or camel-main project and run it with pure maven / java.

camel export * --runtime=spring-boot --directory=code

Still, it would be great to get this issue resolved so that it works OOTB with camel-jbang. However, I am a bit busy this week.

@vedran-kolka-syntio
Contributor Author

You are right, running with Spring Boot works with 4.0.0, so I've added that as a note in the example's readme.

@oscerd
Contributor

oscerd commented Oct 2, 2023

Trying to follow the README and run Produce.java, I'm getting

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.1.0:java (default-cli) on project azure-identity: An exception occurred while executing the Java class. Status code 401, "{"error":{"code":"ErrorInRequest","message":"Authorization failed for specified action: SchemaGroupWrite,SchemaWrite. TrackingId:1a558a26-cdc0-4bca-9e7c-065076e7a3e0_G5, SystemTracker:pippo-test123-namespace.servicebus.windows.net:$schemaGroups\/avro\/schemas\/com.acme.example.eventhubs.models.Order, Timestamp:2023-10-02T10:03:43"}}

The application.properties seems to be correct and I used the terraform main.tf

Are you sure we don't need some more manual steps in Azure?

The command is:

mvn compile exec:java -Dexec.mainClass="com.acme.example.eventhubs.Produce"

in Azure Identity folder.

@vedran-kolka-syntio
Copy link
Contributor Author

Did you add anything to the pom.xml to run it that way? The command doesn't work for me.
I ran it in IntelliJ, and seems to work for me, but I suspect it's because I have already authenticated to Azure using Azure CLI.
Could you try running az login before running Produce.java ?

@oscerd
Contributor

oscerd commented Oct 10, 2023 via email

@oscerd
Contributor

oscerd commented Oct 10, 2023

I just populated the fields in the template and renamed it to application.properties before running that command.

@oscerd
Contributor

oscerd commented Oct 11, 2023

It worked. The terraform configuration was missing the Schema Registry Contributor role assignment.

@vedran-kolka-syntio
Contributor Author

Great! Could you add it to the terraform script?
Or perhaps just mention the role as a prerequisite? Because assigning roles through terraform also requires additional roles.

@oscerd
Contributor

oscerd commented Oct 11, 2023

Done. I added a note about this. The role assignment can be done only once, so if we force it in the terraform configuration, it will start to fail after the first apply.

@oscerd
Contributor

oscerd commented Oct 11, 2023

It should now work with Camel 4.x too through camel-jbang. If you find time, give it a look.

@vedran-kolka-syntio
Contributor Author

I'll try it out soon. Thanks a lot for the help!

@davsclaus
Contributor

Do we have the example now? If so, we can close this ticket.

Feel free to open new tickets for other examples and ideas for things to improve etc.

@oscerd oscerd closed this as completed Oct 18, 2023
3 participants