Skip to content

Commit

Permalink
Add a workaround for bpo-37658
Browse files Browse the repository at this point in the history
`asyncio.wait_for()` currently has a bug where it raises a
`CancelledError` even when the wrapped awaitable has completed.
The upstream fix is in python/cpython#21894.  This adds a workaround
until the aforementioned PR is merged, backported and released.

Co-authored-by: Adam Liddell <git@aliddell.com>
Fixes: MagicStack#467
Fixes: MagicStack#547
Related: MagicStack#468
Supersedes: MagicStack#548
  • Loading branch information
elprans and aaliddell committed Aug 27, 2020
1 parent c05d726 commit 2bac166
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
16 changes: 16 additions & 0 deletions asyncpg/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,19 @@ async def wait_closed(stream):
# On Windows wait_closed() sometimes propagates
# ConnectionResetError which is totally unnecessary.
pass


# Workaround for https://bugs.python.org/issue37658
async def wait_for(fut, timeout):
if timeout is None:
return await fut

fut = asyncio.ensure_future(fut)

try:
return await asyncio.wait_for(fut, timeout)
except asyncio.CancelledError:
if fut.done():
return fut.result()
else:
raise
18 changes: 2 additions & 16 deletions asyncpg/connect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,18 +636,13 @@ async def _connect_addr(

connector = asyncio.ensure_future(connector)
before = time.monotonic()
try:
tr, pr = await asyncio.wait_for(
connector, timeout=timeout)
except asyncio.CancelledError:
connector.add_done_callback(_close_leaked_connection)
raise
tr, pr = await compat.wait_for(connector, timeout=timeout)
timeout -= time.monotonic() - before

try:
if timeout <= 0:
raise asyncio.TimeoutError
await asyncio.wait_for(connected, timeout=timeout)
await compat.wait_for(connected, timeout=timeout)
except (Exception, asyncio.CancelledError):
tr.close()
raise
Expand Down Expand Up @@ -745,12 +740,3 @@ def _create_future(loop):
return asyncio.Future(loop=loop)
else:
return create_future()


def _close_leaked_connection(fut):
try:
tr, pr = fut.result()
if tr:
tr.close()
except asyncio.CancelledError:
pass # hide the exception
5 changes: 3 additions & 2 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import warnings

from . import compat
from . import connection
from . import connect_utils
from . import exceptions
Expand Down Expand Up @@ -198,7 +199,7 @@ async def release(self, timeout):
# If the connection is in cancellation state,
# wait for the cancellation
started = time.monotonic()
await asyncio.wait_for(
await compat.wait_for(
self._con._protocol._wait_for_cancellation(),
budget)
if budget is not None:
Expand Down Expand Up @@ -623,7 +624,7 @@ async def _acquire_impl():
if timeout is None:
return await _acquire_impl()
else:
return await asyncio.wait_for(
return await compat.wait_for(
_acquire_impl(), timeout=timeout)

async def release(self, connection, *, timeout=None):
Expand Down
20 changes: 20 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,26 @@ async def worker():
self.cluster.trust_local_connections()
self.cluster.reload()

async def test_pool_handles_task_cancel_in_acquire_with_timeout(self):
# See https://github.com/MagicStack/asyncpg/issues/547
pool = await self.create_pool(database='postgres',
min_size=1, max_size=1)

async def worker():
async with pool.acquire(timeout=100):
pass

# Schedule task
task = self.loop.create_task(worker())
# Yield to task, but cancel almost immediately
await asyncio.sleep(0.00000000001)
# Cancel the worker.
task.cancel()
# Wait to make sure the cleanup has completed.
await asyncio.sleep(0.4)
# Check that the connection has been returned to the pool.
self.assertEqual(pool._queue.qsize(), 1)

async def test_pool_handles_task_cancel_in_release(self):
# Use SlowResetConnectionPool to simulate
# the Task.cancel() and __aexit__ race.
Expand Down

0 comments on commit 2bac166

Please sign in to comment.