Consumer Hangs while Dispose in K8s #2013

Open
8 tasks
ksdvishnukumar opened this issue Mar 6, 2023 · 5 comments

Comments

@ksdvishnukumar

Description

I have a case where the consumer hangs indefinitely almost 10 Minutes, and I cannot figure out why.
I use Confluent.Kafka to connect to Azure Event Hubs.

I have 3 instances of the application running. Two of them are disposed quickly, but one hangs. This does not happen every time, only in roughly 20% of cases (1 in 5).

How to reproduce

It's a very simple consumer application; any basic one can be used.

Checklist

Please provide the following information:

  • A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version 1.9.3
  • Apache Kafka version -
  • Client configuration.
  • Operating system.
  • Provide logs (with "debug" : "..." as necessary in configuration).
  • Provide broker log excerpts.
  • Critical issue.
@paul-cheung

paul-cheung commented Mar 6, 2023

We have encountered some issues in K8s as well.
I am using Confluent.Kafka 2.0.2 in our project. A few minutes after the application started, the following error message appeared:
4|1677860967.721|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 151ms (adjust max.poll.interval.ms for long-running message processing): leaving group
I am not sure whether some configuration change would fix this.

@bartelink

bartelink commented Mar 6, 2023

@paul-cheung This message/situation is unrelated to this issue (though, to be fair, this issue is of the "stuff is going wrong sometimes in our environment; we have not narrowed it down" variety, which is hard for the maintainers to reproduce). The config value the message cites, max.poll.interval.ms, is the maximum amount of time your code is allowed between polls for a message from the client. In other words, it's saying: "you said I was to assume you're dead if you took more than 5 minutes to come back and ask for the next message. This is your notice that you took 151ms longer, and we automatically removed you from the healthy clients list in accordance with that". Your choices are either to guarantee that you poll more frequently by adjusting your code, or to extend that poll interval.
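For the second option, a minimal sketch of extending the poll interval through the client configuration might look like the following. The broker address, group id, and the 10 minute value are placeholders chosen for illustration; they do not come from this thread.

```csharp
using Confluent.Kafka;

// Illustrative values only: the address, group id and interval are assumptions.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder broker address
    GroupId = "example-group",           // placeholder consumer group
    // Maps to max.poll.interval.ms. The log line above shows the limit in
    // effect was 300000 ms (5 minutes); this raises it to 10 minutes.
    MaxPollIntervalMs = 600000,
};
```

Raising the limit only buys time. If processing one message can regularly take longer than the interval, restructuring the loop so that it polls more often is likely the more robust choice.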

@plachor

plachor commented Dec 15, 2023

@ksdvishnukumar I do not get which one is true :P

I have a case where the consumer hangs indefinitely almost 10 Minutes

Is it indefinitely or 10 minutes? I spotted an issue where one of my consumer instances was not disposed in my process for days, and I'm trying to reproduce it / find a similar issue.

In my case I'm calling Close and then Dispose on my consumer instance. It seems to correlate with a temporary network partition to the broker, as I have seen logs with connection refused.

@ksdvishnukumar
Author

ksdvishnukumar commented Dec 15, 2023

@plachor, I have seen this take close to 10 minutes in some cases and as long as 1 hour in others. Since I am using an Azure Pipeline to deploy the pod to AKS, the 1 hour figure is the pipeline's maximum timeout; after that, the pipeline times out.

I also use the same approach you mentioned:

    try
    {
        // Signal the consume loop to stop.
        tokenSource?.Cancel();

        // Wait 3 seconds to give the consume method some breathing time to stop
        // consuming. Otherwise we get an AccessViolationException, which cannot
        // be caught in the exception block.
        await Task.Delay(3000);

        // The null-conditional chain already covers a null consumer.
        if (_consumerBuilder?.Subscription?.Any() ?? false)
        {
            _consumerBuilder?.Close();   // leave the consumer group cleanly
            _consumerBuilder?.Dispose(); // release the client's resources
        }
    }
    catch (KafkaException)
    {
        _logger!.Log($"Handled Kafka Exception in {nameof(CloseConsumeBrokerMessageClient)}", LogLevel.Warning);
    }
    catch (Exception ex)
    {
        _logger!.Log($"Exception caught in {nameof(CloseConsumeBrokerMessageClient)} and handled. Message={ex.Message}", LogLevel.Warning);
    }
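
As an aside on the fixed 3 second delay: one possible alternative is to wait for the consume loop itself to finish before calling Close and Dispose, so the cleanup never overlaps with an in-flight consume call. The sketch below is only an illustration under assumptions. GracefulStop and StopThenCleanupAsync are hypothetical names, not part of Confluent.Kafka, and the loop is a stand-in that does not touch Kafka at all. It addresses the ordering problem only and is not claimed to fix the hang described in this issue.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class GracefulStop
{
    // Hypothetical helper (not part of Confluent.Kafka): cancel the loop,
    // wait until it has actually exited, and only then run the cleanup.
    public static async Task StopThenCleanupAsync(
        CancellationTokenSource cts, Task loop, Action cleanup)
    {
        cts.Cancel();
        try
        {
            await loop; // completes once the loop body has finished
        }
        catch (OperationCanceledException)
        {
            // Expected when the loop observes the cancelled token.
        }
        cleanup(); // e.g. () => { consumer.Close(); consumer.Dispose(); }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();

        // Stand-in for a loop that would call consumer.Consume(cts.Token).
        Task loop = Task.Run(async () =>
        {
            while (true)
            {
                await Task.Delay(50, cts.Token); // throws once cancelled
            }
        });

        await StopThenCleanupAsync(cts, loop, () => Console.WriteLine("closed"));
    }
}
```

In a real consumer, the cleanup delegate would be where Close and Dispose are called, and the loop task would be the one running the consume method.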
