Skip to content

Commit

Permalink
Fix "Unable to detect disconnect when using NOTIFY/LISTEN", Closes #249
Browse files Browse the repository at this point in the history
  • Loading branch information
gjcarneiro committed Mar 30, 2021
1 parent 99bf4fa commit 9571a3c
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 4 deletions.
10 changes: 8 additions & 2 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
import psycopg2.extras

from .cursor import Cursor
from .utils import _ContextManager, create_completed_future, get_running_loop
from .utils import (
ClosableQueue,
_ContextManager,
create_completed_future,
get_running_loop,
)

__all__ = ('connect',)

Expand Down Expand Up @@ -85,7 +90,7 @@ def __init__(
self._last_usage = self._loop.time()
self._writing = False
self._echo = echo
self._notifies = asyncio.Queue()
self._notifies = ClosableQueue()
self._weakref = weakref.ref(self)
self._loop.add_reader(self._fileno, self._ready, self._weakref)

Expand Down Expand Up @@ -128,6 +133,7 @@ def _ready(weak_self):
# chain exception otherwise
exc2.__cause__ = exc
exc = exc2
self.notifies.close(exc)
if waiter is not None and not waiter.done():
waiter.set_exception(exc)
else:
Expand Down
27 changes: 27 additions & 0 deletions aiopg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,30 @@ def __exit__(self, *args):
self._pool = None
self._conn = None
self._cur = None


class ClosableQueue(asyncio.Queue):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__close_exception = None
self.__close_event = asyncio.Event()

def close(self, exception):
self.__close_exception = exception
self.__close_event.set()

async def get(self):
get = self._loop.create_task(super().get())
closed = self._loop.create_task(self.__close_event.wait())
_, pending = await asyncio.wait([get, closed],
return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
if get.done():
return get.result()
assert closed.done()
ex = self.__close_exception
self.__close_exception = None
self.__close_event.clear()
raise ex
8 changes: 7 additions & 1 deletion docs/core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ Example::

.. attribute:: notifies

An :class:`asyncio.Queue` instance for received notifications.
An instance of an :class:`asyncio.Queue` subclass for received notifications.

.. seealso:: :ref:`aiopg-core-notifications`

Expand Down Expand Up @@ -983,6 +983,12 @@ Receiving part should establish listening on notification channel by
`LISTEN`_ call and wait notification events from
:attr:`Connection.notifies` queue.

.. note::

calling `await connection.notifies.get()` may raise a psycopg2 exception
if the underlying connection gets disconnected while you're waiting for
notifications.

There is usage example:

.. literalinclude:: ../examples/notify.py
Expand Down
8 changes: 7 additions & 1 deletion examples/notify.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio

import psycopg2

import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'
Expand All @@ -19,7 +21,11 @@ async def listen(conn):
async with conn.cursor() as cur:
await cur.execute("LISTEN channel")
while True:
msg = await conn.notifies.get()
try:
msg = await conn.notifies.get()
except psycopg2.Error as ex:
print("ERROR: ", ex)
return
if msg.payload == 'finish':
return
else:
Expand Down
35 changes: 35 additions & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,3 +583,38 @@ async def test_connection_on_server_restart(connect, pg_server, docker):
delay *= 2
else:
pytest.fail("Cannot connect to the restarted server")


async def test_connection_notify_on_server_restart(connect, pg_server, docker,
loop):
conn = await connect()

async def read_notifies():
while True:
await conn.notifies.get()

reader = loop.create_task(read_notifies())
await asyncio.sleep(0.1)

docker.restart(container=pg_server['Id'])

try:
with pytest.raises(psycopg2.OperationalError):
await asyncio.wait_for(reader, 10)
finally:
conn.close()
reader.cancel()

# Wait for postgres to be up and running again before moving on
# so as the restart won't affect other tests
delay = 0.001
for i in range(100):
try:
conn = await connect()
conn.close()
break
except psycopg2.Error:
time.sleep(delay)
delay *= 2
else:
pytest.fail("Cannot connect to the restarted server")
31 changes: 31 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio

from aiopg.utils import ClosableQueue


async def test_closable_queue_noclose():
queue = ClosableQueue()
await queue.put(1)
v = await queue.get()
assert v == 1


async def test_closable_queue_close(loop):
queue = ClosableQueue()
v1 = None

async def read():
nonlocal v1
v1 = await queue.get()
await queue.get()

reader = loop.create_task(read())
await queue.put(1)
await asyncio.sleep(0.1)
assert v1 == 1

queue.close(RuntimeError("connection closed"))
try:
await reader
except RuntimeError as ex:
assert ex.args == ("connection closed",)

0 comments on commit 9571a3c

Please sign in to comment.