Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensure connection is released #415

Closed
wants to merge 11 commits into from
3 changes: 1 addition & 2 deletions aiopg/pool.py
Expand Up @@ -189,8 +189,7 @@ def _fill_free_pool(self, override_min):
conn = self._free[-1] conn = self._free[-1]
if conn.closed: if conn.closed:
self._free.pop() self._free.pop()
elif self._recycle > -1 \ elif -1 < self._recycle < self._loop.time() - conn.last_usage:
and self._loop.time() - conn.last_usage > self._recycle:
conn.close() conn.close()
self._free.pop() self._free.pop()
else: else:
Expand Down
31 changes: 17 additions & 14 deletions aiopg/utils.py
@@ -1,5 +1,6 @@
import asyncio import asyncio
import sys import sys
import psycopg2


PY_35 = sys.version_info >= (3, 5) PY_35 = sys.version_info >= (3, 5)
PY_352 = sys.version_info >= (3, 5, 2) PY_352 = sys.version_info >= (3, 5, 2)
Expand Down Expand Up @@ -142,24 +143,18 @@ def __aexit__(self, exc_type, exc, tb):




class _PoolAcquireContextManager(_ContextManager): class _PoolAcquireContextManager(_ContextManager):
__slots__ = ('_coro', '_conn', '_pool') __slots__ = ('_coro', '_obj', '_pool')


def __init__(self, coro, pool): def __init__(self, coro, pool):
self._coro = coro super().__init__(coro)
self._conn = None
self._pool = pool self._pool = pool


if PY_35: if PY_35:
@asyncio.coroutine
def __aenter__(self):
self._conn = yield from self._coro
return self._conn

@asyncio.coroutine @asyncio.coroutine
def __aexit__(self, exc_type, exc, tb): def __aexit__(self, exc_type, exc, tb):
yield from self._pool.release(self._conn) yield from self._pool.release(self._obj)
self._pool = None self._pool = None
self._conn = None self._obj = None




class _PoolConnectionContextManager: class _PoolConnectionContextManager:
Expand Down Expand Up @@ -238,11 +233,19 @@ def __enter__(self):
def __exit__(self, *args): def __exit__(self, *args):
try: try:
self._cur.close() self._cur.close()
self._pool.release(self._conn) except psycopg2.ProgrammingError:
# seen instances where the cursor fails to close:
# https://github.com/aio-libs/aiopg/issues/364
# We close it here so we don't return a bad connection to the pool
self._conn.close()
raise
finally: finally:
self._pool = None try:
self._conn = None self._pool.release(self._conn)
self._cur = None finally:
self._pool = None
self._conn = None
self._cur = None




if not PY_35: if not PY_35:
Expand Down
82 changes: 52 additions & 30 deletions tests/conftest.py
Expand Up @@ -12,6 +12,7 @@
import uuid import uuid
import warnings import warnings



from docker import APIClient from docker import APIClient


import aiopg import aiopg
Expand Down Expand Up @@ -127,41 +128,62 @@ def pytest_generate_tests(metafunc):
def pg_server(unused_port, docker, session_id, pg_tag, request): def pg_server(unused_port, docker, session_id, pg_tag, request):
if not request.config.option.no_pull: if not request.config.option.no_pull:
docker.pull('postgres:{}'.format(pg_tag)) docker.pull('postgres:{}'.format(pg_tag))
container = docker.create_container(
container_args = dict(
image='postgres:{}'.format(pg_tag), image='postgres:{}'.format(pg_tag),
name='aiopg-test-server-{}-{}'.format(pg_tag, session_id), name='aiopg-test-server-{}-{}'.format(pg_tag, session_id),
ports=[5432], ports=[5432],
detach=True, detach=True,
) )
docker.start(container=container['Id'])
inspection = docker.inspect_container(container['Id']) is_darwin = sys.platform == "darwin"
host = inspection['NetworkSettings']['IPAddress']
pg_params = dict(database='postgres', # bound IPs do not work on OSX
user='postgres', host = "127.0.0.1"
password='mysecretpassword', host_port = 5432
host=host,
port=5432) if is_darwin:
delay = 0.001 host_port = 55432
for i in range(100): container_args['host_config'] = docker.create_host_config(port_bindings={5432: (host, host_port)})
try:
conn = psycopg2.connect(**pg_params) container = docker.create_container(**container_args)
cur = conn.cursor()
cur.execute("CREATE EXTENSION hstore;") try:
cur.close() docker.start(container=container['Id'])
conn.close()
break # This does not work on OSX
except psycopg2.Error: if not is_darwin:
time.sleep(delay) inspection = docker.inspect_container(container['Id'])
delay *= 2 host = inspection['NetworkSettings']['IPAddress']
else:
pytest.fail("Cannot start postgres server") server_params = dict(database='postgres',
container['host'] = host user='postgres',
container['port'] = 5432 password='mysecretpassword',
container['pg_params'] = pg_params host=host,
yield container port=host_port)

delay = 0.001
docker.kill(container=container['Id']) for i in range(100):
docker.remove_container(container['Id']) try:
conn = psycopg2.connect(**server_params)
cur = conn.cursor()
cur.execute("CREATE EXTENSION hstore;")
cur.close()
conn.close()
break
except psycopg2.Error:
time.sleep(delay)
delay *= 2
else:
pytest.fail("Cannot start postgres server")

container['host'] = host
container['port'] = host_port
container['pg_params'] = server_params

yield container
finally:
docker.kill(container=container['Id'])
docker.remove_container(container['Id'])




@pytest.fixture @pytest.fixture
Expand Down
18 changes: 18 additions & 0 deletions tests/pep492/test_async_await.py
Expand Up @@ -53,6 +53,24 @@ async def test_cursor_create_with_context_manager(make_connection):
assert cursor.closed assert cursor.closed




@asyncio.coroutine
async def test_pool_context_manager_timeout(pg_params, loop: asyncio.BaseEventLoop):
async with aiopg.create_pool(loop=loop, **pg_params, minsize=1, maxsize=1) as pool:
cursor_ctx = await pool.cursor()
with cursor_ctx as cursor:
hung_task = cursor.execute('SELECT pg_sleep(1000);')
# start task
fut = loop.create_task(hung_task)

# sleep for a bit so it gets going
await asyncio.sleep(1)

print()

assert cursor.closed
assert pool.closed


@asyncio.coroutine @asyncio.coroutine
async def test_cursor_with_context_manager(make_connection): async def test_cursor_with_context_manager(make_connection):
conn = await make_connection() conn = await make_connection()
Expand Down