Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import functools
import logging
import select
import time

import kafka.common
Expand Down Expand Up @@ -177,6 +178,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
# For each broker, send the list of request payloads
# and collect the responses and errors
broker_failures = []

# For each KafkaConnection keep the real socket so that we can use
# a select to perform unblocking I/O
connections_by_socket = {}
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
log.debug('Request %s to %s: %s', requestId, broker, payloads)
Expand Down Expand Up @@ -210,27 +215,34 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue
else:
connections_by_socket[conn.get_connected_socket()] = (conn, broker)

try:
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
log.warning('ConnectionError attempting to receive a '
'response to request %s from server %s: %s',
requestId, broker, e)
conn = None
while connections_by_socket:
sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
conn, broker = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
broker_failures.append(broker)
log.warning('ConnectionError attempting to receive a '
'response to request %s from server %s: %s',
requestId, broker, e)

for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)
for payload in payloads_by_broker[broker]:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)

else:
_resps = []
for payload_response in decoder_fn(response):
topic_partition = (payload_response.topic,
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
log.debug('Response %s: %s', requestId, _resps)
else:
_resps = []
for payload_response in decoder_fn(response):
topic_partition = (payload_response.topic,
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
log.debug('Response %s: %s', requestId, _resps)

# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
Expand Down
5 changes: 5 additions & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ def _read_bytes(self, num_bytes):

# TODO multiplex socket communication to allow for multi-threaded clients

def get_connected_socket(self):
if not self._sock:
self.reinit()
return self._sock

def send(self, request_id, payload):
"""
Send a request to Kafka
Expand Down
17 changes: 17 additions & 0 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,23 @@ def test_recv__doesnt_consume_extra_data_in_stream(self):
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])

def test_get_connected_socket(self):
s = self.conn.get_connected_socket()

self.assertEqual(s, self.MockCreateConn())

def test_get_connected_socket_on_dirty_conn(self):
# Dirty the connection
try:
self.conn._raise_connection_error()
except ConnectionError:
pass

# Test that get_connected_socket tries to connect
self.assertEqual(self.MockCreateConn.call_count, 0)
self.conn.get_connected_socket()
self.assertEqual(self.MockCreateConn.call_count, 1)

def test_close__object_is_reusable(self):

# test that sending to a closed connection
Expand Down