Skip to content
/ psycopg Public
forked from psycopg/psycopg

Commit

Permalink
Handle RW-ready in wait(,_conn)_async()
Browse files Browse the repository at this point in the history
  • Loading branch information
dlax committed Nov 11, 2021
1 parent 443b54e commit e0bea08
Showing 1 changed file with 21 additions and 25 deletions.
46 changes: 21 additions & 25 deletions psycopg/psycopg/waiting.py
Expand Up @@ -119,29 +119,27 @@ async def wait_async(gen: PQGen[RV], fileno: int) -> RV:

def wakeup(state: Ready) -> None:
nonlocal ready
ready = state
ready |= state # type: ignore[assignment]
ev.set()

try:
s = next(gen)
while 1:
reader = s & Wait.R
writer = s & Wait.W
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev.clear()
if s == Wait.R:
loop.add_reader(fileno, wakeup, Ready.R)
await ev.wait()
loop.remove_reader(fileno)
elif s == Wait.W:
loop.add_writer(fileno, wakeup, Ready.W)
await ev.wait()
loop.remove_writer(fileno)
elif s == Wait.RW:
ready = 0 # type: ignore[assignment]
if reader:
loop.add_reader(fileno, wakeup, Ready.R)
if writer:
loop.add_writer(fileno, wakeup, Ready.W)
await ev.wait()
await ev.wait()
if reader:
loop.remove_reader(fileno)
if writer:
loop.remove_writer(fileno)
else:
raise e.InternalError("bad poll status: %s")
s = gen.send(ready)

except StopIteration as ex:
Expand Down Expand Up @@ -180,23 +178,21 @@ def wakeup(state: Ready) -> None:
try:
fileno, s = next(gen)
while 1:
reader = s & Wait.R
writer = s & Wait.W
if not reader and not writer:
raise e.InternalError(f"bad poll status: {s}")
ev.clear()
if s == Wait.R:
loop.add_reader(fileno, wakeup, Ready.R)
await wait_for(ev.wait(), timeout)
loop.remove_reader(fileno)
elif s == Wait.W:
loop.add_writer(fileno, wakeup, Ready.W)
await wait_for(ev.wait(), timeout)
loop.remove_writer(fileno)
elif s == Wait.RW:
ready = 0 # type: ignore[assignment]
if reader:
loop.add_reader(fileno, wakeup, Ready.R)
if writer:
loop.add_writer(fileno, wakeup, Ready.W)
await wait_for(ev.wait(), timeout)
await wait_for(ev.wait(), timeout)
if reader:
loop.remove_reader(fileno)
if writer:
loop.remove_writer(fileno)
else:
raise e.InternalError("bad poll status: %s")
fileno, s = gen.send(ready)

except TimeoutError:
Expand Down

0 comments on commit e0bea08

Please sign in to comment.