Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #811 from Parsely/bugfix/group_coordinator_discove…
Browse files Browse the repository at this point in the history
…ry_logging

improve logging and retry logic when broker is unreachable
  • Loading branch information
Emmett J. Butler committed May 29, 2018
2 parents 8585540 + 899885a commit 56efe39
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
5 changes: 3 additions & 2 deletions pykafka/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ def __init__(self,
try:
self.connect()
except SocketDisconnectedError:
log.warning("Failed to connect newly created broker for {host}:{port}".format(
host=self._host, port=self._port))
log.warning("Failed to connect to broker at {host}:{port}. Check the "
"`listeners` property in server.config."
.format(host=self._host, port=self._port))

def __repr__(self):
return "<{module}.{name} at {id_} (host={host}, port={port}, id={my_id})>".format(
Expand Down
11 changes: 9 additions & 2 deletions pykafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,18 @@ def get_group_coordinator(self, consumer_group):
for broker in itervalues(self.brokers):

req = GroupCoordinatorRequest(consumer_group)
future = broker.handler.request(req)
try:
future = broker.handler.request(req)
except AttributeError:
log.error("Broker {} not connected during offset manager discovery"
.format(broker.id))
if i == max_connection_retries - 1:
raise
continue
try:
res = future.get(GroupCoordinatorResponse)
except GroupCoordinatorNotAvailable:
log.error('Error discovering offset manager.')
log.error('Error discovering offset manager - GroupCoordinatorNotAvailable.')
if i == max_connection_retries - 1:
raise
except SocketDisconnectedError:
Expand Down

0 comments on commit 56efe39

Please sign in to comment.