Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

speed up connector limiting #2937

Merged
merged 24 commits into from
Apr 27, 2018

Conversation

thehesiod
Copy link
Contributor

see conversation in #1821

@@ -392,11 +396,18 @@ def closed(self):

try:
await fut
finally:
# remove a waiter even if it was cancelled
waiters.remove(fut)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is slow because it's trying to find a future in a list for each request that's waiting on a connector

def _release_waiter(self):
# always release only one waiter

if self._limit:
# if we have limit and we have available
if self._limit - len(self._acquired) > 0:
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the new model we can guarantee that there are only active waiters in the list so we can greatly simplify this to always popping and notifying the first item in the list

return False

waiter = waiters.pop(0)
waiter.set_result(None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can not guarantee that the element is a none canceled future. Even you are removing from the list here [1].

Once the Future is set with an exception the callback [2] that will trigger this Exception will be scheduled by the loop, so before the callback that cancels the Task is really executed, the task that was holding a connection and wants to release it might pop a canceled future.

[1] https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adR399
[2] https://github.com/python/cpython/blob/master/Lib/asyncio/futures.py#L254

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how this can happen because the async stack ends on the waiter for Future and there are no done callbacks associated with it, see this example: https://ideone.com/izIfqw

Think of how this would work, currently no-one sets an exception on this future besides cancelling the waiter at [1], we can verify this by seeing the future is only available from self._waiters and no one sets an exception on it.

so, if you cancel task at [1], you have two options:

  1. you don't wait on the cancelled task:
    1.1) you can release it (ok, removed from list), and then wait on cancelled (item already gone from list)
    1.2) you can not release it (ok), then wait on cancelled (item will be removed from list)
  2. you wait on the cancelled task, in which case the baseexcept runs and removes item from list. Nothing else can happen because we're just waiting on the future and no-one else is waiting on said future or has callbacks against said future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me know if I'm missing something in my logic, I know it can get complex

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task will end up once the reactor schedules again the task, the result that is given back to the task depends on the future taht yielded the task before. The cancel is only a signal that wakes up the task with an CanceledError exception.

So the task might be still there just waiting for its turn in the reactor/loop. While its happenning you have chances to have the situation that I told tou.

Please review the asyncio lock code, all of the different implementations take this into consideration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @pfreixes for that example, so it seems it depends on the ordering of the tasks called during cancellation. Here's a question, can you have an outstanding release for a single connector? It seems like you could end up releasing two connectors for a single cancel no if the except clause executes before wakeup_next_waiter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the question, but once the race condition is considered and mitigated the code should work as is expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pfreixes my "picture" :)

import asyncio

loop = asyncio.get_event_loop()
waiters = []


async def task(waiter):
    try:
        await waiter
    except:
        print("task finalized")
        if waiter in waiters:
            print("waiter removed")
            try:
                waiters.remove(waiter)
            except ValueError:
                print("Waiter already removed")
        else:
            print("waiter not present")
        raise


def wakeup_next_waiter():
    if not waiters:
        return

    waiter = waiters.pop(0)
    try:
        if not waiter.done():
            waiter.set_result(None)
    except Exception as e:
        print(f"Exception calling set_result {e!r}")
        raise


async def main(loop):
    # We add two waiters
    waiter = loop.create_future()
    waiters.append(waiter)

    waiter = loop.create_future()
    waiters.append(waiter)

    # create the task that will wait till either the waiter
    # is finished or the task is cancelled.
    t = asyncio.ensure_future(task(waiter))

    # make a loop iteration to allow the task reach the
    #     await waiter
    await asyncio.sleep(0)

    # put in the loop a callback to wake up the waiter.
    loop.call_later(0.1, wakeup_next_waiter)

    # cancel the task, this will mark the task as cancelled
    # but will be pending a loop iteration to wake up the
    # task, having as a result a CanceledError exception.
    # This implicitly will schedule the Task._wakeup function
    # to be executed in the next loop iteration.
    t.cancel()

    try:
        await t
    except asyncio.CancelledError:
        pass

    # wait for the release to run
    await asyncio.sleep(1)

    # now we have zero waiters even though only one was cancelled
    print(len(waiters))

loop.run_until_complete(main(loop))

note in this example I add two waiters, cancel one, but at the end none are left because I ensure that the wakeup_next_waiter happens after task

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the issue in the code - be carefull with extra ifs that are not needed in thte task function. If you are claiming that both waiters are removed this IMO is perfectly fine.

The first waiter that is created is removed through the happy path of the wakeup_next_waiter, obviously being removed by waiter = waiters.pop(0), no task is wakened up because you didn't start any task with that waiter. The second waiter is canceled and the task related will remove the waiter by the waiters.remove(waiter).

Just to put all of us in the same page, the release of a used connection is done automatically by the context manager of a request calling the resp.release() [1]. So every time that a code path goes out of the context will release the connection and will try to wake up pending requests.

[1] https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_reqrep.py#L786

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I think ya the way to think about this is that there are N waiters and M connectors, while they're represented in self._waiters the "ownership" on each side is unique. You may inefficiently wake up a cancelled waiter with the current algo but it's probably easier to deal with. Thanks ago for convo was enlightening. Will work on unittest fixes and recommended change today.

if not waiters:
del self._waiters[key]
except BaseException:
# remove a waiter even if it was cancelled, normally it's
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the future has been canceled, we do need to wake up another waiter. Take a look at the semaphore implementation [1]

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L478

Copy link
Contributor Author

@thehesiod thehesiod Apr 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method created the future, why would we need to wake up another waiter? That doesn't make sense as it would imply yet another connector is available. This is 1-1, one waiter was added, one removed. Also note that code is if the future was not cancelled, in this scenario it can only be cancelled

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake, the wake up will be done automariaclly by the exit of the context manager in whatever scenario. So forget about this


# {host_key: FIFO list of waiters}
# NOTE: this is not a true FIFO because the true order is lost amongst
# the dictionary keys
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using the deque [1] data strucutre that is the one used by al of the implementations of asyncio.locks. More likely because lists have the following constraint:

lists incur O(n) memory movement costs for pop(0)

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L437

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great idea, done, slight speed improvement :)

waiters.remove(fut)
if not waiters:
del self._waiters[key]
except BaseException:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand why the deletion is moved from finally to except.
Why shouldn't we remove the waiter if no exceptions was raised?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asvetlov this is for two reasons: performance, and that if no exception is thrown the removal happened by the release method.

@pfreixes
Copy link
Contributor

@thehesiod will be nice if we can get the CI in green, right now tests related to stuff that you are improving are failing.

If the numbers that you claim [1] improve the Aiohttp in such a measure we would need to release ASAP a new version.

So, let's try to focus on resolving the MR comments and move ahead this MR.

[1] #1821 (comment)

@thehesiod
Copy link
Contributor Author

failures don't seem like my fault:
AttributeError: module 'asyncio.coroutines' has no attribute 'debug_wrapper'

@codecov-io
Copy link

codecov-io commented Apr 18, 2018

Codecov Report

Merging #2937 into master will increase coverage by <.01%.
The diff coverage is 80.95%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #2937      +/-   ##
==========================================
+ Coverage   97.99%   97.99%   +<.01%     
==========================================
  Files          40       40              
  Lines        7520     7531      +11     
  Branches     1318     1317       -1     
==========================================
+ Hits         7369     7380      +11     
- Misses         48       49       +1     
+ Partials      103      102       -1
Impacted Files Coverage Δ
aiohttp/connector.py 96.86% <80.95%> (+0.04%) ⬆️
aiohttp/web_app.py 99.09% <0%> (+0.01%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 26802e0...66333b5. Read the comment docs.

- fix tests on OSX
- add new test for coverage
- fix wrong number of items in _conns
@pfreixes
Copy link
Contributor

@thehesiod seems that you are still having a linter issue

12.70s$ flake8 aiohttp examples tests demos
tests/test_connector.py:1752:5: F841 local variable 'i' is assigned to but never used

if not waiters:
del self._waiters[key]

return True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't do anything with the result, don't implement a result at all, it's like dead code. This is just a small remark

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? result is used in _release_waiter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see sry


# {host_key: FIFO list of waiters}
# NOTE: this is not a true FIFO because the true order is lost amongst
# the dictionary keys
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that this can be removed, the FIFO is for each dictionary key. Different keys mean different tuple values of (hosts, port) which FIFO does not make sense.

Copy link
Contributor Author

@thehesiod thehesiod Apr 19, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

each deque is indeed a FIFO as the first one in will be the first one out (amongst items in that list), however across keys it's not a FIFO because it currently iterates across keys (which theoretically can be in any order) when choosing from which FIFO to release.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which deque to release is not a random choice and its based on the hash of the host and port, so those connections that are waiting for a free connection and match the host and port will share the same deque in a FIFO way.

Yes we are saying the same, the dictionary is just the structure that keeps all of the FIFO queues.

Let's save the comments for what is really not understandable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't say it was random, I said it wasn't a true FIFO queue because it's choosing which queue to release a connector from in dictionary key order, and not in FIFO order. Anyways, removed the comment and people will have to figure this out themselves now. If this were to be "correct" there would need to be a separate priority queue with pointers back to these queues....or perhaps a multiindex priority queue :)

@pfreixes
Copy link
Contributor

@asvetlov this MR is quite critical and once merged will ship a good performance improvement when the client suffers a lot of concurrent connections and starts to apply backpressure, the figures speak by them self

Before

limit_builtin:
  Responses: Counter({200: 9999})
  Took:      49.72031831741333 seconds
limit_semaphore:
  Responses: Counter({200: 9999})
  Took:      22.222461223602295 seconds

After

limit_builtin:
  Responses: Counter({200: 9999})
  Took:      19.374465942382812 seconds
limit_semaphore:
  Responses: Counter({200: 9999})
  Took:      23.060685396194458 seconds

Let's keep pushing/discussing the changes that are asked to allow Aiohttp merge at some point this improvement. Also as a critical part of the code, I would like to have a second approval from @asvetlov , mine will be there once the two pending issues that I've commented are solved.

Copy link
Member

@asvetlov asvetlov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@pfreixes please merge when you will be ok with all changes.

if not waiters:
del self._waiters[key]
except ValueError: # fut may no longer be in list
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be careful with that, in case a ValueError exception you will mask the original one

try:
    val = 1 / 0
except Exception:
    try:
        a = {}
        a['foo']
    except KeyError:
        pass
    raise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @pfreixes , try to scope what you are catching as much as possible.

This is much better imo:

try:
    ...

    if not waiters:
        try:
            del self._waiters[key]
        except ValueError:
            pass

    ...

except:
    ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see what you're saying here, waiters.remove(fut) throws the ValueError, del self._waiters[key] could throw a KeyError, not another ValueError. Not going to change this unless it's really needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Read my first comment, unless you make it explicit with a raise from e, the second try/except masks the original exception in case of ValueError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotchya, thanks


# {host_key: FIFO list of waiters}
# NOTE: this is not a true FIFO because the true order is lost amongst
# the dictionary keys
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which deque to release is not a random choice and its based on the hash of the host and port, so those connections that are waiting for a free connection and match the host and port will share the same deque in a FIFO way.

Yes we are saying the same, the dictionary is just the structure that keeps all of the FIFO queues.

Let's save the comments for what is really not understandable.

@pfreixes
Copy link
Contributor

@thehesiod thanks for the hard work that has been done, let's try to close the opened discussions.

I don't want to block the PR for just one source code comment, but the exception masked its IMO something that needs to be addressed.

Copy link
Contributor

@cecton cecton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No method remove in defaultdict

# remove a waiter even if it was cancelled, normally it's
# removed when it's notified
try:
waiters.remove(fut)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked and there is no remove method in defaultdict.

if not waiters:
del self._waiters[key]
except ValueError: # fut may no longer be in list
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @pfreixes , try to scope what you are catching as much as possible.

This is much better imo:

try:
    ...

    if not waiters:
        try:
            del self._waiters[key]
        except ValueError:
            pass

    ...

except:
    ...

Copy link
Contributor

@cecton cecton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhh that waiters is a list. My bad, sry again

@thehesiod
Copy link
Contributor Author

thehesiod commented Apr 24, 2018

enlightening PR for all of us it seems, mostly me I guess :) thanks guys

if not waiters:
return False

waiter = waiters.popleft()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm, you have to persist till reaches a waiter not done or you run out of waiters, see the implementation of CPython [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you saying the old implementation was wrong: https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adL481 ? I don't believe this is the case. There are two ways a waiter can be removed:

  1. An exception happened while waiting (in exception handler)
    1. a release was dispatched for said waiter (someone will see a release)
  2. through this method

What you describe would create a double release for 1.i. This is in fact the scenario you before alluded to

Copy link
Contributor

@pfreixes pfreixes Apr 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the issue was already there, indeed I can see the following issues with the code that we have in master:

  • The iteration till reach a none canceled waiter has to be done through all of the items of a list, right now is only done on top of the head of each list.
  • Each time that we try to release a waiter we have to calculate if the limit and the number of concurrent connections allows us to make it. This is done only in one when the release_waiter is called explicitly but not when we had an exception trying to make the connection.
  • The limit per host, TBH, i would say that is not well calculated.

So we have to fix them, but true that they were already there and would be nice if we decouple both things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya I have a feeling this is the tip of the iceberg :) I have a strong suspicious there's a leak in aiohttp or something aiohttp uses as right now we're leaking ~40MB / week in prod

@pfreixes
Copy link
Contributor

LGTM, I cant merge it because of the coverage issue. Can you help us @asvetlov?

In any case, let's hold on a new release I would like to work on the issues that I've commented here [1]

[1] #2937 (comment)

@asvetlov
Copy link
Member

Sure, I can merge it but what prevents from adding new tests for ensuring the full coverage?
@pfreixes if you want to do it in separate PR -- I'm fine with it.

@pfreixes
Copy link
Contributor

I will provide a new MR with the fixes and more coverage.

@asvetlov asvetlov merged commit 03d590e into aio-libs:master Apr 27, 2018
@asvetlov
Copy link
Member

Ok. Merged

@asvetlov
Copy link
Member

thanks to all

thehesiod added a commit to thehesiod-forks/aiohttp that referenced this pull request Apr 28, 2018
@thehesiod thehesiod deleted the thehesiod-connector-speed2 branch June 1, 2018 23:27
@lock
Copy link

lock bot commented Oct 28, 2019

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a [new issue] for related bugs.
If you feel like there's important points made in this discussion, please include those exceprts into that [new issue].
[new issue]: https://github.com/aio-libs/aiohttp/issues/new

@lock lock bot added the outdated label Oct 28, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 28, 2019
@psf-chronographer psf-chronographer bot added the bot:chronographer:provided There is a change note present in this PR label Oct 28, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
bot:chronographer:provided There is a change note present in this PR outdated
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants