Merge pull request #143 from Parsely/feature/cluster_reconfig
Handle Cluster Reconfiguration
emmettbutler committed Apr 23, 2015
2 parents 12b003c + 917f994 commit 6a966a2
Showing 11 changed files with 375 additions and 233 deletions.
12 changes: 12 additions & 0 deletions pykafka/exceptions.py
@@ -36,6 +36,9 @@ class MessageTooLargeError(KafkaException):
 class SocketDisconnectedError(KafkaException):
     pass
 
+class ProduceFailureError(KafkaException):
+    pass
+
 
 
 ##
@@ -85,6 +88,15 @@ class StaleControllerEpoch(ProtocolClientError):
 class OffsetMetadataTooLarge(ProtocolClientError):
     ERROR_CODE = 12
 
+class OffsetsLoadInProgress(ProtocolClientError):
+    ERROR_CODE = 14
+
+class ConsumerCoordinatorNotAvailable(ProtocolClientError):
+    ERROR_CODE = 15
+
+class NotCoordinatorForConsumer(ProtocolClientError):
+    ERROR_CODE = 16
+
 ERROR_CODES = dict(
     (exc.ERROR_CODE, exc)
     for exc in (UnknownError,
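
The three new offset-management classes use Kafka's protocol error numbering (codes 14-16), and like the existing classes they register themselves in the ERROR_CODES lookup at the bottom of the file. A minimal sketch of how that table can turn a response's error code into a typed raise (the check_error helper is illustrative, not part of this commit):

    from pykafka.exceptions import ERROR_CODES, ProtocolClientError

    def check_error(err_code):
        """Raise the exception class registered for a nonzero error code."""
        if err_code == 0:
            return  # no error
        # Codes without a registered class fall back to the generic base
        raise ERROR_CODES.get(err_code, ProtocolClientError)(
            'Kafka protocol error %d' % err_code)
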
67 changes: 46 additions & 21 deletions pykafka/broker.py
@@ -1,4 +1,5 @@
 import logging
+import time
 
 from pykafka import base
 from .connection import BrokerConnection
@@ -7,8 +8,10 @@
     FetchRequest, FetchResponse, OffsetRequest,
     OffsetResponse, MetadataRequest, MetadataResponse,
     OffsetCommitRequest, OffsetCommitResponse,
-    OffsetFetchRequest, OffsetFetchResponse
+    OffsetFetchRequest, OffsetFetchResponse,
+    ProduceResponse
 )
+from pykafka.exceptions import LeaderNotAvailable
 
 
 logger = logging.getLogger(__name__)
@@ -31,8 +34,8 @@ def __init__(self,
         :param timeout: TODO: Fill in
         :type timeout: :class:`int`
         """
-        self._connected = False
-        self._offsets_channel_connected = False
+        self._connection = None
+        self._offsets_channel_connection = None
         self._id = int(id_)
         self._host = host
         self._port = port
@@ -64,12 +67,14 @@ def from_metadata(cls,
     @property
     def connected(self):
         """Returns True if connected to the broker."""
-        return self._connected
+        return self._connection.connected
 
     @property
     def offsets_channel_connected(self):
         """Returns True if the offsets channel is connected to the broker."""
-        return self._offsets_channel_connected
+        if self._offsets_channel_connection:
+            return self._offsets_channel_connection.connected
+        return False
 
     @property
     def id(self):
@@ -104,19 +109,21 @@ def offsets_channel_handler(self):
 
     def connect(self):
         """Establish a connection to the Broker."""
-        conn = BrokerConnection(self.host, self.port, self._buffer_size)
-        conn.connect(self._socket_timeout_ms)
-        self._req_handler = RequestHandler(self._handler, conn)
+        self._connection = BrokerConnection(self.host, self.port,
+                                            self._buffer_size)
+        self._connection.connect(self._socket_timeout_ms)
+        self._req_handler = RequestHandler(self._handler, self._connection)
         self._req_handler.start()
-        self._connected = True
 
     def connect_offsets_channel(self):
         """Establish a connection to the Broker for the offsets channel"""
-        conn = BrokerConnection(self.host, self.port, self._buffer_size)
-        conn.connect(self._offsets_channel_socket_timeout_ms)
-        self._offsets_channel_req_handler = RequestHandler(self._handler, conn)
+        self._offsets_channel_connection = BrokerConnection(self.host, self.port,
+                                                            self._buffer_size)
+        self._offsets_channel_connection.connect(self._offsets_channel_socket_timeout_ms)
+        self._offsets_channel_req_handler = RequestHandler(
+            self._handler, self._offsets_channel_connection
+        )
         self._offsets_channel_req_handler.start()
-        self._offsets_channel_connected = True
 
     def fetch_messages(self,
                        partition_requests,
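
Note the design change running through this hunk and the constructor above: the standalone _connected booleans are gone, and connectedness is now read straight off the BrokerConnection object, so the flag can no longer drift out of sync with the socket. A tiny sketch of that delegation pattern (hypothetical class, not this commit's code):

    class LazyBroker(object):
        """Connection state lives on the connection object itself."""

        def __init__(self):
            self._connection = None  # created on first connect()

        @property
        def connected(self):
            # No separate boolean to maintain: no connection object
            # means not connected; otherwise ask the connection
            return self._connection.connected if self._connection else False
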
@@ -145,18 +152,36 @@ def produce_messages(self, produce_request):
         if produce_request.required_acks == 0:
             self._req_handler.request(produce_request, has_response=False)
         else:
-            self._req_handler.request(produce_request).get()
-        # Any errors will be decoded and raised in the `.get()`
-        return None
+            future = self._req_handler.request(produce_request)
+            return future.get(ProduceResponse)
 
     def request_offset_limits(self, partition_requests):
         """Request offset information for a set of topic/partitions"""
         future = self._req_handler.request(OffsetRequest(partition_requests))
         return future.get(OffsetResponse)
 
     def request_metadata(self, topics=None):
-        future = self._req_handler.request(MetadataRequest(topics=topics))
-        return future.get(MetadataResponse)
+        max_retries = 3
+        for i in xrange(max_retries):
+            if i > 0:
+                logger.debug("Retrying")
+                time.sleep(i)
+
+            future = self._req_handler.request(MetadataRequest(topics=topics))
+            response = future.get(MetadataResponse)
+
+            errored = False
+            for name, topic_metadata in response.topics.iteritems():
+                if topic_metadata.err == LeaderNotAvailable.ERROR_CODE:
+                    logger.warning("Leader not available.")
+                    errored = True
+                for pid, partition_metadata in topic_metadata.partitions.iteritems():
+                    if partition_metadata.err == LeaderNotAvailable.ERROR_CODE:
+                        logger.warning("Leader not available.")
+                        errored = True
+
+            if not errored:
+                return response
 
     ######################
     #  Commit/Fetch API  #
@@ -180,14 +205,14 @@ def commit_consumer_group_offsets(self,
         :param preqs: a sequence of <protocol.PartitionOffsetCommitRequest>
         :type preqs: sequence
         """
-        if not self._offsets_channel_connected:
+        if not self.offsets_channel_connected:
             self.connect_offsets_channel()
         # TODO - exponential backoff
         req = OffsetCommitRequest(consumer_group,
                                   consumer_group_generation_id,
                                   consumer_id,
                                   partition_requests=preqs)
-        self._offsets_channel_req_handler.request(req).get(OffsetCommitResponse)
+        return self._offsets_channel_req_handler.request(req).get(OffsetCommitResponse)
 
     def fetch_consumer_group_offsets(self, consumer_group, preqs):
         """Fetch the offsets stored in Kafka with the Offset Commit/Fetch API
@@ -200,7 +225,7 @@ def fetch_consumer_group_offsets(self, consumer_group, preqs):
         :param preqs: a sequence of <protocol.PartitionOffsetFetchRequest>
         :type preqs: sequence
         """
-        if not self._offsets_channel_connected:
+        if not self.offsets_channel_connected:
             self.connect_offsets_channel()
         # TODO - exponential backoff
         req = OffsetFetchRequest(consumer_group, partition_requests=preqs)
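
The request_metadata rewrite above is the heart of the cluster-reconfiguration handling: metadata is requested repeatedly, sleeping i seconds before attempt i, until no topic or partition reports LeaderNotAvailable. The same retry shape as a standalone sketch (Python 2, matching the xrange/iteritems usage above; fetch and is_settled are hypothetical stand-ins for the request plumbing):

    import logging
    import time

    logger = logging.getLogger(__name__)

    def retry_until_settled(fetch, is_settled, max_retries=3):
        """Linear-backoff retry: sleep i seconds before attempt i."""
        for i in xrange(max_retries):
            if i > 0:
                logger.debug("Retrying")
                time.sleep(i)
            response = fetch()
            if is_settled(response):
                return response
        # Like the diff, give up quietly if the cluster never settles
        return None

As in the diff, a cluster that never settles yields None rather than an exception, so callers must be prepared for a missing response.
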
58 changes: 33 additions & 25 deletions pykafka/cluster.py
@@ -1,10 +1,13 @@
 from __future__ import division
 
 import logging
+import time
+import random
 
 from .broker import Broker
 from .topic import Topic
 from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse
+from pykafka.exceptions import ConsumerCoordinatorNotAvailable
 
 
 logger = logging.getLogger(__name__)
@@ -45,23 +48,28 @@ def handler(self):
     def _get_metadata(self):
         """Get fresh cluster metadata from a broker"""
         # Works either on existing brokers or seed_hosts list
-        if self.brokers:
-            brokers = self.brokers.values()
-        else:
+        brokers = [b for b in self.brokers.values() if b.connected]
+        if brokers:
+            for broker in brokers:
+                response = broker.request_metadata()
+                if response is not None:
+                    return response
+        else:  # try seed hosts
             brokers = self._seed_hosts.split(',')
 
-        for broker in brokers:
-            try:
-                if isinstance(broker, basestring):
-                    h, p = broker.split(':')
-                    broker = Broker(-1, h, p, self._handler, self._socket_timeout_ms,
+            for broker_str in brokers:
+                try:
+                    h, p = broker_str.split(':')
+                    broker = Broker(-1, h, p, self._handler,
+                                    self._socket_timeout_ms,
                                     self._offsets_channel_socket_timeout_ms,
                                     buffer_size=self._socket_receive_buffer_bytes)
-                return broker.request_metadata()
-            # TODO: Change to typed exception
-            except Exception:
-                logger.exception('Unable to connect to broker %s', broker)
-                raise
+                    response = broker.request_metadata()
+                    if response is not None:
+                        return response
+                except:
+                    logger.exception('Unable to connect to broker %s',
+                                     broker_str)
+        # Couldn't connect anywhere. Raise an error.
+        raise Exception('Unable to connect to a broker to fetch metadata.')
 
     def _update_brokers(self, broker_metadata):
@@ -134,20 +142,20 @@ def get_offset_manager(self, consumer_group):
         """
         # arbitrarily choose a broker, since this request can go to any
         broker = self.brokers[random.choice(self.brokers.keys())]
-        backoff, retries = 2, 0
         MAX_RETRIES = 3
-        while True:
+
+        for i in xrange(MAX_RETRIES):
+            if i > 0:
+                logger.debug("Retrying")
+                time.sleep(i)
+
+            req = ConsumerMetadataRequest(consumer_group)
+            future = broker.handler.request(req)
             try:
-                retries += 1
-                req = ConsumerMetadataRequest(consumer_group)
-                future = broker.handler.request(req)
                 res = future.get(ConsumerMetadataResponse)
-            except Exception:
-                logger.debug('Error discovering offset manager. Sleeping for {}s'.format(backoff))
-                if retries < MAX_RETRIES:
-                    time.sleep(backoff)
-                    backoff = backoff ** 2
-                else:
+            except ConsumerCoordinatorNotAvailable:
+                logger.debug('Error discovering offset manager.')
+                if i == MAX_RETRIES - 1:
                     raise
             else:
                 coordinator = self.brokers.get(res.coordinator_id, None)
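
_get_metadata now has a strict preference order: query brokers that are already connected, and only parse the comma-separated 'host:port' seed list when none are. Condensed into a standalone sketch (bootstrap_metadata and fetch_from are hypothetical stand-ins; the exception message mirrors the diff):

    def bootstrap_metadata(connected_brokers, seed_hosts, fetch_from):
        """Prefer live brokers; otherwise fall back to seed strings."""
        if connected_brokers:
            candidates = connected_brokers
        else:
            # seed_hosts is a comma-separated list of 'host:port' pairs
            candidates = [tuple(s.split(':')) for s in seed_hosts.split(',')]
        for candidate in candidates:
            response = fetch_from(candidate)
            if response is not None:
                return response
        raise Exception('Unable to connect to a broker to fetch metadata.')
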
16 changes: 9 additions & 7 deletions pykafka/connection.py
@@ -70,15 +70,17 @@ def reconnect(self):
 
     def request(self, request):
         bytes = request.get_bytes()
+        if not self._socket:
+            raise SocketDisconnectedError
         self._socket.sendall(bytes)
 
     def response(self):
         """Wait for a response from the broker"""
-        try:
-            size = self._socket.recv(4)
-            size = struct.unpack('!i', size)[0]
-            recvall_into(self._socket, self._buff, size)
-            return buffer(self._buff[4:4 + size])
-        except SocketDisconnectedError:
-            self.disconnect()
-            raise
+        size = self._socket.recv(4)
+        if len(size) == 0:
+            # Happens when broker has shut down
+            self.disconnect()
+            raise SocketDisconnectedError
+        size = struct.unpack('!i', size)[0]
+        recvall_into(self._socket, self._buff, size)
+        return buffer(self._buff[4:4 + size])
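
The rewritten response() replaces exception-driven cleanup with a direct check of a TCP fact: recv() returning zero bytes means the peer performed an orderly shutdown, which is exactly what a stopping broker does. A standalone sketch of reading one length-prefixed frame with that check (recv_exactly is a hypothetical helper; the diff's recvall_into plays the same role):

    import struct

    def recv_exactly(sock, n):
        """Collect exactly n bytes; an empty recv() means the peer closed."""
        chunks, remaining = [], n
        while remaining > 0:
            chunk = sock.recv(remaining)
            if len(chunk) == 0:
                # Orderly remote shutdown -- the condition the diff turns
                # into SocketDisconnectedError
                raise IOError('socket disconnected')
            chunks.append(chunk)
            remaining -= len(chunk)
        return ''.join(chunks)

    def read_frame(sock):
        """Read one response frame prefixed by a 4-byte big-endian length."""
        size = struct.unpack('!i', recv_exactly(sock, 4))[0]
        return recv_exactly(sock, size)
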
4 changes: 2 additions & 2 deletions pykafka/partition.py
@@ -103,10 +103,10 @@ def update(self, brokers, metadata):
             # Check Replicas
             if sorted(r.id for r in self.replicas) != sorted(metadata.replicas):
                 logger.info('Updating replicas list for %s', self)
-                self.replicas = [brokers[b] for b in metadata.replicas]
+                self._replicas = [brokers[b] for b in metadata.replicas]
             # Check In-Sync-Replicas
             if sorted(i.id for i in self.isr) != sorted(metadata.isr):
                 logger.info('Updating in sync replicas list for %s', self)
-                self.isr = [brokers[b] for b in metadata.isr]
+                self._isr = [brokers[b] for b in metadata.isr]
         except KeyError:
             raise Exception("TODO: Type this exception")
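
This two-character fix matters because replicas and isr are exposed as read-only properties; assuming those properties define no setters, assigning through them raises AttributeError, so internal updates must target the private backing attributes. Illustrated with a hypothetical minimal class:

    class ReplicaHolder(object):
        """Read-only property backed by a private attribute."""

        def __init__(self, replicas):
            self._replicas = replicas

        @property
        def replicas(self):
            return self._replicas

        def update(self, new_replicas):
            # self.replicas = new_replicas would raise AttributeError
            # ("can't set attribute") since the property has no setter;
            # write to the backing attribute instead
            self._replicas = new_replicas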
