fantix
Member

@fantix fantix commented May 25, 2020

A quick recap of uvloop core:

uvloop handles (4)

  1. Loop is the uvloop version of an asyncio event loop, exposing APIs to create UVHandles like TCP transports.
  2. UVHandle is the base class of the uvloop wrappers around the libuv uv_handle_t structs; see the full family below.
  3. Each UVHandle references zero or more Handle instances that encapsulate the actual callback, its arguments, and a PEP-567 context.
  4. There's at least one cdef function per UVHandle that is registered to the libuv uv_handle_t struct. This function usually just triggers running the Handle.
  5. [UPDATE] Not every UVHandle uses a Handle for callbacks, and we don't want to change that.
UVHandle                                       (handle)
    +- UVAsync                                 (async_)
    +- UVCheck                                 (check)
    +- UVIdle                                  (idle)
    +- UVPoll                                  (poll)
    +- UVProcess                               (process)
    |      +- UVProcessTransport               (process)
    +- UVSocketHandle                          (handle)
    |      +- UVBaseTransport                  (basetransport)
    |      |      +- UDPTransport              (udp)
    |      |      +- UVStream                  (stream)
    |      |             +- TCPTransport       (tcp)
    |      |             +- UnixTransport      (pipe)
    |      |             +- ReadUnixTransport  (pipe)
    |      |             +- WriteUnixTransport (pipe)
    |      +- UVStreamServer                   (streamserver)
    |             +- TCPServer                 (tcp)
    |             +- UnixServer                (pipe)
    +- UVTimer                                 (timer)

What is missing is e.g. the listen_handle of some UVHandle subclasses like the UVStreamServer. For now, it calls the actual callback (_on_listen) directly without PEP-567 context. So this PR should add those missing Handles.

The tricky part is that the current Handle supports only fixed parameters provided at initialization; callbacks like _on_listen or __uv_stream_on_read need a partial-like Handle._run_with_param().
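
To illustrate the partial-like idea, here is a minimal pure-Python sketch. It assumes a hypothetical ContextHandle class with a run_with_param() method; this is not uvloop's actual Cython Handle, only an analogy. The context is captured when the handle is created, and the extra argument is supplied later, when the native callback fires.

```python
import contextvars

cvar = contextvars.ContextVar("cvar", default="in initial context")


class ContextHandle:
    """Hypothetical stand-in for a Handle; not uvloop's real class."""

    def __init__(self, callback, *args, context=None):
        self._callback = callback
        self._args = args
        # Capture the context at creation time unless one is passed in.
        if context is None:
            context = contextvars.copy_context()
        self._context = context

    def run_with_param(self, param):
        # Run the callback inside the captured context, appending the
        # late-bound parameter to the fixed arguments.
        return self._context.run(self._callback, *self._args, param)


def on_listen(server_name, status):
    # Stand-in for a callback such as _on_listen that receives a value
    # only known when libuv invokes it.
    return f"{server_name} status={status} {cvar.get()}"


cvar.set("in context of create_server()")
handle = ContextHandle(on_listen, "server")

cvar.set("in some later context")
result = handle.run_with_param(0)
print(result)  # server status=0 in context of create_server()
```

The callback sees the value from the context in which the handle was created, not the value set just before it ran.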

List of UVHandles to check:

  • UVAsync
  • UVCheck
  • UVIdle
  • UVPoll
  • UVProcess
  • UVProcessTransport
  • UDPTransport
  • UVStream
  • UVStreamServer
  • UVTimer

Principles

  1. Each UVHandle instance (including transports, servers, etc.) sticks to the one context in which it was created and/or started to take effect.
  2. All user code triggered by a UVHandle (mostly protocol callbacks, but also the protocol factory) runs in the same context.
  3. Affiliated UVHandle instances share the same context. For example, protocol callbacks are always called in the same context even if the transport is upgraded by start_tls() in a different context (even multiple times, a.k.a. SSL over SSL).
  4. "The same context" does not necessarily mean the same Context instance; it can be a copied instance. Therefore, changes to a ContextVar are not always carried between different protocol callbacks. But all callbacks see the same inherited value from the context where the UVHandle was created or started.
  5. When in need, we lean towards using Context.copy() or copy_context() in user-triggered events, and reuse existing Context instances for uvloop-triggered events, so as to avoid re-entering the same context twice.
  6. The above copy_context() is only applied to direct method calls; for callbacks scheduled through e.g. loop.call_soon(), copying the context is not considered necessary.
  7. Don't over-optimize. The context copy operation is actually fast (the underlying context data is not copied, thanks to the copy-on-write design), so it is okay to copy when unnecessary if that avoids complication.
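
Principles 4 and 5 can be illustrated with the standard contextvars module alone. This is a small sketch, not uvloop code; the function and variable names are made up for illustration. It shows that callbacks run in copies all see the inherited value while their own changes do not leak, and that entering a Context that is already entered raises RuntimeError.

```python
import contextvars

cvar = contextvars.ContextVar("cvar", default="in initial context")


def callback_that_sets(value):
    # Read the inherited value, then change it inside this context only.
    seen = cvar.get()
    cvar.set(value)
    return seen


cvar.set("in context of create_connection()")
base = contextvars.copy_context()

# Each callback runs in its own copy: both see the inherited value,
# and neither sees the change made by the other.
first = base.copy().run(callback_that_sets, "changed by first callback")
second = base.copy().run(callback_that_sets, "changed by second callback")


def reenter():
    # A Context object cannot be entered again while it is already
    # entered; Context.run() raises RuntimeError in that case.
    try:
        base.run(lambda: None)
    except RuntimeError:
        return "cannot enter twice"
    return "re-entered"


reentry = base.run(reenter)
print(first, second, reentry, sep="\n")
```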

Example 1: Client Protocol

import asyncio
from contextvars import ContextVar

import uvloop

uvloop.install()
cvar = ContextVar("cvar", default="in initial context")


class Protocol(asyncio.Protocol):
    def __init__(self):
        self.pipe = asyncio.Queue()

    def connection_made(self, transport):
        self.pipe.put_nowait("connection_made() " + cvar.get())

    def data_received(self, data):
        self.pipe.put_nowait("data_received() " + cvar.get())

    def connection_lost(self, exc):
        self.pipe.put_nowait("connection_lost() " + cvar.get())


async def main():
    cvar.set("in context of create_connection()")
    trans, proto = await asyncio.get_event_loop().create_connection(
        Protocol, "google.com", 80
    )
    print(await proto.pipe.get())

    cvar.set("in context of write()")
    trans.write(b"GET / HTTP/1.1\r\nHost: google.com\r\n\r\n")
    print(await proto.pipe.get())

    cvar.set("in context of close()")
    trans.close()
    print(await proto.pipe.get())


asyncio.run(main())

Expected result:

connection_made() in context of create_connection()
data_received() in context of create_connection()
connection_lost() in context of create_connection()

Example 2: Server Protocol

import asyncio
import socket
from contextvars import ContextVar

import uvloop

uvloop.install()
cvar = ContextVar("cvar", default="in initial context")


class Protocol(asyncio.Protocol):
    def __init__(self, pipe):
        self.pipe = pipe
        self.pipe.put_nowait("Protocol() " + cvar.get())

    def connection_made(self, transport):
        self.pipe.put_nowait("connection_made() " + cvar.get())

    def data_received(self, data):
        self.pipe.put_nowait("data_received() " + cvar.get())

    def connection_lost(self, exc):
        self.pipe.put_nowait("connection_lost() " + cvar.get())


async def main():
    loop = asyncio.get_event_loop()
    run = lambda *args: loop.run_in_executor(None, *args)
    pipe = asyncio.Queue()

    cvar.set("in context of create_server()")
    server = await loop.create_server(
        lambda: Protocol(pipe), "127.0.0.1", 0
    )

    s = socket.socket()
    await run(s.connect, server.sockets[0].getsockname())
    print(await pipe.get())
    print(await pipe.get())

    await run(s.send, b'data')
    print(await pipe.get())

    await run(s.close)
    print(await pipe.get())


asyncio.run(main())

Expected result:

Protocol() in context of create_server()
connection_made() in context of create_server()
data_received() in context of create_server()
connection_lost() in context of create_server()

Example 3: start_serving

Similar to example 2, but start serving in a different context:

async def main():
    ...

    cvar.set("in context of create_server()")
    server = await loop.create_server(
        lambda: Protocol(pipe), "127.0.0.1", 0, start_serving=False,
    )
    
    cvar.set("in context of start_serving()")
    await server.start_serving()

    ...

Expected result:

Protocol() in context of start_serving()
connection_made() in context of start_serving()
data_received() in context of start_serving()
connection_lost() in context of start_serving()

Example 4: TLS Upgrade

import asyncio
import socket
import ssl
from contextvars import ContextVar

import uvloop

uvloop.install()
cvar = ContextVar("cvar", default="in initial context")


class Protocol(asyncio.Protocol):
    def __init__(self, pipe):
        self.pipe = pipe
        self.pipe.put_nowait("Protocol() " + cvar.get())

    def connection_made(self, transport):
        self.pipe.put_nowait((transport, self))
        self.pipe.put_nowait("connection_made() " + cvar.get())

    def data_received(self, data):
        self.pipe.put_nowait("data_received() " + cvar.get())

    def connection_lost(self, exc):
        self.pipe.put_nowait("connection_lost() " + cvar.get())


async def main():
    loop = asyncio.get_event_loop()
    run = lambda *args: loop.run_in_executor(None, *args)
    pipe = asyncio.Queue()

    cvar.set("in context of create_server()")
    server = await loop.create_server(
        lambda: Protocol(pipe),
        "127.0.0.1",
        0,
        start_serving=False,
    )

    cvar.set("in context of start_serving()")
    await server.start_serving()

    s = socket.socket()
    await run(s.connect, server.sockets[0].getsockname())
    print(await pipe.get())
    trans, proto = await pipe.get()
    print(await pipe.get())

    await run(s.send, b"data")
    print(await pipe.get())

    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    sslctx.load_cert_chain("tests/certs/ssl_cert.pem", "tests/certs/ssl_key.pem")
    client_sslctx = ssl.create_default_context()
    client_sslctx.check_hostname = False
    client_sslctx.verify_mode = ssl.CERT_NONE
    s = run(client_sslctx.wrap_socket, s)

    cvar.set("in context of start_tls()")
    await loop.start_tls(trans, proto, sslctx, server_side=True)
    s = await s
    await run(s.send, b"data")
    print(await pipe.get())

    await run(s.close)
    print(await pipe.get())


asyncio.run(main())

Expected result:

Protocol() in context of start_serving()
connection_made() in context of start_serving()
data_received() in context of start_serving()
data_received() in context of start_serving()
connection_lost() in context of start_serving()

@1st1
Member

1st1 commented May 25, 2020

My memory here is a little rusty, but can we instead change

uvloop/uvloop/cbhandles.pyx

Lines 338 to 421 in 465717f

cdef new_Handle(Loop loop, object callback, object args, object context):
    cdef Handle handle
    handle = Handle.__new__(Handle)
    handle._set_loop(loop)
    handle._set_context(context)
    handle.cb_type = 1
    handle.arg1 = callback
    handle.arg2 = args
    return handle


cdef new_MethodHandle(Loop loop, str name, method_t callback, object ctx):
    cdef Handle handle
    handle = Handle.__new__(Handle)
    handle._set_loop(loop)
    handle._set_context(None)
    handle.cb_type = 2
    handle.meth_name = name
    handle.callback = <void*> callback
    handle.arg1 = ctx
    return handle


cdef new_MethodHandle1(Loop loop, str name, method1_t callback,
                       object ctx, object arg):
    cdef Handle handle
    handle = Handle.__new__(Handle)
    handle._set_loop(loop)
    handle._set_context(None)
    handle.cb_type = 3
    handle.meth_name = name
    handle.callback = <void*> callback
    handle.arg1 = ctx
    handle.arg2 = arg
    return handle


cdef new_MethodHandle2(Loop loop, str name, method2_t callback, object ctx,
                       object arg1, object arg2):
    cdef Handle handle
    handle = Handle.__new__(Handle)
    handle._set_loop(loop)
    handle._set_context(None)
    handle.cb_type = 4
    handle.meth_name = name
    handle.callback = <void*> callback
    handle.arg1 = ctx
    handle.arg2 = arg1
    handle.arg3 = arg2
    return handle


cdef new_MethodHandle3(Loop loop, str name, method3_t callback, object ctx,
                       object arg1, object arg2, object arg3):
    cdef Handle handle
    handle = Handle.__new__(Handle)
    handle._set_loop(loop)
    handle._set_context(None)
    handle.cb_type = 5
    handle.meth_name = name
    handle.callback = <void*> callback
    handle.arg1 = ctx
    handle.arg2 = arg1
    handle.arg3 = arg2
    handle.arg4 = arg3
    return handle

to automatically capture the current context?

@fantix
Member Author

fantix commented May 26, 2020

Maybe yes - they're intended to bridge the native callbacks anyways, right? Submitted a commit to try this approach. I had to hack in the arg2 value before calling _run().

@1st1
Member

1st1 commented May 26, 2020

> Maybe yes - they're intended to bridge the native callbacks anyways, right? Submitted a commit to try this approach. I had to hack in the arg2 value before calling _run().

I think we need parallel APIs like new_MethodHandleNWithContext.

@1st1
Member

1st1 commented May 26, 2020

Or add a trailing argument to all new_MethodHandleN functions that can be either a context or a boolean, with True meaning "copy the current context" and False meaning "run in an empty context". As I said, my knowledge of the code base is a bit rusty, but I believe this is a much better approach than handling context in UVHandles.
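
A rough pure-Python sketch of how such a trailing argument might be interpreted. The helper name resolve_context is hypothetical and is not part of uvloop; the sketch only demonstrates the three cases (explicit context, True, False) using the standard contextvars module.

```python
import contextvars

cvar = contextvars.ContextVar("cvar", default="in initial context")


def resolve_context(context):
    """Hypothetical helper: map the trailing argument to a Context."""
    if context is True:
        # True: copy the context that is current at handle creation.
        return contextvars.copy_context()
    if context is False:
        # False: run in a new, empty context (variables show defaults).
        return contextvars.Context()
    # Otherwise the caller passed an explicit Context instance.
    return context


def read_cvar():
    return cvar.get()


cvar.set("in caller context")
explicit = contextvars.copy_context()
cvar.set("changed after explicit copy")

copied = resolve_context(True).run(read_cvar)
empty = resolve_context(False).run(read_cvar)
given = resolve_context(explicit).run(read_cvar)
print(copied, empty, given, sep="\n")
```

With True the callback sees the caller's current value, with False it sees only the ContextVar default, and with an explicit Context it sees whatever that context held when it was copied.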

@fantix
Member Author

fantix commented May 26, 2020

Got it, that makes sense - I'll then try to propose something more complete.

@fantix
Member Author

fantix commented Dec 13, 2020

/me is trying to fix this one now.

@1st1
Member

1st1 commented Dec 15, 2020

> /me is trying to fix this one now.

Nice, what's the latest? ;)

| +- UnixServer (pipe)
+- UVTimer (timer)
"""

Member

This is awesome, thanks for adding it

@fantix fantix force-pushed the uvhandle-context branch 4 times, most recently from 70b6ae4 to 81d3900 Compare January 26, 2021 19:54
@fantix fantix changed the title [WIP] Uvhandle context [WIP] UVHandle context Jan 27, 2021
@fantix fantix force-pushed the uvhandle-context branch 7 times, most recently from 6f41c33 to e9b4056 Compare February 3, 2021 16:08
Member

@1st1 1st1 left a comment

looks good so far!

@fantix fantix changed the title [WIP] UVHandle context Fix context in protocol callbacks Feb 5, 2021
@fantix fantix marked this pull request as ready for review February 5, 2021 19:10
Member

@1st1 1st1 left a comment

LGTM!

versusvoid and others added 2 commits February 5, 2021 15:14
This is a combined fix to correct the contexts from which protocol callbacks
are invoked. In short, callbacks like data_received() should always be
invoked from consistent contexts which are copied from the context where
the underlying UVHandle is created or started.

The new test case also covers asyncio, but skips the failing ones.
@fantix fantix merged commit f691212 into MagicStack:master Feb 5, 2021