Fix response clean up #704

Closed


karpetrosyan
Member

closes #642 and #658

@notonamap

Stumbled across this too today. Would be great to get this fixed!

@karpetrosyan karpetrosyan requested a review from a team June 5, 2023 13:16
@karpetrosyan
Member Author

Our problem

httpcore/_async/http11.py

async def _response_closed(self) -> None:
    async with self._state_lock:
        if (
            self._h11_state.our_state is h11.DONE
            and self._h11_state.their_state is h11.DONE
        ):
            self._state = HTTPConnectionState.IDLE
            self._h11_state.start_next_cycle()
            if self._keepalive_expiry is not None:
                now = time.monotonic()
                self._expire_at = now + self._keepalive_expiry
        else:
            await self.aclose()

Let's cancel this coroutine in the middle of its execution.

Oops, now we have a problem: our connection is in the ACTIVE state when it should be in the IDLE or CLOSED state, and our file descriptor was not closed because Connection.aclose was not called.
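The stuck-state problem described above can be reproduced without httpcore. The following is a minimal sketch using the standard library's asyncio rather than trio; `FakeConnection` and `State` are hypothetical stand-ins for the real connection class, and the lock is held by the caller only to force the coroutine to suspend so that it can be cancelled midway.

```python
import asyncio
import enum


class State(enum.Enum):
    ACTIVE = 1
    IDLE = 2


class FakeConnection:
    # Hypothetical stand-in for the HTTP/1.1 connection, not httpcore code.
    def __init__(self) -> None:
        self.state = State.ACTIVE
        self.state_lock = asyncio.Lock()

    async def response_closed(self) -> None:
        # Suspends here while the lock is held elsewhere.
        async with self.state_lock:
            self.state = State.IDLE


async def demo() -> State:
    conn = FakeConnection()
    await conn.state_lock.acquire()  # force response_closed() to wait
    task = asyncio.create_task(conn.response_closed())
    await asyncio.sleep(0)  # let the task start and block on the lock
    task.cancel()  # cancel it in the middle of its execution
    try:
        await task
    except asyncio.CancelledError:
        pass
    conn.state_lock.release()
    return conn.state


final_state = asyncio.run(demo())
print(final_state)  # the state transition never happened
```

The connection is left in the ACTIVE state because the body of the `async with` block never ran.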

But wait, we were smart enough to add a finally block for such cases, so the finally block in ConnectionPool.aclose should now run:

async def aclose(self) -> None:
    try:
        if hasattr(self._stream, "aclose"):
            await self._stream.aclose()
    finally:
        await self._pool.response_closed(self._status)

Unfortunately, our response_closed function will not work as expected: the nursery has already cancelled all of its tasks, so the first checkpoint inside response_closed is cancelled again.

Simple example

import trio

async def checkpoint():
    await trio.sleep(0)

async def clean_up():
    await checkpoint()  # in our case: async with self._pool_lock:
    print('done')

async def main():
    with trio.move_on_after(1):
        try:
            await trio.sleep(1.1)
        finally:
            await clean_up()

trio.run(main)

In this case, the word 'done' will never be printed.
We can fix it by wrapping "await clean_up()" in "trio.CancelScope(shield=True)" to protect it from cancellation by the enclosing scopes.

This code will function properly.

async def main():
    with trio.move_on_after(1):
        try:
            await trio.sleep(1.1)
        finally:
            with trio.CancelScope(shield=True):
                await clean_up()

The same solution was used to solve our issue; I simply wrapped the clean-up function in "CancelScope(shield=True)".

@karpetrosyan
Member Author

What should we pay attention to?

If the clean-up function, which we call in the cancellation-isolated environment, hangs, then the nursery hangs forever as well, so we need to avoid that.

How?

  • timeout: We can use a timeout to ensure that our clean-up function does not run forever.
  • do nothing: We are confident that our clean-up function works properly, so we do nothing.

I chose the second option in this solution because we know our clean up function will work as expected.
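For comparison, the timeout option could look roughly like the sketch below. It uses the standard library's asyncio instead of trio so that it runs on its own; `close_with_timeout`, `quick_close`, `hanging_close` and the 0.05 second value are hypothetical and chosen only for illustration.

```python
import asyncio

GRACEFUL_CLOSE_TIMEOUT = 0.05  # hypothetical value, in seconds


async def quick_close() -> None:
    # A well-behaved close that finishes almost immediately.
    await asyncio.sleep(0)


async def hanging_close() -> None:
    # Simulates a close that would otherwise block for a very long time.
    await asyncio.sleep(3600)


async def close_with_timeout(close) -> bool:
    # Returns True if the clean-up finished, False if it was abandoned.
    try:
        await asyncio.wait_for(close(), GRACEFUL_CLOSE_TIMEOUT)
        return True
    except asyncio.TimeoutError:
        return False


async def main():
    finished_quick = await close_with_timeout(quick_close)
    finished_hanging = await close_with_timeout(hanging_close)
    return finished_quick, finished_hanging


results = asyncio.run(main())
print(results)
```

The trade-off is that a clean-up abandoned on timeout may leave resources half-closed, whereas the "do nothing" option relies entirely on the clean-up never hanging.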

What are the guarantees?

Let's take a look at our cleanup function, which consists of three lines.

if cancelled and hasattr(self._stream, "_connection"):  # pragma: no cover
    await self._stream._connection.aclose()

await self._pool.response_closed(self._status)

Because we know that synchronous "if" statements do not block, let's look at await self._stream._connection.aclose() and await self._pool.response_closed(self._status)

What does await self._stream._connection.aclose() do?

    async def aclose(self) -> None:
        # Note that this method unilaterally closes the connection, and does
        # not have any kind of locking in place around it.
        self._state = HTTPConnectionState.CLOSED
        await self._network_stream.aclose()

So it simply sets the connection state to CLOSED and attempts to close the network stream.

According to https://trio.readthedocs.io/en/stable/reference-io.html?highlight=AsyncResource#trio.abc.AsyncResource, we can be certain that aclose() will not block execution indefinitely.

What does await self._pool.response_closed(self._status) do?
This function is also safe: it only awaits a lock and "stream.aclose()", which cannot hang, so it is safe to wait for it.

@karpetrosyan
Member Author

This PR also resolves the issue described in #658 (comment).

@tomchristie
Member

I'm starting to understand this more clearly, thanks. Let's get this worked through. 💪
Okay, we want to shield certain parts of our cleanup from cancellation.

Rather than adding API to the network backends interface, I think we want to treat this in the same kind of way that we handle async locking and semaphores.

Currently we have the following primitives available to us there...

  • AsyncLock
  • AsyncEvent
  • AsyncSemaphore

As well as the corresponding...

  • Lock
  • Event
  • Semaphore

It seems to me that we ought to mirror the with trio.CancelScope(shield=True): style, by adding (something like) AsyncShield and Shield primitives there. (Rather than API on the network backend)

Does that seem like a reasonable suggestion to you?

@karpetrosyan
Member Author

Is it necessary to create a Shield class, or can we simply add another substitution to unasync.py that removes AsyncShield?
For example, instead of replacing "@pytest.mark.anyio" with "@pytest.mark.sync", we simply remove it.

@karpetrosyan
Member Author

I agree with you; I believe it is more intuitive to group this with the synchronization primitives.

@tomchristie
Member

Is it necessary to create a Shield class, or can we simply add another substitution to unasync.py that removes AsyncShield?

I'm not sure if we'd be able to do that easily because of the indentation.

A different way to approach this would be to...

  • Start by considering only _async/connection_pool.py.
  • Assume we already have a trio-like shielding API available.
  • Demonstrate what the changes in implementation around clean-up would look like in that case.

Once we've reviewed that and think we have the correct set of changes in place, we can work out the implementation details of how to make that happen.

@karpetrosyan
Member Author

We can simply encapsulate the cleanup logic in the special function "X" that should be marked with the decorator, and then write a regex to remove that decorator.

possible regex

(r"\s*AsyncShield", '')

@tomchristie
Member

Are we able to resolve this with a decorator syntax, or do we need a context-manager syntax?

@karpetrosyan
Member Author

I propose using a decorator, because it is easy to remove with a regex, which cannot be said of context managers.
As a result, the distinction between async and sync interfaces is reduced to a single line.

@tomchristie
Member

Okay, so supposing we have a @shield_cancellation decorator available to us - what changes to _async/connection_pool.py would be required?

@tomchristie
Member

(I'm not sure this is viable, since the function call would need to be async, which makes it a potential switchpoint?)

@karpetrosyan
Member Author

(I'm not sure this is viable, since the function call would need to be async, which makes it a potential switchpoint?)

Is it enough to be a switchpoint? Can't we rely on that function?

@karpetrosyan
Member Author

How?

  • timeout: We can use a timeout to ensure that our clean-up function does not run forever.
  • do nothing: We are confident that our clean-up function works properly, so we do nothing.

I misunderstood how 'aclose' for 'trio.abc.AsyncResource' works; we should most likely use timeouts.

@tomchristie
Member

Is it enough to be a switchpoint? Can't we rely on that function?

I don't think? we can take that approach, because...

https://trio.readthedocs.io/en/stable/reference-core.html#checkpoints

"Third-party async functions / iterators / context managers can act as checkpoints"

So if we were calling into a decorated @shield_cancellation method, using await my_method() then we might get cancelled before the shielding decorator takes effect.

My go-to with something like this would typically be "trio gets this just right. how would this look if we're using trio" and to work from there.
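One way to probe the switchpoint question is to check when control can actually pass to another task while awaiting a plain coroutine function. The sketch below uses the standard library's asyncio, not trio, so it says nothing definitive about trio's checkpoint rules; `inner`, `other` and `order` are hypothetical names used only for this illustration.

```python
import asyncio

order = []


async def inner() -> None:
    # Runs as soon as the caller awaits it, before any suspension point.
    order.append("inner-start")
    await asyncio.sleep(0)  # first real suspension point
    order.append("inner-end")


async def other() -> None:
    order.append("other")


async def main() -> None:
    # Schedule a competing task, then await inner() directly.
    task = asyncio.create_task(other())
    await inner()
    await task


asyncio.run(main())
print(order)
```

In this asyncio sketch the competing task only runs once `inner` reaches its own `await asyncio.sleep(0)`, which suggests the call itself is not where the switch happens; whether a given wrapper adds a checkpoint before shielding still depends on what the wrapper does internally.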

@karpetrosyan
Member Author

I think I'm missing something; isn't this decorator safe?

    @staticmethod
    def shield_cancellation(
        fnc: typing.Callable[..., typing.Awaitable[_R]]
    ) -> typing.Callable[..., typing.Awaitable[_R]]:
        # Makes an async function that runs in a cancellation-isolated environment.

        @wraps(fnc)
        async def inner(*args: typing.Any, **kwargs: typing.Any) -> _R:
            with anyio.CancelScope(shield=True):
                return await fnc(*args, **kwargs)

        return inner
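To experiment with the decorator idea without installing anyio, a rough analogue can be built on `asyncio.shield` from the standard library. This is only a sketch under that assumption, and the semantics differ from `anyio.CancelScope(shield=True)`: with `asyncio.shield` the caller still receives `CancelledError` immediately, while the wrapped coroutine keeps running in the background. The `clean_up` function and `log` list are hypothetical.

```python
import asyncio
import functools

log = []


def shield_cancellation(fnc):
    # asyncio-based analogue of the decorator above (an assumption, not the PR code).
    @functools.wraps(fnc)
    async def inner(*args, **kwargs):
        return await asyncio.shield(fnc(*args, **kwargs))

    return inner


@shield_cancellation
async def clean_up() -> None:
    await asyncio.sleep(0.01)
    log.append("done")


async def main() -> None:
    task = asyncio.create_task(clean_up())
    await asyncio.sleep(0)  # let the task start and enter the shield
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("caller cancelled")
    await asyncio.sleep(0.05)  # give the shielded clean-up time to finish


asyncio.run(main())
print(log)
```

Here the clean-up completes even though the task that called it was cancelled, at the cost of the caller no longer waiting for it.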

@@ -11,6 +12,8 @@
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

GRACEFUL_CLOSE_TIMEOUT = 2
Member Author

Rather than hard-coding 2, we should probably think about which timeout to use here.

@karpetrosyan
Member Author

karpetrosyan commented Jun 7, 2023

I propose using a decorator, because it is easy to remove with a regex, which cannot be said of context managers. As a result, the distinction between async and sync interfaces is reduced to a single line.

Let's just not remove the decorator from the synchronous interface, because it does not get in our way.

@tomchristie
Member

manager hat on

Okay, lovely stuff tho let's slow this down...

You've given a great example over here. I think that's the first time I've seen a nice simple "look, we're not supporting cancellation at the moment".

My suggestion would be...

  • The title of the PR as "Support async cancellations". That's more clear about exactly what issue we're resolving.
  • The description of the PR to include a nice little example like the one you've given for trio.
  • We start with a PR that just includes a failing test case, before we look at the resolution.

@karpetrosyan
Member Author

We start with a PR that just includes a failing test case, before we look at the resolution.

Do we need to open another PR and close this one?

@tomchristie
Member

tomchristie commented Jun 12, 2023

Up to you.

(🤷‍♂️ Might be a little cleaner that way???)

@karpetrosyan
Member Author

Okay, I'll do it for clarity.

@karpetrosyan
Member Author

Was re-opened in #719

Development

Successfully merging this pull request may close these issues.

Support async cancellations.
3 participants