When the Kafka server is broken, pykafka breaks too, but the exception cannot be caught #233
Thanks for reporting this. I think there are two issues here (and they interact), and there are some proposed solutions on the issue tracker that touch on both issues to some extent. It's probably high time to address them.
Just to tie these all together: #224 is really the same problem that #223 solves, and #227 is, at least in part, also about this. Even #189 could be the same thing (i.e. if "it hangs" turns out to mean that the worker threads silently died). @kbourgoin @emmett9001, what are your thoughts on this? Perhaps it would be even better to copy the exception handling model that is now in #177 for detecting dead worker threads, re-raising the exceptions on the main thread?
Yup @yungchin, I'm in favor of using the same approach from the async producer in the consumers, in which worker exceptions are raised to the main thread when a call is made to the public consumer API. We can even get away without implementing retry logic for disconnected sockets for the time being, since users will at least be able to catch the exception and restart the consumer themselves.
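The pattern described above (a worker thread stashes its exception so the next public API call on the main thread re-raises it) can be sketched roughly like this. This is a hedged illustration, not pykafka's actual internals; the `Worker` class and all method names here are hypothetical:

```python
import threading


class Worker(object):
    """Sketch of the #177-style pattern: a background thread records any
    exception it hits so the main thread can re-raise it on the next
    public API call. All names here are hypothetical, not pykafka's."""

    def __init__(self, target):
        self._worker_exception = None
        self._target = target
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def start(self):
        self._thread.start()

    def join(self):
        self._thread.join()

    def _run(self):
        try:
            self._target()
        except Exception as exc:
            # Don't let the worker die silently: stash the error for later.
            self._worker_exception = exc

    def _raise_worker_exceptions(self):
        # Called at the top of every public API method.
        if self._worker_exception is not None:
            raise self._worker_exception

    def consume(self):
        # Public entry point: surface any background failure first.
        self._raise_worker_exceptions()
        return "message"
```

With this shape, an error like `SocketDisconnectedError` raised inside a fetcher thread would pop out of `consume()` on the main thread, where user code can catch it and restart the consumer.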
By the way, thanks for putting in the effort to deduplicate so many issues. It's very helpful to see the connections between them.
I encountered this issue while trying to port our code from kafka-python to pykafka. In my test environment I only have one Kafka broker, but if I had more than one, would the consumer be able to successfully switch to other available brokers on broker failure?
To directly answer your question @pau: in general, pykafka does assume that there's at least one broker available.
Thanks for the response @emmett9001. Looking forward to the new PR 👍 For my input, I like the ideas from your previous comments:
Our current code that uses kafka-python can catch kafka-python exceptions and handle them gracefully. So it is probably best to just offload that decision to the user of the library rather than building complex retry logic on your end.
In the producer, we retry a few times before erroring. I imagine we'll do something similar in the consumer.
Thanks. Is there any guidance on how to handle retries until this issue is fixed?
I think you can get pretty far catching `SocketDisconnectedError`.
The issue is that the main thread can't catch `SocketDisconnectedError`. Here's some sample code:

```python
import logging

from pykafka import KafkaClient
from pykafka.exceptions import SocketDisconnectedError

logging.basicConfig(level='DEBUG')

client = KafkaClient(hosts='kafka:9092')
topic = client.topics['test-topic']

consumer = None
try:
    consumer = topic.get_balanced_consumer(consumer_group='test-group',
                                           zookeeper_connect='zookeeper:2181',
                                           auto_commit_enable=True,
                                           auto_commit_interval_ms=1000)
    while True:
        message = consumer.consume()
        print message.__dict__
except SocketDisconnectedError:
    print 'handle error here'
finally:
    if consumer:
        consumer.stop()
```
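Until smarter retry logic lands in pykafka itself, one user-side option is a small wrapper that rebuilds the consumer whenever a retryable error escapes. The function name, parameters, and the "None means done" convention below are all a hypothetical sketch, not pykafka API; it takes a consumer factory so the retry logic stays library-agnostic:

```python
def consume_with_retries(make_consumer, handle, retryable, max_retries=3):
    """Hedged sketch: rebuild the consumer when a retryable error
    (e.g. pykafka's SocketDisconnectedError) escapes, giving up after
    max_retries consecutive failures. All names here are hypothetical."""
    attempts = 0
    consumer = make_consumer()
    try:
        while True:
            try:
                msg = consumer.consume()
            except retryable:
                attempts += 1
                if attempts > max_retries:
                    raise  # give up; let the caller see the error
                consumer.stop()
                consumer = make_consumer()  # fresh consumer, fresh worker threads
                continue
            attempts = 0  # a successful consume resets the retry budget
            if msg is None:
                break  # treat None as "nothing left" in this sketch
            handle(msg)
    finally:
        consumer.stop()
```

With pykafka, one might call it as `consume_with_retries(lambda: topic.get_balanced_consumer(...), handle_message, retryable=(SocketDisconnectedError,))`.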
Closing this issue in favor of #223, where work on smart retry logic in the consumer will be tracked.
```
Exception in thread Thread-11:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 813, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 766, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 277, in fetcher
    self.fetch()
  File "/usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 576, in fetch
    min_bytes=self._fetch_min_bytes
  File "/usr/lib/python2.7/site-packages/pykafka/broker.py", line 221, in fetch_messages
    return future.get(FetchResponse)
  File "/usr/lib/python2.7/site-packages/pykafka/handlers.py", line 55, in get
    raise self.error
SocketDisconnectedError

Exception in thread Thread-12:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 813, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 766, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 265, in autocommitter
    self._auto_commit()
  File "/usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 329, in _auto_commit
    self.commit_offsets()
  File "/usr/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 349, in commit_offsets
    self._consumer_group, 1, 'pykafka', reqs)
  File "/usr/lib/python2.7/site-packages/pykafka/broker.py", line 303, in commit_consumer_group_offsets
    self.connect_offsets_channel()
  File "/usr/lib/python2.7/site-packages/pykafka/broker.py", line 191, in connect_offsets_channel
    self._offsets_channel_connection.connect(self._offsets_channel_socket_timeout_ms)
  File "/usr/lib/python2.7/site-packages/pykafka/connection.py", line 66, in connect
    timeout=timeout / 1000
  File "/usr/lib64/python2.7/socket.py", line 575, in create_connection
    raise err
error: [Errno 111] Connection refused
```