Refactor client request/response handling to support better retries #366

Merged 3 commits on Apr 12, 2015

Changes from all commits
156 changes: 77 additions & 79 deletions kafka/client.py
@@ -150,76 +150,100 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
             List of response objects in the same order as the supplied payloads
         """
 
-        log.debug("Sending Payloads: %s" % payloads)
-
         # Group the requests by topic+partition
-        original_keys = []
+        brokers_for_payloads = []
         payloads_by_broker = collections.defaultdict(list)
 
         for payload in payloads:
             leader = self._get_leader_for_partition(payload.topic,
                                                     payload.partition)
 
             payloads_by_broker[leader].append(payload)
-            original_keys.append((payload.topic, payload.partition))
-
-        # Accumulate the responses in a dictionary
-        acc = {}
-
-        # keep a list of payloads that were failed to be sent to brokers
-        failed_payloads = []
+            brokers_for_payloads.append(leader)
 
         # For each broker, send the list of request payloads
+        # and collect the responses and errors
+        responses_by_broker = collections.defaultdict(list)
+        broker_failures = []
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
             requestId = self._next_id()
             request = encoder_fn(client_id=self.client_id,
                                  correlation_id=requestId, payloads=payloads)
 
-            failed = False
             # Send the request, recv the response
             try:
                 conn.send(requestId, request)
 
-                # decoder_fn=None signal that the server is expected to not
-                # send a response. This probably only applies to
-                # ProduceRequest w/ acks = 0
-                if decoder_fn is None:
-                    continue
-
-                try:
-                    response = conn.recv(requestId)
-                except ConnectionError as e:
-                    log.warning("Could not receive response to request [%s] "
-                                "from server %s: %s",
-                                binascii.b2a_hex(request), conn, e)
-                    failed = True
-            except ConnectionError as e:
-                log.warning("Could not send request [%s] to server %s: %s",
-                            binascii.b2a_hex(request), conn, e)
-                failed = True
-
-            if failed:
-                failed_payloads += payloads
-                self.reset_all_metadata()
-                continue
-
-            for response in decoder_fn(response):
-                acc[(response.topic, response.partition)] = response
+            except ConnectionError as e:
+                broker_failures.append(broker)
+                log.warning("Could not send request [%s] to server %s: %s",
+                            binascii.b2a_hex(request), conn, e)
+
+                for payload in payloads:
+                    responses_by_broker[broker].append(FailedPayloadsError(payload))
+
+            # No exception, try to get response
+            else:
+
+                # decoder_fn=None signal that the server is expected to not
+                # send a response. This probably only applies to
+                # ProduceRequest w/ acks = 0
+                if decoder_fn is None:
+                    for payload in payloads:
+                        responses_by_broker[broker].append(None)
+                    continue
+
+                try:
+                    response = conn.recv(requestId)
+                except ConnectionError as e:
+                    broker_failures.append(broker)
+                    log.warning("Could not receive response to request [%s] "
+                                "from server %s: %s",
+                                binascii.b2a_hex(request), conn, e)
+
+                    for payload in payloads:
+                        responses_by_broker[broker].append(FailedPayloadsError(payload))
+
+                else:
+
+                    for payload_response in decoder_fn(response):
+                        responses_by_broker[broker].append(payload_response)
 
-        if failed_payloads:
-            raise FailedPayloadsError(failed_payloads)
-
-        # Order the accumulated responses by the original key order
-        return (acc[k] for k in original_keys) if acc else ()
+        # Connection errors generally mean stale metadata
+        # although sometimes it means incorrect api request
+        # Unfortunately there is no good way to tell the difference
+        # so we'll just reset metadata on all errors to be safe
+        if broker_failures:
+            self.reset_all_metadata()
+
+        # Return responses in the same order as provided
+        responses_by_payload = [responses_by_broker[broker].pop(0)
+                                for broker in brokers_for_payloads]
+        log.debug('Responses: %s' % responses_by_payload)
+        return responses_by_payload
 
     def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
 
     def _raise_on_response_error(self, resp):
 
         # Response can be an unraised exception object (FailedPayloadsError)
         if isinstance(resp, Exception):
             raise resp
 
         # Or a server api error response
         try:
             kafka.common.check_error(resp)
         except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
             self.reset_topic_metadata(resp.topic)
             raise
 
         # Return False if no error to enable list comprehensions
         return False
 
     #################
     #   Public API  #
     #################
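
Note on the refactor above: _send_broker_aware_request now returns one entry per payload, in payload order, with connection failures represented as unraised FailedPayloadsError objects (which _raise_on_response_error converts into exceptions only when fail_on_error is set). That is what enables finer-grained retries: a caller can pick out just the payloads that failed and resend them. A minimal sketch against the public produce API, with a hypothetical broker address and topic, assuming the request and exception classes live in kafka.common as they did at the time of this PR:

    from kafka import KafkaClient, create_message
    from kafka.common import ProduceRequest, FailedPayloadsError

    client = KafkaClient('localhost:9092')   # hypothetical broker address
    requests = [ProduceRequest('my-topic', partition, [create_message(b'hello')])
                for partition in range(3)]

    # fail_on_error=False keeps FailedPayloadsError objects in the result list
    # instead of raising, so the list stays aligned with `requests`.
    responses = client.send_produce_request(requests, fail_on_error=False)

    # Retry only the payloads whose send or recv failed; the client has
    # already reset its metadata, so leaders are re-resolved on the next call.
    failed = [req for req, resp in zip(requests, responses)
              if isinstance(resp, FailedPayloadsError)]
    if failed:
        responses = client.send_produce_request(failed, fail_on_error=False)
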
@@ -396,14 +420,25 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
         same order as the list of payloads specified
 
         Arguments:
-            payloads: list of ProduceRequest
-            fail_on_error: boolean, should we raise an Exception if we
-                           encounter an API error?
-            callback: function, instead of returning the ProduceResponse,
-                      first pass it through this function
+            payloads (list of ProduceRequest): produce requests to send to kafka
+            acks (int, optional): how many acks the servers should receive from replica
+                brokers before responding to the request. If it is 0, the server
+                will not send any response. If it is 1, the server will wait
+                until the data is written to the local log before sending a
+                response. If it is -1, the server will wait until the message
+                is committed by all in-sync replicas before sending a response.
+                For any value > 1, the server will wait for this number of acks to
+                occur (but the server will never wait for more acknowledgements than
+                there are in-sync replicas). defaults to 1.
+            timeout (int, optional): maximum time in milliseconds the server can
+                await the receipt of the number of acks, defaults to 1000.
+            fail_on_error (bool, optional): raise exceptions on connection and
+                server response errors, defaults to True.
+            callback (function, optional): instead of returning the ProduceResponse,
+                first pass it through this function, defaults to None.
 
         Returns:
-            list of ProduceResponse or callback(ProduceResponse), in the
+            list of ProduceResponses, or callback results if supplied, in the
             order of input payloads
         """
 
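The expanded docstring spells out the produce acks semantics. A hedged sketch of how the different values play out for a caller, reusing the client and requests from the sketch above:

    # acks=1 (default): the partition leader appends the message to its local
    # log before responding.
    responses = client.send_produce_request(requests, acks=1, timeout=1000)

    # acks=-1: wait for all in-sync replicas; strongest durability, highest latency.
    responses = client.send_produce_request(requests, acks=-1, timeout=5000)

    # acks=0: the broker sends nothing back; the produce path passes
    # decoder_fn=None (see the comment in the first hunk), each per-payload
    # response is None, and the list comprehension in the next hunk filters
    # them out, so the call returns an empty list.
    client.send_produce_request(requests, acks=0)
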
@@ -419,16 +454,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
 
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
 
-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if resp is not None and
+                (not fail_on_error or not self._raise_on_response_error(resp))]
 
     def send_fetch_request(self, payloads=[], fail_on_error=True,
                            callback=None, max_wait_time=100, min_bytes=4096):
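
With the accumulation loop collapsed into a comprehension, callback is applied to every response that survives the None filter and the error check. Continuing the earlier sketch, and assuming the ProduceResponse namedtuple of this era exposes topic, partition, error and offset, a caller could reduce the responses to the assigned offsets:

    # One (partition, offset) pair per payload, in payload order.
    offsets = client.send_produce_request(
        requests, callback=lambda resp: (resp.partition, resp.offset))
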
@@ -447,16 +475,8 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
             payloads, encoder,
             KafkaProtocol.decode_fetch_response)
 
-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
 
     def send_offset_request(self, payloads=[], fail_on_error=True,
                             callback=None):
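
send_fetch_request follows the same pattern: with fail_on_error=False, a FailedPayloadsError takes the place of a FetchResponse for any partition whose broker connection failed, and can be detected with isinstance. A sketch continuing the setup above, assuming the FetchRequest and OffsetAndMessage namedtuples from kafka.common:

    from kafka.common import FetchRequest

    fetches = [FetchRequest('my-topic', partition, 0, 64 * 1024)
               for partition in range(3)]
    for resp in client.send_fetch_request(fetches, fail_on_error=False,
                                          max_wait_time=100, min_bytes=4096):
        if isinstance(resp, FailedPayloadsError):
            continue  # leave failed partitions for a later retry pass
        for offset_and_message in resp.messages:
            print(resp.partition, offset_and_message.offset,
                  offset_and_message.message.value)
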
@@ -465,15 +485,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True,
                                                 KafkaProtocol.encode_offset_request,
                                                 KafkaProtocol.decode_offset_response)
 
-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
 
     def send_offset_commit_request(self, group, payloads=[],
                                    fail_on_error=True, callback=None):
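
The same return-value change applies to send_offset_request. A small sketch for looking up a partition's log-end offset, assuming the OffsetRequest/OffsetResponse namedtuples where time=-1 means latest and time=-2 means earliest:

    from kafka.common import OffsetRequest

    # max_offsets=1 asks the broker for a single offset for the partition.
    (resp,) = client.send_offset_request([OffsetRequest('my-topic', 0, -1, 1)])
    latest_offset = resp.offsets[0]
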
@@ -482,16 +495,8 @@ def send_offset_commit_request(self, group, payloads=[],
         decoder = KafkaProtocol.decode_offset_commit_response
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
 
-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
 
     def send_offset_fetch_request(self, group, payloads=[],
                                   fail_on_error=True, callback=None):
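
send_offset_commit_request gets the identical treatment; the extra group argument names the consumer group whose offsets are being stored. A hedged sketch, continuing from latest_offset in the previous sketch and assuming the OffsetCommitRequest namedtuple (topic, partition, offset, metadata):

    from kafka.common import OffsetCommitRequest

    # Record the last processed offset for this group; metadata is an optional
    # opaque string stored alongside the offset.
    client.send_offset_commit_request(
        'my-group', [OffsetCommitRequest('my-topic', 0, latest_offset, None)])
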
@@ -501,12 +506,5 @@ def send_offset_fetch_request(self, group, payloads=[],
         decoder = KafkaProtocol.decode_offset_fetch_response
         resps = self._send_broker_aware_request(payloads, encoder, decoder)
 
-        out = []
-        for resp in resps:
-            if fail_on_error is True:
-                self._raise_on_response_error(resp)
-            if callback is not None:
-                out.append(callback(resp))
-            else:
-                out.append(resp)
-        return out
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
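
And the read side of that API: send_offset_fetch_request returns the committed offsets for a group, again as a list aligned with the input payloads. A sketch assuming the OffsetFetchRequest/OffsetFetchResponse namedtuples:

    from kafka.common import OffsetFetchRequest

    (resp,) = client.send_offset_fetch_request(
        'my-group', [OffsetFetchRequest('my-topic', 0)])
    committed_offset = resp.offset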