Detect if broker is available (when using Kafka 2.x client) #674

Open
ennru opened this issue Dec 5, 2018 · 14 comments

4 participants
@ennru
Member

commented Dec 5, 2018

When using the new Kafka consumer API from Kafka 2.x, we no longer use WakeupExceptions to stop the client from blocking the actor thread.
Until now, these exceptions served as an indication that no Kafka broker was available, or that it had disappeared. With the new API, this is no longer detected automatically.
For use cases where we actually want to fail the consuming stream when there is no Kafka broker, a regular call to listTopics(Duration) could check the connection.
(As suggested on StackOverflow)
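
A minimal sketch of such a check in Java, under stated assumptions: `BrokerProbe` and `TopicLister` are hypothetical names, not part of Alpakka Kafka or the Kafka client. `TopicLister` only mirrors the shape of `KafkaConsumer.listTopics(Duration)` so the sketch compiles without the client on the classpath. It also assumes that an expired timeout surfaces as an unchecked exception.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical helper; not part of Alpakka Kafka or the Kafka client. */
final class BrokerProbe {

    /**
     * Stand-in for the shape of KafkaConsumer#listTopics(Duration), so that
     * this sketch compiles without the Kafka client as a dependency.
     */
    @FunctionalInterface
    interface TopicLister {
        Map<String, ?> listTopics(Duration timeout);
    }

    private BrokerProbe() {}

    /**
     * Returns true if the metadata request completed within the timeout.
     * Assumption: a broker that cannot be reached makes the call throw an
     * unchecked exception, so any RuntimeException counts as "not reachable".
     */
    static boolean isReachable(TopicLister lister, Duration timeout) {
        try {
            lister.listTopics(timeout);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

With a real consumer this would presumably be wired up as `BrokerProbe.isReachable(consumer::listTopics, Duration.ofSeconds(5))`, called from the thread that owns the consumer, since `KafkaConsumer` is not safe for concurrent use.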

@mriehl

commented Mar 13, 2019

Hi,

We noticed that when a broker is assigned a new IP (after a machine failure, for example), the source no longer fails. This is an issue because the Kafka consumer library caches the broker InetSocketAddresses on startup and never updates them again.
We have a retry backoff supervision around the consumer source that re-creates it if the stream blows up. This used to work because the source would blow up, and our supervision would then recreate it, which caused the Kafka consumer to resolve the broker IPs anew.

The new API is pretty terrible in this respect, because now the source never fails. And since it doesn't fail, we don't recreate it, and the Kafka consumer keeps its resolved broker InetSocketAddress forever.

At this point I think our options are to ensure the brokers have stable IPs even when they restart (not so trivial on Docker networks) or to bake the listTopics "health check" into our supervision strategy somehow.

@ennru

Member Author

commented Mar 13, 2019

Did you upgrade to 1.0.1?

@mriehl

commented Mar 13, 2019

Damn, I was pretty sure I was using the latest from Maven Central, but apparently I'm still on 1.0. I'll upgrade and see what happens, but it really looks like they solved it. Thanks!

@mriehl

commented Mar 13, 2019

Can confirm using 1.0.1 solves the issue

@ennru

Member Author

commented Mar 13, 2019

I'm glad to hear that. Thanks for getting in touch.

@gabrielgiussi

commented Apr 26, 2019

@ennru could I give it a try?

@ennru

Member Author

commented Apr 26, 2019

Sure, that would be great.

@tayvs

commented Jun 13, 2019

@gabrielgiussi @ennru Are there any updates?

@ennru

Member Author

commented Jun 13, 2019

This could possibly be implemented together with #765

@tayvs

commented Jun 13, 2019

As was mentioned, listTopics can be used to check connectivity.
What if there were a scheduler that periodically calls listTopics and, on failure, retries with exponential backoff for a configurable number of attempts? Once the attempts are exhausted, KafkaConsumerActor stops and processErrors signals all stages, so the stream fails.
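
The retry policy described here could be sketched roughly as follows in Java. This is an illustration only, not the implementation in Alpakka Kafka: `BackoffHealthCheck` and its parameters are hypothetical names, the probe is any boolean check (for example one that wraps a `listTopics(Duration)` call), and doubling the delay after each failure is an assumed backoff factor.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the proposed retry policy; all names are illustrative. */
final class BackoffHealthCheck {
    private final BooleanSupplier probe;   // true means the broker answered
    private final int maxAttempts;         // configurable number of attempts
    private final Duration initialDelay;   // wait before the second attempt

    BackoffHealthCheck(BooleanSupplier probe, int maxAttempts, Duration initialDelay) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.probe = probe;
        this.maxAttempts = maxAttempts;
        this.initialDelay = initialDelay;
    }

    /** Delays between consecutive attempts: initialDelay, then doubled each time. */
    List<Duration> delays() {
        List<Duration> result = new ArrayList<>();
        Duration delay = initialDelay;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(delay);
            delay = delay.multipliedBy(2);
        }
        return result;
    }

    /**
     * Runs the probe up to maxAttempts times, sleeping between failed attempts.
     * Returns true on the first success and false once attempts are exhausted;
     * the caller would then stop the consumer and fail the stream.
     */
    boolean run() {
        if (probe.getAsBoolean()) {
            return true;
        }
        for (Duration delay : delays()) {
            try {
                Thread.sleep(delay.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
            if (probe.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }
}
```

A periodic scheduler would call `run()` at each tick. The blocking `Thread.sleep` keeps the sketch short; inside an actor, scheduled messages would be the more natural way to wait.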

@ennru

Member Author

commented Jun 13, 2019

Sounds viable. But that should be implemented in a separate actor so that it can be given a different dispatcher and easily replaced with other strategies.

How would it work for producers?

@tayvs

commented Jun 13, 2019

@ennru
Regarding producers:
Correct me if I'm wrong, but the producer fails the stream with a TimeoutException after it attempts to send a message while there is no live connection to a broker. That is, producers already have a kind of liveness check, albeit a "deferred" one.

@ennru

Member Author

commented Jun 13, 2019

Ok, that is probably enough.

@tayvs

commented Jun 13, 2019

Thanks, then I'll try to contribute.
