Not waiting for connection_lost() when transport closed #1925

Closed
omnicognate opened this issue May 26, 2017 · 34 comments


@omnicognate omnicognate commented May 26, 2017

Long story short

aiohttp calls close() on the transport and then immediately discards it without waiting for connection_lost(). This is a problem for SSL transports, which have to shut down asynchronously, and it appears aiohttp only "works" with the standard library's SSL transport because of a bug in that transport.

Expected behaviour

aiohttp waiting for connection_lost() before discarding transport

Actual behaviour

aiohttp does not do so.

Steps to reproduce

import aiohttp
import asyncio

async def main():
    conn = aiohttp.TCPConnector(verify_ssl=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        async with session.get('https://httpbin.org/') as resp:
            resp.raise_for_status()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

Run this program and breakpoint connection_made() and connection_lost() in client_proto.py. The former is called, but not the latter.

The program appears to run successfully, but to convince you that something is severely amiss, add a __del__ method to the class _SelectorSocketTransport in asyncio's selector_events.py and breakpoint it and that transport's close() method. You'll see that close() is never called and __del__ gets called during shutdown, after the loop has been closed and the module finalised, because it still has a reader registered and is therefore leaked (while still waiting on I/O) in a reference cycle with the event loop.

If I'm understanding it right, the overall behaviour appears to be due to two bugs, one in asyncio and one in aiohttp:

  1. When close() is called on the SSLProtocolTransport in asyncio's ssl_proto.py it initiates an asynchronous shutdown process, which should ultimately result in it calling connection_lost(). However, this doesn't progress for reasons I haven't fully figured out, so connection_lost() is never called. The shutdown process is clearly broken because it never results in close() being called on the underlying transport, which is leaked as shown above.
  2. aiohttp doesn't wait for connection_lost(), so the hang that one would expect as a result of this never happens. By luck, the broken shutdown sequence never actually causes any errors.

I'm actually more interested in the latter, the aiohttp behaviour, because I've written my own SSL transport (based on pyopenssl), which does a correct asynchronous shutdown sequence. When aiohttp discards my transport without waiting for connection_lost(), it causes errors, which I don't think are my fault.
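To make the requested behaviour concrete, here is a minimal sketch using only the standard library, not aiohttp's actual code. The class name ClosingAwareProtocol and its closed future are hypothetical names chosen for illustration; the local echo-less server exists only so the example runs without network access.

```python
import asyncio


class ClosingAwareProtocol(asyncio.Protocol):
    """Hypothetical protocol that exposes connection_lost() as a future."""

    def __init__(self) -> None:
        self.closed = asyncio.get_running_loop().create_future()

    def connection_lost(self, exc) -> None:
        # Called by the transport once its shutdown has actually finished.
        if not self.closed.done():
            self.closed.set_result(exc)


async def handle(reader, writer) -> None:
    # Server side: wait until the peer closes, then close our end too.
    await reader.read()
    writer.close()


async def open_close_and_wait() -> bool:
    loop = asyncio.get_running_loop()
    # Throwaway local server so the sketch needs no external host.
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    try:
        transport, proto = await loop.create_connection(
            ClosingAwareProtocol, "127.0.0.1", port
        )
        transport.close()   # only *starts* the shutdown
        await proto.closed  # resumes after connection_lost() has run
        return proto.closed.done()
    finally:
        server.close()
        await server.wait_closed()


print(asyncio.run(open_close_and_wait()))
```

The point of the sketch is the pair transport.close() followed by await proto.closed: the caller only proceeds once the transport has reported that it is really gone.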

Your environment

Tested on Windows and Linux with python 3.5.1 and 3.5.2 and aiohttp 2.0.7 and 2.1.0.

Author

@omnicognate omnicognate commented May 26, 2017

I think the waiting for the transport shutdown would have to happen within the __aexit__ coroutine in ClientSession, which currently just calls an ordinary function to perform the close. I don't see any other stage in program execution where the wait could be done.
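A rough sketch of what waiting inside the exit coroutine could look like. ToySession and ToyConnection are hypothetical stand-ins written for this illustration; they are not aiohttp classes and say nothing about how aiohttp is or will be implemented.

```python
import asyncio


class ToyConnection:
    """Stand-in for a transport/protocol pair; not aiohttp's real classes."""

    def __init__(self) -> None:
        self._loop = asyncio.get_running_loop()
        self.lost = self._loop.create_future()  # plays the role of connection_lost()

    def close(self) -> None:
        # Shutdown completes on a later loop iteration, as with a real transport.
        self._loop.call_soon(self.lost.set_result, None)


class ToySession:
    """Hypothetical session whose __aexit__ returns only after every close finished."""

    def __init__(self) -> None:
        self.connections = []

    def connect(self) -> ToyConnection:
        conn = ToyConnection()
        self.connections.append(conn)
        return conn

    async def close(self) -> None:
        for conn in self.connections:
            conn.close()
        # Wait for the connection_lost() equivalent of every connection.
        await asyncio.gather(*(conn.lost for conn in self.connections))

    async def __aenter__(self) -> "ToySession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()


async def main() -> bool:
    async with ToySession() as session:
        conns = [session.connect() for _ in range(3)]
    # By the time the block has exited, every connection reports completion.
    return all(conn.lost.done() for conn in conns)


print(asyncio.run(main()))
```

The design choice illustrated is that close() is a coroutine, so __aexit__ has something to await; a plain function that only schedules callbacks cannot offer that guarantee.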

Member

@fafhrd91 fafhrd91 commented Jun 18, 2017

What is the reason to wait in client code until the close procedure completes?

BaseConnector accepts an enable_cleanup_closed parameter, which adds a special wait queue
for SSL connections: https://github.com/aio-libs/aiohttp/blob/master/aiohttp/connector.py#L151
But that is a workaround for buggy servers that never complete the close procedure.

Member

@asvetlov asvetlov commented Jun 26, 2017

Well, gracefully closing all connections before exiting from async with ClientSession() might be a good idea.
But I see problems with the implementation as long as ClientSession.close() is a regular function returning a future object.
That behavior has to be supported until we drop Python 3.4, along with the synchronous with ClientSession() form, etc.
So implementing this should be postponed.

Member

@fafhrd91 fafhrd91 commented Jun 26, 2017

connection_lost has nothing to do with the client session. Also, why should we care?

@asvetlov adding graceful close support to the session is a bad idea.

Member

@asvetlov asvetlov commented Jun 26, 2017

Maybe graceful close is bad wording but I want to finish client.close() only after getting connection_lost notifications from all opened connections.

Otherwise people should put await sleep(0) but even this doesn't help for SSL connections -- they need more than one sleep for shut down.

Member

@fafhrd91 fafhrd91 commented Jun 26, 2017

So what is so important about close? If the session is alive we can wait for connection_lost, but if the developer dropped the session, why should aiohttp wait for connection_lost? That is the expected behavior; otherwise we get the same problems as with the old release method implementation. If you want explicit behavior, then add something like release_all_connections. Another question: how often do you drop a session in a normal application? Once per process run? Sure, it makes the life of transport developers harder, but who cares?

Author

@omnicognate omnicognate commented Jun 27, 2017

Another problem with implementing this at the moment is that the default SSL transport never calls connection_lost() so you'll get a hang. I might get around to submitting an issue to python for that at some point, but I've had to move on from asyncio entirely as a result of this and numerous other bugs, and the fact that there appears to be little clarity on how to correctly implement and use the interfaces.

As to why proper shutdown behaviour is required, it's because without it it is not safe for the client to close the event loop after the session is closed, and the client has no reliable way to wait for a time at which it is safe to discard the loop.

SSL has a shutdown sequence that requires multiple steps, which in asyncio are naturally implemented asynchronously. Calling close() initiates that sequence, but the indication that it has finished is connection_lost(). If you don't wait for that sequence to complete before the aiohttp session's close has finished then the program will proceed without the SSL shutdown sequence having been completed. Worse, the program will proceed while the shutdown is ongoing, with tasks pending on the event loop. There is then no way for the client to wait for those pending tasks to be complete before destroying the event loop, and destroying the event loop without them completing will cause "Task was destroyed but it is pending" warnings and a host of other problems. As @asvetlov comments, even doing an await sleep(0) isn't sufficient in the client as there can be an arbitrary number of such awaits required before the queue is empty (2 in the particular case of SSL) and there is no way to explicitly check for the queue being empty.
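The point about sleep(0) can be reproduced without SSL at all. The sketch below uses a toy object, MultiStepCloser (a hypothetical name), whose shutdown takes a configurable number of event loop iterations; it is an illustration of the scheduling behaviour, not a model of asyncio's SSL code.

```python
import asyncio


class MultiStepCloser:
    """Toy stand-in for a transport whose shutdown takes several loop iterations."""

    def __init__(self, steps: int) -> None:
        self._loop = asyncio.get_running_loop()
        self._remaining = steps
        self.lost = self._loop.create_future()  # plays the role of connection_lost()

    def close(self) -> None:
        # Like transport.close(): returns at once, the work continues via callbacks.
        self._loop.call_soon(self._step)

    def _step(self) -> None:
        self._remaining -= 1
        if self._remaining == 0:
            self.lost.set_result(None)
        else:
            self._loop.call_soon(self._step)


async def demo(steps: int = 3) -> tuple:
    closer = MultiStepCloser(steps)
    closer.close()
    await asyncio.sleep(0)  # yields for one loop iteration only
    done_after_one_sleep = closer.lost.done()
    await closer.lost       # explicit completion signal, works for any step count
    return done_after_one_sleep, closer.lost.done()


print(asyncio.run(demo()))
```

With three steps the first element of the result is False: a single sleep(0) lets only one callback run. The caller cannot know how many sleeps are enough, whereas awaiting the completion future is correct regardless of the number of steps.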

The only reason that these problems are not currently occurring is that asyncio's default SSL transport isn't proceeding with its multi-step SSL shutdown sequence, so there are no tasks pending on the queue. This is a bug in that transport, and means that it never calls connection_lost().

If I'm understanding it correctly, this is a very nasty 2-way bug-bug dependency between aiohttp and asyncio. Fixing either of the two bugs will result in breakage: if you fix asyncio's SSL transport to do a correct SSL shutdown sequence without fixing aiohttp to wait for it to complete then existing clients who discard the event loop immediately after the aiohttp session is closed will be destroying the event loop with tasks pending; if you make aiohttp wait for the transport to do its shutdown without fixing the SSL transport to actually proceed with its shutdown, you will cause existing clients to hang.

It's a bit of a Gordian knot, if I'm understanding it correctly.

Author

@omnicognate omnicognate commented Jun 27, 2017

Oops, I didn't mean to close the issue!

@omnicognate omnicognate reopened this Jun 27, 2017
@asvetlov asvetlov added this to the 3.0 milestone Jun 27, 2017
Member

@fafhrd91 fafhrd91 commented Jun 27, 2017

I don't think we should do anything until asyncio gets fixed or changed. I still don't see why we should care about transports if the developer dropped the session.

@asvetlov asvetlov changed the title Not waiting for connection_lost() when transport closed [on hold] Not waiting for connection_lost() when transport closed Jun 27, 2017
Member

@asvetlov asvetlov commented Jun 27, 2017

OK, let's just put the issue on hold until asyncio fixes the SSL transport.
But please don't close -- I want to keep tracking.

Member

@fafhrd91 fafhrd91 commented Jun 27, 2017

I still don't see a reason why aiohttp needs to wait for the connection_lost call. Let's just create an issue in the cpython repo and reference it.

Member

@asvetlov asvetlov commented Jun 27, 2017

At least because without it the user will get a warning about non-closed resources in debug mode, IIRC.

Member

@asvetlov asvetlov commented Jun 27, 2017

Or they need to put a sleep after closing the session but before the program finishes.

Member

@fafhrd91 fafhrd91 commented Jun 27, 2017

This will complicate the logic, and the only reason is a debug message. That doesn't sound like a good win to me.

Author

@omnicognate omnicognate commented Jun 28, 2017

See https://docs.python.org/3/library/asyncio-dev.html#pending-task-destroyed

The warnings are there for a reason. Closing the queue with tasks pending means that those tasks will never run. The implications of that depend on exactly what the tasks were supposed to do, but could include leaks, incorrect protocol behaviour, crashes, anything really. I only listed the warnings because they're a specific, predictable result, but the warnings are there to alert you to the less predictable and more serious potential consequences.

If you don't think the warnings are serious enough to warrant any effort to avoid them you could raise an issue suggesting they be removed, but I doubt it would get very far.

Author

@omnicognate omnicognate commented Jun 28, 2017

Oh, just one detail: These aren't "debug messages" (with the implication that they are a detail that can be ignored in production). They are warnings.

Member

@fafhrd91 fafhrd91 commented Jun 29, 2017

But close is not a task, it is just a bunch of callbacks; it is the transport's responsibility to execute all of them.


@smurfix smurfix commented Jan 19, 2018

This boils down to programming contracts. When I use async with foo() as bar: I expect foo()'s __aenter__() code to set up the "bar" object so that I can use it, and when the block goes out of scope I expect its __aexit__(…) to return after everything is closed so that I can do whatever I need to do next – including shutting down the program – and without requiring some arcane asyncio.sleep() call to pass the time for connection_lost() and similar callbacks to run.

If that's not possible due to the way asyncio is designed, fine, document that – and urge people to ultimately find a better way of handling async code. Like, well, https://github.com/python-trio/trio for example.

NB: is there an actual issue which this bug is on hold for?

@asvetlov asvetlov modified the milestones: 3.0, 3.1 Feb 9, 2018
@asvetlov asvetlov modified the milestones: 3.1, 3.2 Mar 22, 2018
@asvetlov asvetlov modified the milestones: 3.2, 3.3 May 7, 2018
Member

@asvetlov asvetlov commented Oct 2, 2018

We should support it finally.

@asvetlov asvetlov modified the milestones: 3.3, 3.5 Oct 18, 2018

@CheViana CheViana commented Feb 14, 2019

I really need this to be properly supported ASAP and am ready to assist in any way I can. The project I'm working on relies heavily on async calls (via aiohttp) and, since recently, on async SSL calls.

@artemlops artemlops mentioned this issue May 24, 2019
5 of 5 tasks complete

@hivemall hivemall commented Sep 12, 2019

Is the issue solved? If so, the stable docs are not up to date (I'm not sure):
https://buildmedia.readthedocs.org/media/pdf/aiohttp/stable/aiohttp.pdf on pdfreader page 44

# Zero-sleep to allow underlying connections to close
loop.run_until_complete(asyncio.sleep(0))
Member

@asvetlov asvetlov commented Sep 12, 2019

Only partially, something should be done still


@vmarkovtsev vmarkovtsev commented Jan 17, 2020

Hacky workaround for those who want everything and right now:

transports = 0
all_is_lost = asyncio.Event()
for conn in session.connector._conns.values():
    for handler, _ in conn:
        transports += 1
        proto = handler.transport._ssl_protocol
        orig_lost = proto.connection_lost

        def connection_lost(exc):
            orig_lost(exc)
            nonlocal transports
            transports -= 1
            if transports == 0:
                all_is_lost.set()

        proto.connection_lost = connection_lost
await session.close()
await all_is_lost.wait()

@XCanG XCanG commented Feb 28, 2020

@vmarkovtsev wow, this code really helped me; I had spent so much time on this issue.
However, it still has caveats that need fixing:

  1. While waiting for all_is_lost I get an AttributeError:
Fatal error: protocol.eof_received() call failed.
protocol: <asyncio.sslproto.SSLProtocol object at 0x00000163B1CC0FD0>
transport: <_ProactorSocketTransport fd=500>
Traceback (most recent call last):
  File "C:\Python\Python38\lib\asyncio\sslproto.py", line 572, in eof_received
    keep_open = self._app_protocol.eof_received()
AttributeError: 'NoneType' object has no attribute 'eof_received'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python\Python38\lib\asyncio\proactor_events.py", line 235, in _eof_received
    keep_open = self._protocol.eof_received()
  File "C:\Python\Python38\lib\asyncio\sslproto.py", line 577, in eof_received
    self._transport.close()
AttributeError: 'NoneType' object has no attribute 'close'

It is ignored, but it is still printed to the console.
  2. session.connector may be None.
  3. session.connector is not None but has no connections, so the entire for loop is skipped and all_is_lost is never set, so the wait never finishes.

Code with these fixes; I also added a timeout to the wait so that it does not wait forever in any case.

transports = 0
all_is_lost = asyncio.Event()
sess_conn = session.connector
if sess_conn is not None:
    sess_conn_vals = sess_conn._conns.values()
    if len(sess_conn_vals) == 0:
        all_is_lost.set()
    for conn in sess_conn_vals:
        for handler, _ in conn:
            transports += 1
            proto = handler.transport._ssl_protocol # type: ignore
            orig_lost = proto.connection_lost

            def connection_lost(exc):
                orig_lost(exc)
                nonlocal transports
                transports -= 1
                if transports == 0:
                    all_is_lost.set()

            proto.connection_lost = connection_lost
else:
    all_is_lost.set()
try:
    await session.close()
    # aiohttp.ClientTimeout is not a context manager, so bound the wait with
    # asyncio.wait_for() instead.
    await asyncio.wait_for(all_is_lost.wait(), timeout=30.0)
except (AttributeError, asyncio.TimeoutError):
    pass

@vmarkovtsev vmarkovtsev commented Feb 28, 2020

Yes, I simplified the real working code to demo the main idea. Indeed, there were some unhandled edge cases.


@vmarkovtsev vmarkovtsev commented Feb 28, 2020

FWIW, here is my full version that survived a month of testing:

transports = 0
all_is_lost = asyncio.Event()
if len(session.connector._conns) == 0:
    all_is_lost.set()
for conn in session.connector._conns.values():
    for handler, _ in conn:
        proto = getattr(handler.transport, "_ssl_protocol", None)
        if proto is None:
            continue
        transports += 1
        orig_lost = proto.connection_lost
        orig_eof_received = proto.eof_received

        def connection_lost(exc):
            orig_lost(exc)
            nonlocal transports
            transports -= 1
            if transports == 0:
                all_is_lost.set()

        def eof_received():
            try:
                orig_eof_received()
            except AttributeError:
                # It may happen that eof_received() is called after
                # _app_protocol and _transport are set to None.
                pass

        proto.connection_lost = connection_lost
        proto.eof_received = eof_received
await session.close()
if transports > 0:
    await all_is_lost.wait()

@timburke timburke commented Jun 4, 2020

@vmarkovtsev Thanks for posting your comment #1925 (comment) with the workaround. It was very helpful in pointing me in the right direction.

I saw lint errors when I implemented your fix that orig_lost and orig_eof_received refer to a loop cell variable.

My understanding is that this means the code will set orig_lost = connection_lost for the last connection in the loop, and that function will be called by connection_lost for all connections (instead of each connection calling its own connection_lost callback, as I believe was the intent).

I made a quick tweak to remove this issue. I'm posting in case people copy the above implementation without linting to discover the potential problem:

import asyncio
import functools

def create_aiohttp_closed_event(session) -> asyncio.Event:
    """Work around aiohttp issue that doesn't properly close transports on exit.

    See https://github.com/aio-libs/aiohttp/issues/1925#issuecomment-592596034

    Returns:
       An event that will be set once all transports have been properly closed.
    """

    transports = 0
    all_is_lost = asyncio.Event()

    if len(session.connector._conns) == 0:
        all_is_lost.set()
        return all_is_lost

    def connection_lost(exc, orig_lost):
        nonlocal transports

        try:
            orig_lost(exc)
        finally:
            transports -= 1
            if transports == 0:
                all_is_lost.set()

    def eof_received(orig_eof_received):
        try:
            orig_eof_received()
        except AttributeError:
            # It may happen that eof_received() is called after
            # _app_protocol and _transport are set to None.
            pass

    for conn in session.connector._conns.values():
        for handler, _ in conn:
            proto = getattr(handler.transport, "_ssl_protocol", None)
            if proto is None:
                continue

            transports += 1
            orig_lost = proto.connection_lost
            orig_eof_received = proto.eof_received

            proto.connection_lost = functools.partial(connection_lost, orig_lost=orig_lost)
            proto.eof_received = functools.partial(eof_received, orig_eof_received=orig_eof_received)

    return all_is_lost
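The loop-variable capture problem described in this comment can be shown in isolation. The following is a minimal sketch with made-up function names (make_late_bound, make_bound_with_partial); it involves no aiohttp or asyncio objects and only illustrates why functools.partial changes the outcome.

```python
import functools


def make_late_bound(callbacks):
    """Wrap each callback with a closure defined inside the loop (buggy)."""
    wrappers = []
    for original in callbacks:
        def wrapper():
            # `original` is looked up when wrapper() runs, not when it is defined,
            # so every wrapper sees the value from the last loop iteration.
            return original()
        wrappers.append(wrapper)
    return wrappers


def make_bound_with_partial(callbacks):
    """Bind each callback at creation time with functools.partial."""
    def call(original):
        return original()
    return [functools.partial(call, original=cb) for cb in callbacks]


callbacks = [lambda: "a", lambda: "b", lambda: "c"]
print([w() for w in make_late_bound(callbacks)])
print([w() for w in make_bound_with_partial(callbacks)])
```

The first list contains the last callback's result three times, while the second contains one result per callback, which is the behaviour the workaround needs for each connection's original connection_lost.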

@vmarkovtsev vmarkovtsev commented Jun 5, 2020

Indeed. Your linter is better than mine 😄 Which are you using @timburke?

vmarkovtsev added a commit to athenianco/athenian-api that referenced this issue Jun 5, 2020

@timburke timburke commented Jun 9, 2020

> Which are you using @timburke?

@vmarkovtsev A combination of pylint and mypy in SublimeLinter. I'm 95% sure this one was flagged by pylint...


@leszekhanusz leszekhanusz commented Oct 14, 2020

@timburke

I noticed that with your function, if we only have one connection which is not using ssl,
then the event is never set.
You need to check if the transports are 0 at the end like this:

import asyncio
import functools


def create_aiohttp_closed_event(session) -> asyncio.Event:
    """Work around aiohttp issue that doesn't properly close transports on exit.

    See https://github.com/aio-libs/aiohttp/issues/1925#issuecomment-639080209

    Returns:
       An event that will be set once all transports have been properly closed.
    """

    transports = 0
    all_is_lost = asyncio.Event()

    def connection_lost(exc, orig_lost):
        nonlocal transports

        try:
            orig_lost(exc)
        finally:
            transports -= 1
            if transports == 0:
                all_is_lost.set()

    def eof_received(orig_eof_received):
        try:
            orig_eof_received()
        except AttributeError:
            # It may happen that eof_received() is called after
            # _app_protocol and _transport are set to None.
            pass

    for conn in session.connector._conns.values():
        for handler, _ in conn:
            proto = getattr(handler.transport, "_ssl_protocol", None)
            if proto is None:
                continue

            transports += 1
            orig_lost = proto.connection_lost
            orig_eof_received = proto.eof_received

            proto.connection_lost = functools.partial(
                connection_lost, orig_lost=orig_lost
            )
            proto.eof_received = functools.partial(
                eof_received, orig_eof_received=orig_eof_received
            )

    # No SSL transports were found, so there is nothing to wait for.
    if transports == 0:
        all_is_lost.set()

    return all_is_lost

this function can be used like this:

closed_event = create_aiohttp_closed_event(session)
await session.close()
await closed_event.wait()
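Since an earlier comment in this thread suggested bounding the wait so it cannot hang forever, one possible way to do that with only the standard library is sketched below. The helper name wait_closed and the timeout values are assumptions made for this example; the helper takes any asyncio.Event, such as the one returned by the function above.

```python
import asyncio


async def wait_closed(event: asyncio.Event, timeout: float) -> bool:
    """Return True if the event was set in time, False if the wait timed out."""
    try:
        await asyncio.wait_for(event.wait(), timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo() -> tuple:
    already_set = asyncio.Event()
    already_set.set()           # models "all transports already closed"
    never_set = asyncio.Event()  # models a transport that never reports back
    return (
        await wait_closed(already_set, 0.05),
        await wait_closed(never_set, 0.05),
    )


print(asyncio.run(demo()))
```

Returning a boolean rather than raising lets the caller decide whether an incomplete shutdown is worth logging or can be ignored at exit.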

@timburke timburke commented Oct 14, 2020

> You need to check if the transports are 0 at the end like this:

Agreed, we had to make that change internally as well and I forgot to update the comment. Thanks @leszekhanusz for posting it.

Member

@asvetlov asvetlov commented Oct 22, 2020

Please check master

@asvetlov asvetlov closed this Oct 22, 2020

@leszekhanusz leszekhanusz commented Oct 24, 2020

So to be clear, is this bug fixed in the release 3.7.0 and the workaround no longer necessary ?

Member

@asvetlov asvetlov commented Oct 24, 2020

It will be fixed in aiohttp 4.0.0 which is not released yet
