Skip to content

Commit

Permalink
Rewrite async waiting functions using Futures
Browse files Browse the repository at this point in the history
Not sure we correctly stop monitoring the fd when the first future gets
its result set. Or maybe the other one remains running and would cancel
its monitoring?

[skip ci]
  • Loading branch information
dlax committed Nov 15, 2021
1 parent ddf70a4 commit 4630e68
Showing 1 changed file with 43 additions and 54 deletions.
97 changes: 43 additions & 54 deletions psycopg/psycopg/waiting.py
Expand Up @@ -13,7 +13,7 @@
import selectors
from enum import IntEnum
from typing import Optional
from asyncio import get_event_loop, wait_for, Event, TimeoutError
from asyncio import get_event_loop, wait as asyncio_wait, FIRST_COMPLETED
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

from . import errors as e
Expand Down Expand Up @@ -109,38 +109,31 @@ async def wait_async(gen: PQGen[RV], fileno: int) -> RV:
Behave like in `wait()`, but exposing an `asyncio` interface.
"""
# Use an event to block and restart after the fd state changes.
# Not sure this is the best implementation but it's a start.
ev = Event()
loop = get_event_loop()
ready: Ready
s: Wait

def wakeup(state: Ready) -> None:
nonlocal ready
ready = state
ev.set()

try:
s = next(gen)
while 1:
ev.clear()
if s == Wait.R:
loop.add_reader(fileno, wakeup, Ready.R)
await ev.wait()
loop.remove_reader(fileno)
elif s == Wait.W:
loop.add_writer(fileno, wakeup, Ready.W)
await ev.wait()
loop.remove_writer(fileno)
elif s == Wait.RW:
loop.add_reader(fileno, wakeup, Ready.R)
loop.add_writer(fileno, wakeup, Ready.W)
await ev.wait()
loop.remove_reader(fileno)
loop.remove_writer(fileno)
else:
raise e.InternalError("bad poll status: %s")
fs = []
reader = s & Wait.R
writer = s & Wait.W
if reader:
f = loop.create_future()
loop.add_reader(fileno, f.set_result, Ready.R)
f.add_done_callback(lambda _: loop.remove_reader(fileno))
fs.append(f)
if writer:
f = loop.create_future()
loop.add_writer(fileno, f.set_result, Ready.W)
f.add_done_callback(lambda _: loop.remove_writer(fileno))
fs.append(f)
if not fs:
raise e.InternalError(f"bad poll status: {s}")
done = (await asyncio_wait(fs, return_when=FIRST_COMPLETED))[0]
# TODO: loop on 'done' to build a Ready.RW
ready = done.pop().result()
s = gen.send(ready)

except StopIteration as ex:
Expand All @@ -163,44 +156,40 @@ async def wait_conn_async(
Behave like in `wait()`, but take the fileno to wait from the generator
itself, which might change during processing.
"""
# Use an event to block and restart after the fd state changes.
# Not sure this is the best implementation but it's a start.
ev = Event()
loop = get_event_loop()
ready: Ready
s: Wait

def wakeup(state: Ready) -> None:
nonlocal ready
ready = state
ev.set()

timeout = timeout or None
try:
fileno, s = next(gen)
while 1:
ev.clear()
if s == Wait.R:
loop.add_reader(fileno, wakeup, Ready.R)
await wait_for(ev.wait(), timeout)
loop.remove_reader(fileno)
elif s == Wait.W:
loop.add_writer(fileno, wakeup, Ready.W)
await wait_for(ev.wait(), timeout)
loop.remove_writer(fileno)
elif s == Wait.RW:
loop.add_reader(fileno, wakeup, Ready.R)
loop.add_writer(fileno, wakeup, Ready.W)
await wait_for(ev.wait(), timeout)
loop.remove_reader(fileno)
loop.remove_writer(fileno)
else:
raise e.InternalError("bad poll status: %s")
fs = []
reader = s & Wait.R
writer = s & Wait.W
if reader:
f = loop.create_future()
loop.add_reader(fileno, f.set_result, Ready.R)
f.add_done_callback(lambda _: loop.remove_reader(fileno))
fs.append(f)
if writer:
f = loop.create_future()
loop.add_writer(fileno, f.set_result, Ready.W)
f.add_done_callback(lambda _: loop.remove_reader(fileno))
fs.append(f)
if not fs:
raise e.InternalError(f"bad poll status: {s}")
done = (
await asyncio_wait(
fs, timeout=timeout, return_when=FIRST_COMPLETED
)
)[0]
if not done:
raise e.OperationalError("timeout expired")
# TODO: loop on 'done' to build a Ready.RW
ready = done.pop().result()
fileno, s = gen.send(ready)

except TimeoutError:
raise e.OperationalError("timeout expired")

except StopIteration as ex:
rv: RV = ex.args[0] if ex.args else None
return rv
Expand Down

0 comments on commit 4630e68

Please sign in to comment.