Skip to content

Commit

Permalink
Add support for connections_max_idle_ms config.
Browse files Browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi committed Feb 17, 2017
1 parent 70600d4 commit 8693bde
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 7 deletions.
10 changes: 8 additions & 2 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class AIOKafkaClient:
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. For more information see :ref:`ssl_auth`.
Default: None.
connections_max_idle_ms (int): Close idle connections after the number
of milliseconds specified by this config. Specifying `None` will
disable idle checks. Default: 540000 (9hours).
"""

def __init__(self, *, loop, bootstrap_servers='localhost',
Expand All @@ -64,7 +67,8 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
retry_backoff_ms=100,
ssl_context=None,
security_protocol='PLAINTEXT',
api_version='auto'):
api_version='auto',
connections_max_idle_ms=540000):
if security_protocol not in ('SSL', 'PLAINTEXT'):
raise ValueError("`security_protocol` should be SSL or PLAINTEXT")
if security_protocol == "SSL" and ssl_context is None:
Expand All @@ -78,6 +82,7 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
self._security_protocol = security_protocol
self._ssl_context = ssl_context
self._retry_backoff = retry_backoff_ms / 1000
self._connections_max_idle_ms = connections_max_idle_ms

self.cluster = ClusterMetadata(metadata_max_age_ms=metadata_max_age_ms)
self._topics = set() # empty set will fetch all topic metadata
Expand Down Expand Up @@ -133,7 +138,8 @@ def bootstrap(self):
host, port, loop=self._loop, client_id=self._client_id,
request_timeout_ms=self._request_timeout_ms,
ssl_context=self._ssl_context,
security_protocol=self._security_protocol)
security_protocol=self._security_protocol,
max_idle_ms=self._connections_max_idle_ms)
except (OSError, asyncio.TimeoutError) as err:
log.error('Unable connect to "%s:%s": %s', host, port, err)
continue
Expand Down
37 changes: 34 additions & 3 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
@asyncio.coroutine
def create_conn(host, port, *, loop=None, client_id='aiokafka',
request_timeout_ms=40000, api_version=(0, 8, 2),
ssl_context=None, security_protocol="PLAINTEXT"):
ssl_context=None, security_protocol="PLAINTEXT",
max_idle_ms=None):
if loop is None:
loop = asyncio.get_event_loop()
conn = AIOKafkaConnection(
host, port, loop=loop, client_id=client_id,
request_timeout_ms=request_timeout_ms,
api_version=api_version,
ssl_context=ssl_context, security_protocol=security_protocol)
ssl_context=ssl_context, security_protocol=security_protocol,
max_idle_ms=max_idle_ms)
yield from conn.connect()
return conn

Expand All @@ -51,7 +53,8 @@ class AIOKafkaConnection:

def __init__(self, host, port, *, loop, client_id='aiokafka',
request_timeout_ms=40000, api_version=(0, 8, 2),
ssl_context=None, security_protocol="PLAINTEXT"):
ssl_context=None, security_protocol="PLAINTEXT",
max_idle_ms=None):
self._loop = loop
self._host = host
self._port = port
Expand All @@ -67,6 +70,10 @@ def __init__(self, host, port, *, loop, client_id='aiokafka',
self._correlation_id = 0
self._closed_fut = None

self._max_idle_ms = max_idle_ms
self._last_action = loop.time()
self._idle_handle = None

@asyncio.coroutine
def connect(self):
loop = self._loop
Expand All @@ -88,8 +95,28 @@ def connect(self):
self._reader, self._writer, self._protocol = reader, writer, protocol
# Start reader task.
self._read_task = ensure_future(self._read(), loop=loop)
# Start idle checker
if self._max_idle_ms is not None:
self._idle_handle = self._loop.call_soon(self._idle_check)
return reader, writer

def _idle_check(self):
idle_for = self._loop.time() - self._last_action
timeout = self._max_idle_ms / 1000
# If we have any pending requests, we are assumed to be not idle.
# it's up to `request_timeout_ms` to break those.
if (idle_for >= timeout) and not self._requests:
self.close()
else:
if self._requests:
# We must wait at least max_idle_ms anyway. Mostly this setting
# is quite high so we shouldn't spend many CPU on this
wake_up_in = timeout
else:
wake_up_in = timeout - idle_for
self._idle_handle = self._loop.call_later(
wake_up_in, self._idle_check)

def __repr__(self):
return "<AIOKafkaConnection host={0.host} port={0.port}>".format(self)

Expand Down Expand Up @@ -143,6 +170,8 @@ def close(self):
if not fut.done():
fut.set_exception(error)
self._requests = []
if self._idle_handle is not None:
self._idle_handle.cancel()
# transport.close() will close socket, but not right ahead. Return
# a future in case we need to wait on it.
return self._closed_fut
Expand Down Expand Up @@ -182,6 +211,8 @@ def _read(self):
self.log.debug('%s Response %d: %s',
self, correlation_id, response)
fut.set_result(response)
# Update idle timer.
self._last_action = self._loop.time()
except (OSError, EOFError, ConnectionError) as exc:
conn_exc = Errors.ConnectionError(
"Connection at {0}:{1} broken".format(self._host, self._port))
Expand Down
8 changes: 6 additions & 2 deletions aiokafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ class AIOKafkaConsumer(object):
socket connections. Directly passed into asyncio's
`create_connection`_. For more information see :ref:`ssl_auth`.
Default: None.
connections_max_idle_ms (int): Close idle connections after the number
of milliseconds specified by this config. Default: 540000 (9hours).
Note:
Many configuration parameters are taken from Java Client:
Expand Down Expand Up @@ -152,7 +154,8 @@ def __init__(self, *topics, loop,
max_poll_records=None,
ssl_context=None,
security_protocol='PLAINTEXT',
api_version='auto'):
api_version='auto',
connections_max_idle_ms=540000):
if api_version not in ('auto', '0.9', '0.10'):
raise ValueError("Unsupported Kafka API version")
self._client = AIOKafkaClient(
Expand All @@ -162,7 +165,8 @@ def __init__(self, *topics, loop,
retry_backoff_ms=retry_backoff_ms,
api_version=api_version,
ssl_context=ssl_context,
security_protocol=security_protocol)
security_protocol=security_protocol,
connections_max_idle_ms=connections_max_idle_ms)

self._group_id = group_id
self._heartbeat_interval_ms = heartbeat_interval_ms
Expand Down
28 changes: 28 additions & 0 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,34 @@ def test_basic_connection_load_meta(self):
conn.close()
self.assertIsInstance(response, MetadataResponse)

@run_until_complete
def test_connections_max_idle_ms(self):
host, port = self.kafka_host, self.kafka_port
conn = yield from create_conn(
host, port, loop=self.loop, max_idle_ms=200)
self.assertEqual(conn.connected(), True)
yield from asyncio.sleep(0.1, loop=self.loop)
# Do some work
request = MetadataRequest([])
yield from conn.send(request)
yield from asyncio.sleep(0.15, loop=self.loop)
# Check if we're stil connected after 250ms, as we were not idle
self.assertEqual(conn.connected(), True)

# It shouldn't break if we have a long running call either
readexactly = conn._reader.readexactly
with mock.patch.object(conn._reader, 'readexactly') as mocked:
@asyncio.coroutine
def long_read(n):
yield from asyncio.sleep(0.2, loop=self.loop)
return (yield from readexactly(n))
mocked.side_effect = long_read
yield from conn.send(MetadataRequest([]))
self.assertEqual(conn.connected(), True)

yield from asyncio.sleep(0.2, loop=self.loop)
self.assertEqual(conn.connected(), False)

@run_until_complete
def test_send_without_response(self):
"""Imitate producer without acknowledge, in this case client produces
Expand Down

0 comments on commit 8693bde

Please sign in to comment.