Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cannot re-process old Kafka messages because of Quarkus overriding the group.id #2901

Closed
nicolaferraro opened this issue Jul 13, 2021 · 6 comments
Assignees
Labels
bug Something isn't working
Milestone

Comments

@nicolaferraro
Copy link
Member

I'm running a simple integration in Camel K:

from('kafka:bitcoin-kafka-topic?brokers=my-cluster-kafka-bootstrap.kafka-demo.svc.cluster.local:9092&groupId=myowngroupid&autoOffsetReset=earliest')
    .to('log:info')

I'm using myowngroupid to reprocess old messages using a different consumer group. But when I run this integration, I notice that the Kafka group Id is always a fixed string camel-k-integration.

The reason seems to be due to this part of the code:

if (quarkusKafkaConfiguration != null) {
for (Map.Entry<String, Object> entry : quarkusKafkaConfiguration.entrySet()) {
camelKafkaProperties.put(entry.getKey(), entry.getValue());
}
}

Which seems correct at first glance, but unfortunately the Quarkus config unexpectedly contains wrong values at runtime:

Screenshot from 2021-07-13 16-09-11

@nicolaferraro nicolaferraro added the bug Something isn't working label Jul 13, 2021
@oscerd
Copy link
Contributor

oscerd commented Jul 13, 2021

I think this behavior should be configurable. If I don't want to merge configurations or I don't care about the quarkus extension workflow, I should be able to avoid it.

@jamesnetherton jamesnetherton added this to the 2.1.0 milestone Jul 13, 2021
@jamesnetherton jamesnetherton self-assigned this Jul 14, 2021
@jamesnetherton
Copy link
Contributor

Sorry about this one, it's my bad. The map put should of course be putIfAbsent. I think we can make the properties merge behaviour configurable based on whether the Quarkus k8s service binding capability is present. And if it is, only then do we set up the custom QuarkusKafkaClientFactory.

I'll also look into tweaking the Quarkus Kafka extension, as I don't see the point of it adding the group.id config, even when no other Kafka configuration has been provided.

@oscerd
Copy link
Contributor

oscerd commented Jul 14, 2021

I don't if there are other extension approaching the configuration in this way, but making it configurable would be really nice. At least on the kamelet side, we will be sure what we are configuring will be effectively used.

@oscerd
Copy link
Contributor

oscerd commented Jul 14, 2021

Thanks for looking into this.

@jamesnetherton
Copy link
Contributor

I created a PR that should fix this #2913. To my knowledge, none of the other Camel projects dependent on CQ is explicitly forcing usage of quarkus-kubernetes-service-binding, so having a check for whether the capability is present and not setting up the custom KafkaClientFactory should be good enough.

I need to do some follow up research for changing the map put to putIfAbsent. Turns out it's a bit tricky to have this because of the defaults that the Camel component sets for various options. However, I introduced a new config option to disable the config merging if required.

jamesnetherton added a commit to jamesnetherton/camel-quarkus that referenced this issue Jul 19, 2021
jamesnetherton added a commit to jamesnetherton/camel-quarkus that referenced this issue Jul 20, 2021
@cescoffier
Copy link
Contributor

For the record, we merged a fix in Quarkus.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

4 participants