-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix closing client session #3733
Changes from all commits
4e4d043
b74542b
9748dec
53ff05c
ca25cef
d1f1f5d
62e2d4d
13dcd7a
ea33a9d
2f6fcd7
dc66fc5
88e815b
a031cd0
b37ebad
27484ff
00b5088
a46ee7e
a94df81
fd2f90e
9472026
b7fedd8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Make `BaseConnector.close()` a coroutine and wait until the client closes all connections. Drop deprecated "with Connector():" syntax. |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -1,5 +1,6 @@ | ||||||||||
import asyncio | ||||||||||
import functools | ||||||||||
import logging | ||||||||||
import random | ||||||||||
import sys | ||||||||||
import traceback | ||||||||||
|
@@ -49,7 +50,6 @@ | |||||||||
CeilTimeout, | ||||||||||
get_running_loop, | ||||||||||
is_ip_address, | ||||||||||
noop2, | ||||||||||
sentinel, | ||||||||||
) | ||||||||||
from .http import RESPONSES | ||||||||||
|
@@ -185,6 +185,10 @@ def closed(self) -> bool: | |||||||||
|
||||||||||
class _TransportPlaceholder: | ||||||||||
""" placeholder for BaseConnector.connect function """ | ||||||||||
def __init__(self, loop: asyncio.AbstractEventLoop) -> None: | ||||||||||
fut = loop.create_future() | ||||||||||
fut.set_result(None) | ||||||||||
self.closed = fut # type: asyncio.Future[Optional[Exception]] # noqa | ||||||||||
|
||||||||||
def close(self) -> None: | ||||||||||
pass | ||||||||||
|
@@ -264,7 +268,7 @@ def __del__(self, _warnings: Any=warnings) -> None: | |||||||||
|
||||||||||
conns = [repr(c) for c in self._conns.values()] | ||||||||||
|
||||||||||
self._close() | ||||||||||
self._close_immediately() | ||||||||||
|
||||||||||
if PY_36: | ||||||||||
kwargs = {'source': self} | ||||||||||
|
@@ -281,13 +285,11 @@ def __del__(self, _warnings: Any=warnings) -> None: | |||||||||
self._loop.call_exception_handler(context) | ||||||||||
|
||||||||||
def __enter__(self) -> 'BaseConnector': | ||||||||||
warnings.warn('"with Connector():" is deprecated, ' | ||||||||||
'use "async with Connector():" instead', | ||||||||||
DeprecationWarning) | ||||||||||
return self | ||||||||||
raise TypeError('use "async with Connector():" instead') | ||||||||||
|
||||||||||
def __exit__(self, *exc: Any) -> None: | ||||||||||
self.close() | ||||||||||
# __exit__ should exist in pair with __enter__ but never executed | ||||||||||
pass # pragma: no cover | ||||||||||
|
||||||||||
async def __aenter__(self) -> 'BaseConnector': | ||||||||||
return self | ||||||||||
|
@@ -386,20 +388,29 @@ def _cleanup_closed(self) -> None: | |||||||||
self, '_cleanup_closed', | ||||||||||
self._cleanup_closed_period, self._loop) | ||||||||||
|
||||||||||
def close(self) -> Awaitable[None]: | ||||||||||
async def close(self) -> None: | ||||||||||
"""Close all opened transports.""" | ||||||||||
self._close() | ||||||||||
return _DeprecationWaiter(noop2()) | ||||||||||
waiters = self._close_immediately() | ||||||||||
if waiters: | ||||||||||
results = await asyncio.gather(*waiters, | ||||||||||
loop=self._loop, | ||||||||||
return_exceptions=True) | ||||||||||
for res in results: | ||||||||||
if isinstance(res, Exception): | ||||||||||
err_msg = "Error while closing connector: " + repr(res) | ||||||||||
logging.error(err_msg) | ||||||||||
|
||||||||||
def _close_immediately(self) -> List['asyncio.Future[None]']: | ||||||||||
waiters = [] # type: List['asyncio.Future[None]'] | ||||||||||
|
||||||||||
def _close(self) -> None: | ||||||||||
if self._closed: | ||||||||||
return | ||||||||||
return waiters | ||||||||||
|
||||||||||
self._closed = True | ||||||||||
|
||||||||||
try: | ||||||||||
if self._loop.is_closed(): | ||||||||||
return | ||||||||||
return waiters | ||||||||||
|
||||||||||
# cancel cleanup task | ||||||||||
if self._cleanup_handle: | ||||||||||
|
@@ -412,14 +423,19 @@ def _close(self) -> None: | |||||||||
for data in self._conns.values(): | ||||||||||
for proto, t0 in data: | ||||||||||
proto.close() | ||||||||||
waiters.append(proto.closed) | ||||||||||
|
||||||||||
for proto in self._acquired: | ||||||||||
proto.close() | ||||||||||
waiters.append(proto.closed) | ||||||||||
|
||||||||||
# TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. incomplete task ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. work with transports here, especially Lines 345 to 348 in dcce5ea
Also, transports in Good news is that most likely these calls are redundant since we close all protocols (i.e., save them to the list of futures: |
||||||||||
for transport in self._cleanup_closed_transports: | ||||||||||
if transport is not None: | ||||||||||
transport.abort() | ||||||||||
|
||||||||||
return waiters | ||||||||||
|
||||||||||
finally: | ||||||||||
self._conns.clear() | ||||||||||
self._acquired.clear() | ||||||||||
|
@@ -510,7 +526,8 @@ async def connect(self, req: 'ClientRequest', | |||||||||
|
||||||||||
proto = self._get(key) | ||||||||||
if proto is None: | ||||||||||
placeholder = cast(ResponseHandler, _TransportPlaceholder()) | ||||||||||
placeholder = cast(ResponseHandler, | ||||||||||
_TransportPlaceholder(self._loop)) | ||||||||||
self._acquired.add(placeholder) | ||||||||||
self._acquired_per_host[key].add(placeholder) | ||||||||||
|
||||||||||
|
@@ -741,12 +758,10 @@ def __init__(self, *, | |||||||||
self._family = family | ||||||||||
self._local_addr = local_addr | ||||||||||
|
||||||||||
def close(self) -> Awaitable[None]: | ||||||||||
"""Close all ongoing DNS calls.""" | ||||||||||
def _close_immediately(self) -> List['asyncio.Future[None]']: | ||||||||||
for ev in self._throttle_dns_events.values(): | ||||||||||
ev.cancel() | ||||||||||
|
||||||||||
return super().close() | ||||||||||
return super()._close_immediately() | ||||||||||
|
||||||||||
@property | ||||||||||
def family(self) -> int: | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete
helpers.noop2
function as well.