This repository has been archived by the owner on May 13, 2019. It is now read-only.

Add a Connection.listen() helper. #1

Open
miracle2k wants to merge 1 commit into base: master

Conversation

miracle2k
Contributor

I want to use NOTIFY/LISTEN in Postgres. Async should be a great fit for this: we should be able to consume the events as a stream. This is my attempt at implementing that.

There is a bug here. The finally clause is supposed to send the UNLISTEN command, and I would expect this to work in curio from my tests, because given a generator such as this:

async def foo():
    try:
        while True:
            yield 1
            await curio.sleep(2)
    finally:
        print('before shutdown')
        await curio.sleep(1)
        print('after shutdown')

called such as this:

async with curio.timeout_after(5):
    agen = foo()
    async with curio.meta.finalize(agen) as agen:
        async for x in agen:
            print(x)

Then the message "after shutdown" will be printed (curio requires the curio.meta.finalize context manager to be used).

However, on trio the message will not be printed (or in our case, the UNLISTEN will not be sent), because when the await within the finally clause goes back to the event loop, the task is still considered to be cancelled, and the Cancelled exception is raised again.

In trio, we have to do:

async def foo():
    try:
        while True:
            yield 1
            await trio.sleep(2)
    finally:
        with trio.open_cancel_scope() as scope:
            scope.shield = True
            await trio.sleep(1)

Now, I wanted to add this ability to multio, and here is the snag. In curio, disable_cancellation is an async context manager; in trio, open_cancel_scope is a sync one. We cannot convert the trio one to an async one, because async with would go back to the trio event loop before the shield takes effect, defeating the whole purpose.

My best suggestion would be: given that curio does the right thing here, multio needs to support some kind of feature that is a no-op on curio and helps us run async code in a finally branch. Maybe something like:

with multio.asynclib.safe_finally:
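
One way such a helper could look is sketched below. This is only an illustration under assumptions, not multio's API: safe_finally and its backend argument are hypothetical names, and trio.CancelScope(shield=True) is the current trio spelling of the open_cancel_scope() plus scope.shield = True pattern shown above. The point of the sketch is that the helper is a synchronous context manager, so entering it never yields to the event loop.

```python
import contextlib


@contextlib.contextmanager
def safe_finally(backend):
    """Hypothetical helper: let async cleanup code in a finally block run.

    `backend` is an illustrative parameter ("trio" or "curio"); a real
    helper would presumably know which library is in use already.
    """
    if backend == "trio":
        # Imported lazily so the curio branch works without trio installed.
        import trio

        # Entering the scope is synchronous, so the shield is in place
        # before any await inside the block can raise Cancelled again.
        with trio.CancelScope(shield=True):
            yield
    else:
        # No-op on curio, which (per the test described above) already
        # lets the finally block run to completion.
        yield
```

The finally clause would then wrap its awaits in `with safe_finally(...):` on either backend.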

@miracle2k
Contributor Author

miracle2k commented Sep 9, 2018

Actually, it turned out that I needed to do more: I needed to be able to listen/unlisten to certain channels dynamically, but on the same connection. I came up with this:

import trio
from psycopg2 import sql


class SubscriptionListener:

    def __init__(self, conn):
        self.conn = conn
        self.subscriptions = {}

    async def __aenter__(self):
        # Use a contextstack like aiostream?
        self.conn._connection.autocommit = True
        self.nursery_context = trio.open_nursery()
        self.nursery = await self.nursery_context.__aenter__()
        self.nursery.start_soon(self.proc)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # self.nursery.cancel_scope.cancel()
        await self.nursery_context.__aexit__(exc_type, exc_val, exc_tb)

    async def proc(self):
        while True:
            await wait_for_notify_message(self.conn)
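            while self.conn._connection.notifies:
                notification = self.conn._connection.notifies.pop(0)
                # A notification can arrive for a channel that is not (or no
                # longer) registered, e.g. right after UNLISTEN; skip it
                # instead of raising KeyError.
                queue = self.subscriptions.get(notification.channel)
                if queue is not None:
                    await queue.put(notification)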

    async def listen(self, *channels):
        queries = sql.SQL('; ').join([
            sql.SQL("LISTEN {}").format(sql.Identifier(channel))
            for channel in channels
        ])
        async with await self.conn.cursor() as cursor:
            await cursor.execute(queries)

        queue = trio.Queue(0)
        for channel in channels:
            self.subscriptions[channel] = queue

        try:
            while True:
                event = await queue.get()
                yield event

        finally:
            with trio.open_cancel_scope() as scope:
                scope.shield = True

                queries = sql.SQL('; ').join([
                    sql.SQL("UNLISTEN {}").format(sql.Identifier(channel))
                    for channel in channels
                ])
                async with await self.conn.cursor() as cursor:
                    await cursor.execute(queries)
                for channel in channels:
                    del self.subscriptions[channel]


async def wait_for_notify_message(conn):
    while True:
        await trio.hazmat.wait_readable(conn._sock)

        await conn.poll()
        if conn._connection.notifies:
            return

I am using it like this:

async with trio.open_nursery() as nursery:
    async with pool.acquire() as listen_conn:
        listener = SubscriptionListener(listen_conn)
        async with listener:
            gen = listener.listen(channel_name)
            async with aclosing(gen):
                async for event in gen:
                    pass

It is mostly the wait_for_notify_message helper that accesses library internals.

I am not sure which part would make sense as part of riopg - maybe just wait_for_notify_message, or maybe the whole SubscriptionListener - but it took me quite a while to iron out the kinks, so I do believe that there is a benefit to exposing this functionality properly.

@Fuyukai
Owner

Fuyukai commented Sep 10, 2018

No clue what happened with CircleCI here, closing/re-opening to see if it picks it up

Fuyukai closed this Sep 10, 2018
Fuyukai reopened this Sep 10, 2018
@Fuyukai
Owner

Fuyukai commented Sep 10, 2018

Personally, I would simply not make it an async generator. It's a bit more work, but making a fully fledged class that .listen() returns (used as async with conn.listen(...) as listen: followed by async for item in listen:) makes more sense and is probably easier than the finally method listed above.

Also, this needs a test added.

@miracle2k
Contributor Author

If I understand you correctly, by using this syntax:

async with conn.listen(...) as listen:
    async for item in listen:
        pass

We would make conn.listen return a context manager, and we can clean up the subscriptions in its __aexit__, thus possibly avoiding the trouble with the finally clause? We can also integrate the aclosing step for the inner generator. Do I understand correctly?
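
A minimal sketch of that shape, for illustration only. Listener, FakeConnection and next_notification are hypothetical names, not riopg API; the fake connection just records the SQL it is given, and asyncio is used merely to drive the demo, since the class itself relies only on the async context manager and async iterator protocols.

```python
import asyncio
from collections import deque


class FakeConnection:
    """Hypothetical stand-in for a connection; records executed SQL."""

    def __init__(self, pending=()):
        self.executed = []
        self.pending = deque(pending)

    async def execute(self, query):
        self.executed.append(query)

    async def next_notification(self):
        # A real connection would wait on the socket here. The fake
        # returns None once its canned notifications are used up.
        return self.pending.popleft() if self.pending else None


class Listener:
    """Context manager plus async iterator, instead of an async generator."""

    def __init__(self, conn, *channels):
        self.conn = conn
        self.channels = channels

    async def __aenter__(self):
        for channel in self.channels:
            # Real code should quote identifiers, as the psycopg2
            # sql.Identifier usage earlier in this thread does.
            await self.conn.execute(f"LISTEN {channel}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Cleanup lives here rather than in a generator's finally clause.
        for channel in self.channels:
            await self.conn.execute(f"UNLISTEN {channel}")
        return False

    def __aiter__(self):
        return self

    async def __anext__(self):
        notification = await self.conn.next_notification()
        if notification is None:  # demo only: a real listener never ends
            raise StopAsyncIteration
        return notification


async def demo():
    conn = FakeConnection(["first", "second"])
    seen = []
    async with Listener(conn, "events") as listener:
        async for item in listener:
            seen.append(item)
    return conn.executed, seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that under trio the awaits in __aexit__ still run inside any enclosing cancelled scope, so the UNLISTEN there would presumably still need the shield discussed above.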

@Fuyukai
Owner

Fuyukai commented Sep 10, 2018

Yes, that's the idea.

@codecov

codecov bot commented Sep 29, 2018

Codecov Report

Merging #1 into master will decrease coverage by 6.23%.
The diff coverage is 35.71%.

@@            Coverage Diff             @@
##           master       #1      +/-   ##
==========================================
- Coverage    95.9%   89.66%   -6.24%     
==========================================
  Files           6        6              
  Lines         244      271      +27     
==========================================
+ Hits          234      243       +9     
- Misses         10       28      +18
Impacted Files         Coverage Δ
riopg/connection.py    70.78% <35.71%> (-16.32%) ⬇️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 040ae88...14a9e30.

@miracle2k
Copy link
Contributor Author

miracle2k commented Sep 29, 2018

So after the async=True change caused some trouble with this, I looked it over, and I think the only way to make this work reliably is to change the way riopg works by having a permanent background task that is always reading.

  • As it is, riopg is only reading from the socket after some interaction (such as sending a query).
  • But with notifications, the program is not doing anything, but needs to wait on the socket for a notification to come in.
  • If a user's program only submits a LISTEN instruction to postgres, and then waits for notifications, this would be no problem. We can just have a listen() method read from the socket, and any other interaction at the same time would be forbidden.
  • But I want to be able to listen on the socket for notifications while at the same time issuing additional LISTEN and UNLISTEN queries. Now we are in trouble, because we are going to have multiple trio.hazmat.wait_readable calls at the same time (the one that waits for notifications, and the one within the poll logic).

Sidenote: Maybe I can do this outside of riopg:

  • Task A does wait_readable on the socket.
  • Task B waits for new channels we are supposed to LISTEN or UNLISTEN.
  • When task B gets an instruction to listen/unlisten, it cancels task A, executes the query, then restarts task A.

But that seems very messy?

https://github.com/aio-libs/aiopg can solve this more easily, because they just have an asyncio reader registered, so they get a callback whenever the socket is readable. The callback then fulfills a waiter future that might exist (any postgres async call waiting for a result). The point is that they are reading permanently from the socket, whereas for us, any attempt to read permanently from the socket will interfere with attempts to execute queries, which also need to read when the state switches to POLL_READ.
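
The reader-callback pattern described above can be sketched with plain asyncio. This is not aiopg's code: ReaderDemo and wait_for_data are hypothetical names, a socketpair stands in for the Postgres connection, and loop.add_reader needs a selector-based event loop (the default on Unix).

```python
import asyncio
import socket


class ReaderDemo:
    """Hypothetical illustration of a permanently registered reader."""

    def __init__(self, loop, sock):
        self._loop = loop
        self._sock = sock
        self._waiter = None
        self.received = []
        # The callback fires whenever the socket becomes readable,
        # whether or not anyone is currently waiting for a result.
        loop.add_reader(sock.fileno(), self._on_readable)

    def _on_readable(self):
        self.received.append(self._sock.recv(4096))
        # Wake up whoever is waiting, if anyone is.
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)

    async def wait_for_data(self):
        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

    def close(self):
        self._loop.remove_reader(self._sock.fileno())


async def demo():
    loop = asyncio.get_running_loop()
    ours, theirs = socket.socketpair()
    ours.setblocking(False)
    reader = ReaderDemo(loop, ours)
    try:
        theirs.send(b"NOTIFY")
        await reader.wait_for_data()
        return reader.received
    finally:
        reader.close()
        ours.close()
        theirs.close()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because there is a single reader and any number of waiters can be woken from it, the conflict between two concurrent wait_readable calls does not arise in this model.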

(I remember a trio ticket on GitHub about letting multiple readers wait on the same wait_readable, but I can't find it right now. Not sure if that was just a vague idea or something more concrete.)

So, the "permanently reading" approach in trio requires a nursery, and thus would change the API of riopg - the connection context manager would become required rather than being optional, as it is now.

I updated this patch with a solution where I tried to have my cake and eat it too: If a background task is started which reads, then the wait_callback will defer to it whenever it encounters POLL_READ. Otherwise, it continues to behave as it currently does and calls wait_readable directly.

In addition, there is separate code to expose the notifications via a get_notifications() getter. This works even if the background task is not started, but then you will only receive notifications if they are being read for some other reason, maybe as part of a query.

A couple ways this approach could be evolved:

  • If the async context manager is used for the connection, then the background reading task is automatically started in a nursery.
  • Alternatively, we could say that the background task really only makes sense if the user does want live-notifications. We could then rename the keep_reading() method to listen(), and make it return the notifications directly (no separate get_notifications() helper) - this would be similar to the suggestion you made a couple of posts ago. We'd still need this listen() helper to take over the reading from the socket, though.

Curious what you think.

2 participants