128 changes: 128 additions & 0 deletions kafka/client.py
@@ -97,6 +97,26 @@ def _get_leader_for_partition(self, topic, partition):
        # Otherwise return the BrokerMetadata
        return self.brokers[meta.leader]

    def _get_coordinator_for_group(self, group):
        """
        Returns the coordinator broker for a consumer group.

        ConsumerCoordinatorNotAvailableCode will be raised if the
        coordinator does not currently exist for the group.

        OffsetsLoadInProgressCode is raised if the coordinator is available
        but is still loading offsets from the internal topic.
        """

        resp = self.send_consumer_metadata_request(group)

        # If there is a problem finding the coordinator, raise the
        # provided error
        kafka.common.check_error(resp)

        # Otherwise return the BrokerMetadata
        return BrokerMetadata(resp.nodeId, resp.host, resp.port)

    def _next_id(self):
        """Generate a new correlation id"""
        # modulo to keep within int32
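For context, _get_coordinator_for_group raises on the transient conditions named in its docstring, so a caller that wants Kafka-committed offsets will typically retry until the coordinator is ready. A minimal, hypothetical retry sketch (not part of this diff; wait_for_coordinator and the retry policy are illustrative, and client is assumed to be a connected KafkaClient):

    import time

    import kafka.common

    def wait_for_coordinator(client, group, retries=5, backoff=0.5):
        # The cluster may still be electing a coordinator or loading offsets
        # from the internal __consumer_offsets topic; both states are
        # transient, so back off and retry.
        for _ in range(retries):
            try:
                return client._get_coordinator_for_group(group)
            except (kafka.common.ConsumerCoordinatorNotAvailableCode,
                    kafka.common.OffsetsLoadInProgressCode):
                time.sleep(backoff)
        raise kafka.common.KafkaUnavailableError('coordinator not available '
                                                 'for group %s' % group)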
@@ -237,6 +257,96 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
        # Return responses in the same order as provided
        return [responses[tp] for tp in original_ordering]

    def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
        """
        Send a list of requests to the consumer coordinator for the
        specified group, using the supplied encode/decode functions. Because
        the payload structs used by consumer-aware requests do not carry the
        group name (e.g. OffsetFetchRequest), all payloads must belong to a
        single group.

        Arguments:

            group: the name of the consumer group (str) the payloads are for
            payloads: list of object-like entities with topic (str) and
                      partition (int) attributes; payloads with duplicate
                      topic+partition are not supported.

            encoder_fn: a method to encode the list of payloads to a request
                        body; must accept client_id, correlation_id, and
                        payloads as keyword arguments

            decoder_fn: a method to decode a response body into response
                        objects. The response objects must be object-like
                        and have topic and partition attributes.

        Returns:

            List of response objects in the same order as the supplied
            payloads
        """
        # Encoders / decoders do not currently maintain ordering, so record
        # the original order to rebuild it before returning
        original_ordering = [(p.topic, p.partition) for p in payloads]

        broker = self._get_coordinator_for_group(group)

        # Send the list of request payloads and collect the responses and
        # errors
        responses = {}
        requestId = self._next_id()
        log.debug('Request %s to %s: %s', requestId, broker, payloads)
        request = encoder_fn(client_id=self.client_id,
                             correlation_id=requestId, payloads=payloads)

        # Send the request, recv the response
        try:
            conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
            conn.send(requestId, request)

        except ConnectionError as e:
            log.warning('ConnectionError attempting to send request %s '
                        'to server %s: %s', requestId, broker, e)

            for payload in payloads:
                topic_partition = (payload.topic, payload.partition)
                responses[topic_partition] = FailedPayloadsError(payload)

        # No exception, try to get response
        else:

            # decoder_fn=None signals that the server is not expected to
            # send a response. This probably only applies to
            # ProduceRequest w/ acks = 0
            if decoder_fn is None:
                log.debug('Request %s does not expect a response '
                          '(skipping conn.recv)', requestId)
                for payload in payloads:
                    topic_partition = (payload.topic, payload.partition)
                    responses[topic_partition] = None
                return []

            try:
                response = conn.recv(requestId)
            except ConnectionError as e:
                log.warning('ConnectionError attempting to receive a '
                            'response to request %s from server %s: %s',
                            requestId, broker, e)

                for payload in payloads:
                    topic_partition = (payload.topic, payload.partition)
                    responses[topic_partition] = FailedPayloadsError(payload)

            else:
                _resps = []
                for payload_response in decoder_fn(response):
                    topic_partition = (payload_response.topic,
                                       payload_response.partition)
                    responses[topic_partition] = payload_response
                    _resps.append(payload_response)
                log.debug('Response %s: %s', requestId, _resps)

        # Return responses in the same order as provided
        return [responses[tp] for tp in original_ordering]

    def __repr__(self):
        return '<KafkaClient client_id=%s>' % (self.client_id)

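Note that connection failures inside _send_consumer_aware_request do not raise immediately: they come back as FailedPayloadsError placeholders in payload order, and the public send_* wrappers decide whether to raise based on fail_on_error. A hypothetical caller sketch for the fail_on_error=False path (assumes FailedPayloadsError keeps the offending payload on a .payload attribute, as elsewhere in this codebase; broker address, topic, and group are placeholders):

    from kafka import KafkaClient
    from kafka.common import FailedPayloadsError, OffsetFetchRequest

    client = KafkaClient('localhost:9092')
    payloads = [OffsetFetchRequest(b'my-topic', p) for p in (0, 1, 2)]
    resps = client.send_offset_fetch_request_kafka(b'my-group', payloads,
                                                   fail_on_error=False)
    # Partition requests that failed come back as error placeholders;
    # collect those for a retry.
    retry = [r.payload for r in resps if isinstance(r, FailedPayloadsError)]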
@@ -429,6 +539,13 @@ def send_metadata_request(self, payloads=[], fail_on_error=True,

        return self._send_broker_unaware_request(payloads, encoder, decoder)

    def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
                                       callback=None):
        encoder = KafkaProtocol.encode_consumer_metadata_request
        decoder = KafkaProtocol.decode_consumer_metadata_response

        return self._send_broker_unaware_request(payloads, encoder, decoder)

    def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                             fail_on_error=True, callback=None):
        """
@@ -529,3 +646,14 @@ def send_offset_fetch_request(self, group, payloads=[],

        return [resp if not callback else callback(resp) for resp in resps
                if not fail_on_error or not self._raise_on_response_error(resp)]

    def send_offset_fetch_request_kafka(self, group, payloads=[],
                                        fail_on_error=True, callback=None):

        encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
                                    group=group, from_kafka=True)
        decoder = KafkaProtocol.decode_offset_fetch_response
        resps = self._send_consumer_aware_request(group, payloads, encoder,
                                                  decoder)

        return [resp if not callback else callback(resp) for resp in resps
                if not fail_on_error or not self._raise_on_response_error(resp)]
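Taken together, the client-side flow for Kafka-committed offsets is: look up the coordinator for the group, then send a version-1 OffsetFetchRequest to it. An end-to-end usage sketch (hypothetical broker address, topic, and group):

    from kafka import KafkaClient
    from kafka.common import OffsetFetchRequest

    client = KafkaClient('localhost:9092')
    payloads = [OffsetFetchRequest(b'my-topic', partition)
                for partition in (0, 1)]
    # Resolves the coordinator, sends the v1 request, and returns
    # OffsetFetchResponse objects in payload order
    for resp in client.send_offset_fetch_request_kafka(b'my-group', payloads):
        print(resp.topic, resp.partition, resp.offset)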
22 changes: 22 additions & 0 deletions kafka/common.py
@@ -13,6 +13,13 @@
MetadataResponse = namedtuple("MetadataResponse",
                              ["brokers", "topics"])

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
                                     ["groups"])

ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
                                      ["error", "nodeId", "host", "port"])

# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
ProduceRequest = namedtuple("ProduceRequest",
                            ["topic", "partition", "messages"])
@@ -160,6 +167,21 @@ class StaleLeaderEpochCodeError(BrokerResponseError):
    message = 'STALE_LEADER_EPOCH_CODE'


class OffsetsLoadInProgressCode(BrokerResponseError):
    errno = 14
    message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'


class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
    errno = 15
    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'


class NotCoordinatorForConsumerCode(BrokerResponseError):
    errno = 16
    message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'


class KafkaUnavailableError(KafkaError):
    pass

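These three classes plug into the errno-to-exception table that kafka.common.check_error consults (the same helper _get_coordinator_for_group calls above), so the new broker error codes surface as typed exceptions. An illustrative sketch with an arbitrary hand-built response:

    import kafka.common

    resp = kafka.common.ConsumerMetadataResponse(error=15, nodeId=-1,
                                                 host=b'', port=-1)
    try:
        kafka.common.check_error(resp)
    except kafka.common.ConsumerCoordinatorNotAvailableCode:
        # errno 15: no coordinator has been elected for the group yet
        pass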
53 changes: 47 additions & 6 deletions kafka/protocol.py
@@ -14,7 +14,8 @@
    MetadataResponse, ProduceResponse, FetchResponse,
    OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
    ProtocolError, BufferUnderflowError, ChecksumError,
    ConsumerFetchSizeTooSmall, UnsupportedCodecError,
    ConsumerMetadataResponse
)
from kafka.util import (
    crc32, read_short_string, read_int_string, relative_unpack,
@@ -43,19 +44,21 @@ class KafkaProtocol(object):
    METADATA_KEY = 3
    OFFSET_COMMIT_KEY = 8
    OFFSET_FETCH_KEY = 9
    CONSUMER_METADATA_KEY = 10

    ###################
    #   Private API   #
    ###################

    @classmethod
    def _encode_message_header(cls, client_id, correlation_id, request_key,
                               version=0):
        """
        Encode the common request envelope
        """
        return struct.pack('>hhih%ds' % len(client_id),
                           request_key,     # ApiKey
                           version,         # ApiVersion
                           correlation_id,  # CorrelationId
                           len(client_id),  # ClientId size
                           client_id)       # ClientId
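The added version parameter is the only change to the request envelope; the layout remains ApiKey, ApiVersion, CorrelationId, ClientId. A small sanity check built from the same struct format string the helper uses (illustrative values):

    import struct

    from kafka.protocol import KafkaProtocol

    expected = struct.pack('>hhih3s',
                           KafkaProtocol.OFFSET_FETCH_KEY,  # ApiKey = 9
                           1,                               # ApiVersion 1
                           7,                               # CorrelationId
                           3, b'cid')                       # ClientId
    assert KafkaProtocol._encode_message_header(
        b'cid', 7, KafkaProtocol.OFFSET_FETCH_KEY, version=1) == expected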
@@ -429,6 +432,38 @@ def decode_metadata_response(cls, data):

        return MetadataResponse(brokers, topic_metadata)

    @classmethod
    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
        """
        Encode a ConsumerMetadataRequest

        Arguments:
            client_id: string
            correlation_id: int
            payloads: string, the consumer group name (a single group,
                      not a list, despite the name)
        """
        message = []
        message.append(cls._encode_message_header(client_id, correlation_id,
                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))

        msg = b''.join(message)
        return write_int_string(msg)

    @classmethod
    def decode_consumer_metadata_response(cls, data):
        """
        Decode bytes to a ConsumerMetadataResponse

        Arguments:
            data: bytes to decode
        """
        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
        (host, cur) = read_short_string(data, cur)
        ((port,), cur) = relative_unpack('>i', data, cur)

        return ConsumerMetadataResponse(error, nodeId, host, port)

    @classmethod
    def encode_offset_commit_request(cls, client_id, correlation_id,
                                     group, payloads):
@@ -481,21 +516,27 @@ def decode_offset_commit_response(cls, data):

    @classmethod
    def encode_offset_fetch_request(cls, client_id, correlation_id,
                                    group, payloads, from_kafka=False):
        """
        Encode some OffsetFetchRequest structs. The request is encoded as
        version 0 when from_kafka is False, indicating a request for
        Zookeeper-stored offsets, and as version 1 when from_kafka is True,
        indicating a request for Kafka-stored offsets.

        Arguments:
            client_id: string
            correlation_id: int
            group: string, the consumer group you are fetching offsets for
            payloads: list of OffsetFetchRequest
            from_kafka: bool, default False; set True for Kafka-committed
                        offsets
        """
        grouped_payloads = group_by_topic_and_partition(payloads)

        message = []
        reqver = 1 if from_kafka else 0
        message.append(cls._encode_message_header(client_id, correlation_id,
                                                  KafkaProtocol.OFFSET_FETCH_KEY,
                                                  version=reqver))

        message.append(write_short_string(group))
        message.append(struct.pack('>i', len(grouped_payloads)))
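Versions 0 and 1 of OffsetFetchRequest share an identical body; only the two-byte ApiVersion in the header differs, and that is what tells the broker to read offsets from Zookeeper (v0) or from Kafka (v1). A quick illustrative check (arbitrary client id, correlation id, and group):

    from kafka.common import OffsetFetchRequest
    from kafka.protocol import KafkaProtocol

    reqs = [OffsetFetchRequest(b'topic', 0)]
    v0 = KafkaProtocol.encode_offset_fetch_request(b'cid', 1, b'grp', reqs)
    v1 = KafkaProtocol.encode_offset_fetch_request(b'cid', 1, b'grp', reqs,
                                                   from_kafka=True)
    # Same length, different bytes: only the ApiVersion field changed
    assert len(v0) == len(v1) and v0 != v1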
30 changes: 29 additions & 1 deletion test/test_protocol.py
@@ -13,7 +13,7 @@
    ProduceResponse, FetchResponse, OffsetAndMessage,
    BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
    KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
    ProtocolError, ConsumerMetadataResponse
)
from kafka.protocol import (
    ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -560,6 +560,34 @@ def test_decode_metadata_response(self):
        decoded = KafkaProtocol.decode_metadata_response(encoded)
        self.assertEqual(decoded, (node_brokers, topic_partitions))

    def test_encode_consumer_metadata_request(self):
        expected = b"".join([
            struct.pack(">i", 17),           # Total length of the request
            struct.pack('>h', 10),           # API key: consumer metadata
            struct.pack('>h', 0),            # API version
            struct.pack('>i', 4),            # Correlation ID
            struct.pack('>h3s', 3, b"cid"),  # The client ID
            struct.pack('>h2s', 2, b"g1"),   # Group "g1"
        ])

        encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")

        self.assertEqual(encoded, expected)

    def test_decode_consumer_metadata_response(self):
        encoded = b"".join([
            struct.pack(">i", 42),    # Correlation ID
            struct.pack(">h", 0),     # No error
            struct.pack(">i", 1),     # Broker ID
            struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"),  # Broker host
            struct.pack(">i", 1000),  # Broker port
        ])

        results = KafkaProtocol.decode_consumer_metadata_response(encoded)
        self.assertEqual(results,
                         ConsumerMetadataResponse(error=0, nodeId=1,
                                                  host=b'brokers1.kafka.rdio.com',
                                                  port=1000))
    def test_encode_offset_request(self):
        expected = b"".join([
            struct.pack(">i", 21),  # Total length of the request