Skip to content

Commit

Permalink
Merge 77c1818 into 497ded9
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp committed Aug 14, 2017
2 parents 497ded9 + 77c1818 commit f8d20de
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 31 deletions.
12 changes: 3 additions & 9 deletions kafka/client_async.py
Expand Up @@ -495,7 +495,7 @@ def send(self, node_id, request):

return self._conns[node_id].send(request)

def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
"""Try to read and write to sockets.
This method will also attempt to complete node connections, refresh
Expand All @@ -507,9 +507,6 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
timeout will be the minimum of timeout, request timeout and
metadata timeout. Default: request_timeout_ms
future (Future, optional): if provided, blocks until future.is_done
sleep (bool): if True and there is nothing to do (no connections
or requests in flight), will sleep for duration timeout before
returning empty results. Default: False.
Returns:
list: responses received (can be empty)
Expand Down Expand Up @@ -553,7 +550,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
self.config['request_timeout_ms'])
timeout = max(0, timeout / 1000.0) # avoid negative timeouts

responses.extend(self._poll(timeout, sleep=sleep))
responses.extend(self._poll(timeout))

# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
Expand All @@ -562,10 +559,7 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):

return responses

def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
assert self.in_flight_request_count() > 0 or self._connecting or sleep

def _poll(self, timeout):
responses = []
processed = set()

Expand Down
3 changes: 1 addition & 2 deletions kafka/consumer/fetcher.py
Expand Up @@ -275,8 +275,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):

if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
self._client.poll(
future=refresh_future, sleep=True, timeout_ms=remaining_ms)
self._client.poll(future=refresh_future, timeout_ms=remaining_ms)
else:
time.sleep(self.config['retry_backoff_ms'] / 1000.0)

Expand Down
6 changes: 4 additions & 2 deletions kafka/consumer/group.py
Expand Up @@ -613,7 +613,7 @@ def _poll_once(self, timeout_ms, max_records):
# Send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()

self._client.poll(timeout_ms=timeout_ms, sleep=True)
self._client.poll(timeout_ms=timeout_ms)
records, _ = self._fetcher.fetched_records(max_records)
return records

Expand Down Expand Up @@ -1019,7 +1019,7 @@ def _message_generator(self):
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
self._client.poll(timeout_ms=poll_ms, sleep=True)
self._client.poll(timeout_ms=poll_ms)

# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
Expand All @@ -1045,6 +1045,8 @@ def _message_generator(self):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
if self._client.in_flight_request_count():
self._client.poll(timeout_ms=0)

# An else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/sender.py
Expand Up @@ -156,7 +156,7 @@ def run_once(self):
# difference between now and its linger expiry time; otherwise the
# select time will be the time difference between now and the
# metadata expiry time
self._client.poll(poll_timeout_ms, sleep=True)
self._client.poll(poll_timeout_ms)

def initiate_close(self):
"""Start closing the sender (won't complete until all data is sent)."""
Expand Down
33 changes: 16 additions & 17 deletions test/test_client_async.py
Expand Up @@ -259,23 +259,22 @@ def test_poll(mocker):
metadata.return_value = 1000
tasks.return_value = 2
cli.poll()
_poll.assert_called_with(1.0, sleep=True)
_poll.assert_called_with(1.0)

# user timeout wins
cli.poll(250)
_poll.assert_called_with(0.25, sleep=True)
_poll.assert_called_with(0.25)

# tasks timeout wins
tasks.return_value = 0
cli.poll(250)
_poll.assert_called_with(0, sleep=True)
_poll.assert_called_with(0)

# default is request_timeout_ms
metadata.return_value = 1000000
tasks.return_value = 10000
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0,
sleep=True)
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)


def test__poll():
Expand Down Expand Up @@ -337,33 +336,33 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234

client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(1.234, sleep=True)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(1.234)


def test_maybe_refresh_metadata_backoff(mocker, client):
now = time.time()
t = mocker.patch('time.time')
t.return_value = now

client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff


def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True

client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(9999.999) # request_timeout_ms


def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_send_request', return_value=True)
send = mocker.patch.object(client, 'send')

client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(9999.999) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
send.assert_called_once_with('foobar', request)
Expand All @@ -379,16 +378,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
t.return_value = now

# first poll attempts connection
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff
client._can_connect.assert_called_once_with('foobar')
client._maybe_connect.assert_called_once_with('foobar')

# poll while connecting should not attempt a new connection
client._connecting.add('foobar')
client._can_connect.reset_mock()
client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(9999.999) # connection timeout (request timeout)
assert not client._can_connect.called

assert not client._metadata_refresh_in_progress
Expand Down

0 comments on commit f8d20de

Please sign in to comment.