diff --git a/pykafka/cluster.py b/pykafka/cluster.py index d9ced1971..8271a629f 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -26,6 +26,8 @@ from .broker import Broker from .exceptions import (ERROR_CODES, ConsumerCoordinatorNotAvailable, + KafkaException, + SocketDisconnectedError, LeaderNotAvailable) from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse from .topic import Topic @@ -285,6 +287,10 @@ def get_offset_manager(self, consumer_group): log.error('Error discovering offset manager.') if i == MAX_RETRIES - 1: raise + except SocketDisconnectedError: + raise KafkaException("Socket disconnected during offset manager " + "discovery. This can happen when using PyKafka " + "with a Kafka version lower than 0.8.2.") else: coordinator = self.brokers.get(res.coordinator_id, None) if coordinator is None: