speed up connector limiting #2937

Merged: 24 commits, Apr 27, 2018
1 change: 1 addition & 0 deletions CHANGES/2937.feature
@@ -0,0 +1 @@
+Speed up connector limiting
46 changes: 31 additions & 15 deletions aiohttp/connector.py
@@ -3,7 +3,7 @@
 import sys
 import traceback
 import warnings
-from collections import defaultdict
+from collections import defaultdict, deque
 from contextlib import suppress
 from http.cookies import SimpleCookie
 from itertools import cycle, islice
@@ -181,7 +181,9 @@ def __init__(self, *, keepalive_timeout=sentinel,
         self._acquired_per_host = defaultdict(set)
         self._keepalive_timeout = keepalive_timeout
         self._force_close = force_close
-        self._waiters = defaultdict(list)
+
+        # {host_key: FIFO list of waiters}
+        self._waiters = defaultdict(deque)
 
         self._loop = loop
         self._factory = functools.partial(ResponseHandler, loop=loop)
@@ -392,12 +394,19 @@ async def connect(self, req, traces=None):
 
             try:
                 await fut
-            finally:
-                # remove a waiter even if it was cancelled
-                waiters.remove(fut)
[PR author] this is slow because it's trying to find a future in a list for each request that's waiting on a connector
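For a sense of scale (an illustrative sketch, not code from the PR): list.remove() scans for the item and then shifts the tail, so the cost grows with the number of queued waiters, while a deque drops its head in constant time.

    from collections import deque

    # list.remove() scans for the item, then shifts everything after it: O(n)
    waiters_list = list(range(100_000))
    waiters_list.remove(99_999)

    # deque.popleft() detaches the head in O(1), with no scan
    waiters_deque = deque(range(100_000))
    waiters_deque.popleft()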

+            except BaseException:
+                # remove a waiter even if it was cancelled, normally it's
+                # removed when it's notified
+                try:
+                    waiters.remove(fut)
[Contributor] I just checked and there is no remove method in defaultdict.
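For context (an illustrative sketch, not code from the PR): self._waiters maps keys to deques, and the waiters variable above is one of those inner deques, which does provide remove(); the defaultdict itself is only touched via del self._waiters[key].

    from collections import defaultdict, deque

    waiters_by_key = defaultdict(deque)   # mirrors self._waiters after this PR
    key = ('example.com', 443)            # hypothetical host key
    fut = object()                        # stand-in for an asyncio future

    waiters_by_key[key].append(fut)
    waiters_by_key[key].remove(fut)       # deque.remove() exists; defaultdict has no remove()
    if not waiters_by_key[key]:
        del waiters_by_key[key]           # the mapping entry is dropped explicitly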

+                except ValueError:  # fut may no longer be in list
+                    pass
+
+                if not waiters:
+                    del self._waiters[key]
+
+                raise
 
             if traces:
                 for trace in traces:
                     await trace.send_connection_queued_end()
@@ -423,10 +432,8 @@ async def connect(self, req, traces=None):
             except BaseException:
                 # signal to waiter
                 if key in self._waiters:
-                    for waiter in self._waiters[key]:
-                        if not waiter.done():
-                            waiter.set_result(None)
-                            break
+                    waiters = self._waiters[key]
+                    self._release_key_waiter(key, waiters)
                 raise
             finally:
                 if not self._closed:
@@ -470,25 +477,34 @@ def _get(self, key):
                 del self._conns[key]
         return None
 
+    def _release_key_waiter(self, key, waiters):
+        if not waiters:
+            return False
+
+        waiter = waiters.popleft()
[Contributor] Umm, you have to keep going until you reach a waiter that is not done, or you run out of waiters; see the CPython implementation [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450

[PR author] Are you saying the old implementation was wrong: https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adL481 ? I don't believe that's the case. There are two ways a waiter can be removed:

  1. An exception happened while waiting (in the exception handler)
     i. a release was dispatched for said waiter (someone will see a release)
  2. Through this method

What you describe would create a double release for 1.i. This is in fact the scenario you alluded to before.

[pfreixes (contributor), Apr 26, 2018] Yes, the issue was already there; indeed, I can see the following problems with the code we have in master:

  • The iteration until reaching a non-cancelled waiter has to go through all of the items of a list; right now it is only done on the head of each list.
  • Each time we try to release a waiter, we have to check whether the limit and the number of concurrent connections allow it. This is done only when release_waiter is called explicitly, but not when an exception occurred while trying to make the connection.
  • The limit per host, TBH, I would say is not well calculated.

So we have to fix these, but it's true that they were already there, and it would be nice to decouple the two things.

[PR author] ya, I have a feeling this is the tip of the iceberg :) I have a strong suspicion there's a leak in aiohttp, or in something aiohttp uses, as right now we're leaking ~40MB/week in prod.
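For reference, the CPython code cited in [1] loops past waiters that are already done; roughly (a condensed sketch of the asyncio/locks.py pattern, not aiohttp code):

    def _wake_up_next(self):
        # keep discarding waiters until a still-pending one is found
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

The PR takes the other route: a cancelled waiter removes itself from its deque in the except BaseException handler shown above, so each deque should only ever hold pending futures and a single popleft() suffices.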

+        if not waiter.done():
+            waiter.set_result(None)
+
+        if not waiters:
+            del self._waiters[key]
+
+        return True
[Contributor] If you don't do anything with the result, don't implement a result at all; otherwise it's dead code. Just a small remark.

[PR author] ? The result is used in _release_waiter.

[Contributor] Ah, I see, sorry.


     def _release_waiter(self):
         # always release only one waiter
 
         if self._limit:
             # if we have limit and we have available
             if self._limit - len(self._acquired) > 0:
                 for key, waiters in self._waiters.items():
                     if waiters:
-                        if not waiters[0].done():
[PR author] with the new model we can guarantee that there are only active waiters in the list, so we can greatly simplify this to always popping and notifying the first item in the list

-                            waiters[0].set_result(None)
+                        if self._release_key_waiter(key, waiters):
                             break
 
         elif self._limit_per_host:
             # if we have dont have limit but have limit per host
             # then release first available
             for key, waiters in self._waiters.items():
                 if waiters:
-                    if not waiters[0].done():
-                        waiters[0].set_result(None)
+                    if self._release_key_waiter(key, waiters):
                         break

     def _release_acquired(self, key, proto):
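For readers outside this thread: limit caps the total number of concurrent connections, while limit_per_host caps connections to any single destination, and _release_waiter wakes a waiter according to whichever of the two is set. A typical client configuration might look like this (illustrative values, not part of the PR):

    import asyncio
    import aiohttp

    async def main():
        # at most 100 connections in total, and at most 10 to any single host
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get('http://example.com/') as resp:
                print(resp.status)

    asyncio.get_event_loop().run_until_complete(main())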
48 changes: 38 additions & 10 deletions tests/test_connector.py
@@ -8,6 +8,7 @@
 import socket
 import ssl
 import uuid
+from collections import deque
 from unittest import mock
 
 import pytest
@@ -40,8 +41,8 @@ def ssl_key():
 
 
 @pytest.fixture
-def unix_sockname(tmpdir):
-    sock_path = tmpdir / 'socket.sock'
+def unix_sockname(shorttmpdir):
+    sock_path = shorttmpdir / 'socket.sock'
     return str(sock_path)


@@ -90,7 +91,7 @@ async def test_del_with_scheduled_cleanup(loop):
     loop.set_debug(True)
     conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=0.01)
     transp = mock.Mock()
-    conn._conns['a'] = [(transp, 'proto', 123)]
+    conn._conns['a'] = [(transp, 123)]
 
     conns_impl = conn._conns
     exc_handler = mock.Mock()
@@ -115,7 +116,7 @@ async def test_del_with_scheduled_cleanup(loop):
 def test_del_with_closed_loop(loop):
     conn = aiohttp.BaseConnector(loop=loop)
     transp = mock.Mock()
-    conn._conns['a'] = [(transp, 'proto', 123)]
+    conn._conns['a'] = [(transp, 123)]
 
     conns_impl = conn._conns
     exc_handler = mock.Mock()
@@ -319,7 +320,7 @@ def test_release_waiter(loop, key, key2):
     w1, w2 = mock.Mock(), mock.Mock()
     w1.done.return_value = False
     w2.done.return_value = False
-    conn._waiters[key] = [w1, w2]
+    conn._waiters[key] = deque([w1, w2])
     conn._release_waiter()
     assert w1.set_result.called
     assert not w2.set_result.called
@@ -330,7 +331,7 @@ def test_release_waiter(loop, key, key2):
     w1, w2 = mock.Mock(), mock.Mock()
     w1.done.return_value = True
     w2.done.return_value = False
-    conn._waiters[key] = [w1, w2]
+    conn._waiters[key] = deque([w1, w2])
     conn._release_waiter()
     assert not w1.set_result.called
     assert not w2.set_result.called
@@ -343,8 +344,8 @@ def test_release_waiter_per_host(loop, key, key2):
     w1, w2 = mock.Mock(), mock.Mock()
     w1.done.return_value = False
     w2.done.return_value = False
-    conn._waiters[key] = [w1]
-    conn._waiters[key2] = [w2]
+    conn._waiters[key] = deque([w1])
+    conn._waiters[key2] = deque([w2])
     conn._release_waiter()
     assert ((w1.set_result.called and not w2.set_result.called) or
             (not w1.set_result.called and w2.set_result.called))
@@ -960,7 +961,9 @@ async def test_connect_tracing(loop):
     conn._create_connection.return_value = loop.create_future()
     conn._create_connection.return_value.set_result(proto)
 
-    await conn.connect(req, traces=traces)
+    conn2 = await conn.connect(req, traces=traces)
+    conn2.release()
+
     on_connection_create_start.assert_called_with(
         session,
         trace_config_ctx,
@@ -1411,7 +1414,8 @@ async def test_connect_reuseconn_tracing(loop, key):
 
     conn = aiohttp.BaseConnector(loop=loop, limit=1)
     conn._conns[key] = [(proto, loop.time())]
-    await conn.connect(req, traces=traces)
+    conn2 = await conn.connect(req, traces=traces)
+    conn2.release()
 
     on_connection_reuseconn.assert_called_with(
         session,
@@ -1736,6 +1740,29 @@ async def create_connection(req, traces=None):
     assert ret._key == 'key'
     assert ret.protocol == proto
     assert proto in conn._acquired
+    ret.release()
+
+
+async def test_cancelled_waiter(loop):
+    conn = aiohttp.BaseConnector(limit=1, loop=loop)
+    req = mock.Mock()
+    req.connection_key = 'key'
+    proto = mock.Mock()
+
+    async def create_connection(req, traces=None):
+        await asyncio.sleep(1)
+        return proto
+
+    conn._create_connection = create_connection
+
+    conn._acquired.add(proto)
+
+    conn2 = loop.create_task(conn.connect(req))
+    await asyncio.sleep(0, loop=loop)
+    conn2.cancel()
+
+    with pytest.raises(asyncio.CancelledError):
+        await conn2


async def test_error_on_connection_with_cancelled_waiter(loop):
@@ -1785,6 +1812,7 @@ async def create_connection(req, traces=None):
     assert ret._key == 'key'
     assert ret.protocol == proto
     assert proto in conn._acquired
+    ret.release()


 async def test_tcp_connector(aiohttp_client, loop):