Skip to content

Commit

Permalink
[PR #8541/a561fa99 backport][3.10] Cleanup for #8495 (#8544)
Browse files Browse the repository at this point in the history
**This is a backport of PR #8541 as merged into master
(a561fa9).**

---------

Co-authored-by: Sam Bull <git@sambull.org>
  • Loading branch information
patchback[bot] and Dreamsorcerer committed Jul 29, 2024
1 parent df57b9f commit ed8de3a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 36 deletions.
12 changes: 12 additions & 0 deletions CHANGES/8495.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
The shutdown logic in 3.9 waited on all tasks, which caused issues with some libraries.
In 3.10 we've changed this logic to only wait on request handlers. This means that it's
important for developers to correctly handle the lifecycle of background tasks using a
library such as ``aiojobs``. If an application is using ``handler_cancellation=True`` then
it is also a good idea to ensure that any :func:`asyncio.shield` calls are replaced with
:func:`aiojobs.aiohttp.shield`.

Please read the updated documentation on these points:
https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown
https://docs.aiohttp.org/en/stable/web_advanced.html#web-handler-cancellation

-- by :user:`Dreamsorcerer`
66 changes: 36 additions & 30 deletions docs/web_advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ socket closing on the peer side without reading the full server response.
except OSError:
# disconnected

.. _web-handler-cancellation:

Web handler cancellation
^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -68,38 +70,48 @@ needed to deal with them.

.. warning::

:term:`web-handler` execution could be canceled on every ``await``
if client drops connection without reading entire response's BODY.
:term:`web-handler` execution could be canceled on every ``await`` or
``async with`` if client drops connection without reading entire response's BODY.

Sometimes it is a desirable behavior: on processing ``GET`` request the
code might fetch data from a database or other web resource, the
fetching is potentially slow.

Canceling this fetch is a good idea: the peer dropped connection
Canceling this fetch is a good idea: the client dropped the connection
already, so there is no reason to waste time and resources (memory etc)
by getting data from a DB without any chance to send it back to peer.
by getting data from a DB without any chance to send it back to the client.

But sometimes the cancellation is bad: on ``POST`` request very often
it is needed to save data to a DB regardless of peer closing.
But sometimes the cancellation is bad: on ``POST`` requests very often
it is needed to save data to a DB regardless of connection closing.

Cancellation prevention could be implemented in several ways:

* Applying :func:`asyncio.shield` to a coroutine that saves data.
* Using aiojobs_ or another third party library.
* Applying :func:`aiojobs.aiohttp.shield` to a coroutine that saves data.
* Using aiojobs_ or another third party library to run a task in the background.

:func:`aiojobs.aiohttp.shield` can work well. The only disadvantage is you
need to split the web handler into two async functions: one for the handler
itself and another for protected code.

.. warning::

:func:`asyncio.shield` can work well. The only disadvantage is you
need to split web handler into exactly two async functions: one
for handler itself and other for protected code.
We don't recommend using :func:`asyncio.shield` for this because the shielded
task cannot be tracked by the application and therefore there is a risk that
the task will get cancelled during application shutdown. The function provided
by aiojobs_ operates in the same way except the inner task will be tracked
by the Scheduler and will get waited on during the cleanup phase.

For example the following snippet is not safe::

from aiojobs.aiohttp import shield

async def handler(request):
await asyncio.shield(write_to_redis(request))
await asyncio.shield(write_to_postgres(request))
await shield(request, write_to_redis(request))
await shield(request, write_to_postgres(request))
return web.Response(text="OK")

Cancellation might occur while saving data in REDIS, so
``write_to_postgres`` will not be called, potentially
Cancellation might occur while saving data in REDIS, so the
``write_to_postgres`` function will not be called, potentially
leaving your data in an inconsistent state.

Instead, you would need to write something like::
Expand All @@ -109,7 +121,7 @@ Instead, you would need to write something like::
await write_to_postgres(request)

async def handler(request):
await asyncio.shield(write_data(request))
await shield(request, write_data(request))
return web.Response(text="OK")

Alternatively, if you want to spawn a task without waiting for
Expand Down Expand Up @@ -160,7 +172,7 @@ restoring the default disconnection behavior only for specific handlers::
app.router.add_post("/", handler)

It prevents all of the ``handler`` async function from cancellation,
so ``write_to_db`` will be never interrupted.
so ``write_to_db`` will never be interrupted.

.. _aiojobs: http://aiojobs.readthedocs.io/en/latest/

Expand Down Expand Up @@ -936,30 +948,24 @@ always satisfactory.
When aiohttp is run with :func:`run_app`, it will attempt a graceful shutdown
by following these steps (if using a :ref:`runner <aiohttp-web-app-runners>`,
then calling :meth:`AppRunner.cleanup` will perform these steps, excluding
steps 4 and 7).
step 7).

1. Stop each site listening on sockets, so new connections will be rejected.
2. Close idle keep-alive connections (and set active ones to close upon completion).
3. Call the :attr:`Application.on_shutdown` signal. This should be used to shutdown
long-lived connections, such as websockets (see below).
4. Wait a short time for running tasks to complete. This allows any pending handlers
or background tasks to complete successfully. The timeout can be adjusted with
``shutdown_timeout`` in :func:`run_app`.
4. Wait a short time for running handlers to complete. This allows any pending handlers
to complete successfully. The timeout can be adjusted with ``shutdown_timeout``
in :func:`run_app`.
5. Close any remaining connections and cancel their handlers. It will wait on the
canceling handlers for a short time, again adjustable with ``shutdown_timeout``.
6. Call the :attr:`Application.on_cleanup` signal. This should be used to cleanup any
resources (such as DB connections). This includes completing the
:ref:`cleanup contexts<aiohttp-web-cleanup-ctx>`.
:ref:`cleanup contexts<aiohttp-web-cleanup-ctx>` which may be used to ensure
background tasks are completed successfully (see
:ref:`handler cancellation<web-handler-cancellation>` or aiojobs_ for examples).
7. Cancel any remaining tasks and wait on them to complete.

.. note::

When creating new tasks in a handler which _should_ be cancelled on server shutdown,
then it is important to keep track of those tasks and explicitly cancel them in a
:attr:`Application.on_shutdown` callback. As we can see from the above steps,
without this the server will wait on those new tasks to complete before it continues
with server shutdown.

Websocket shutdown
^^^^^^^^^^^^^^^^^^

Expand Down
85 changes: 79 additions & 6 deletions tests/test_run_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,23 @@ async def stop(self, request: web.Request) -> web.Response:
return web.Response()

def run_app(self, port: int, timeout: int, task, extra_test=None) -> asyncio.Task:
num_connections = -1

class DictRecordClear(dict):
def clear(self):
nonlocal num_connections
# During Server.shutdown() we want to know how many connections still
# remained before it got cleared. If the handler completed successfully
# the connection should've been removed already. If not, this may
# indicate a memory leak.
num_connections = len(self)
super().clear()

class ServerWithRecordClear(web.Server):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._connections = DictRecordClear()

async def test() -> None:
await asyncio.sleep(0.5)
async with ClientSession() as sess:
Expand Down Expand Up @@ -954,9 +971,10 @@ async def handler(request: web.Request) -> web.Response:
app.router.add_get("/", handler)
app.router.add_get("/stop", self.stop)

web.run_app(app, port=port, shutdown_timeout=timeout)
with mock.patch("aiohttp.web_app.Server", ServerWithRecordClear):
web.run_app(app, port=port, shutdown_timeout=timeout)
assert test_task.exception() is None
return t
return t, num_connections

def test_shutdown_wait_for_handler(
self, aiohttp_unused_port: Callable[[], int]
Expand All @@ -969,11 +987,12 @@ async def task():
await asyncio.sleep(2)
finished = True

t = self.run_app(port, 3, task)
t, connection_count = self.run_app(port, 3, task)

assert finished is True
assert t.done()
assert not t.cancelled()
assert connection_count == 0

def test_shutdown_timeout_handler(
self, aiohttp_unused_port: Callable[[], int]
Expand All @@ -986,11 +1005,12 @@ async def task():
await asyncio.sleep(2)
finished = True

t = self.run_app(port, 1, task)
t, connection_count = self.run_app(port, 1, task)

assert finished is False
assert t.done()
assert t.cancelled()
assert connection_count == 1

def test_shutdown_timeout_not_reached(
self, aiohttp_unused_port: Callable[[], int]
Expand All @@ -1004,10 +1024,11 @@ async def task():
finished = True

start_time = time.time()
t = self.run_app(port, 15, task)
t, connection_count = self.run_app(port, 15, task)

assert finished is True
assert t.done()
assert connection_count == 0
# Verify run_app has not waited for timeout.
assert time.time() - start_time < 10

Expand All @@ -1032,10 +1053,11 @@ async def test(sess: ClientSession) -> None:
pass
assert finished is False

t = self.run_app(port, 10, task, test)
t, connection_count = self.run_app(port, 10, task, test)

assert finished is True
assert t.done()
assert connection_count == 0

def test_shutdown_pending_handler_responds(
self, aiohttp_unused_port: Callable[[], int]
Expand Down Expand Up @@ -1168,3 +1190,54 @@ async def run_test(app: web.Application) -> None:
assert time.time() - start < 5
assert client_finished
assert server_finished

def test_shutdown_handler_cancellation_suppressed(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
actions = []

async def test() -> None:
async def test_resp(sess):
t = ClientTimeout(total=0.4)
with pytest.raises(asyncio.TimeoutError):
async with sess.get(f"http://localhost:{port}/", timeout=t) as resp:
assert await resp.text() == "FOO"
actions.append("CANCELLED")

async with ClientSession() as sess:
t = asyncio.create_task(test_resp(sess))
await asyncio.sleep(0.5)
# Handler is in-progress while we trigger server shutdown.
actions.append("PRESTOP")
async with sess.get(f"http://localhost:{port}/stop"):
pass

actions.append("STOPPING")
# Handler should still complete and produce a response.
await t

async def run_test(app: web.Application) -> None:
nonlocal t
t = asyncio.create_task(test())
yield
await t

async def handler(request: web.Request) -> web.Response:
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
actions.append("SUPPRESSED")
await asyncio.sleep(2)
actions.append("DONE")
return web.Response(text="FOO")

t = None
app = web.Application()
app.cleanup_ctx.append(run_test)
app.router.add_get("/", handler)
app.router.add_get("/stop", self.stop)

web.run_app(app, port=port, shutdown_timeout=2, handler_cancellation=True)
assert t.exception() is None
assert actions == ["CANCELLED", "SUPPRESSED", "PRESTOP", "STOPPING", "DONE"]

0 comments on commit ed8de3a

Please sign in to comment.