Skip to content

Commit

Permalink
Properly cleanup connections closed by remote
Browse files Browse the repository at this point in the history
When a connection is terminated by the remote peer, asyncpg must not forget
to perform all the necessary client-side cleanup procedures.

Fixes: #385.
  • Loading branch information
elprans committed Nov 9, 2018
1 parent b12bf6d commit 7ba5dd4
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
10 changes: 9 additions & 1 deletion asyncpg/protocol/protocol.pyx
Expand Up @@ -111,7 +111,10 @@ cdef class BaseProtocol(CoreProtocol):
self.conref = weakref.ref(connection)

cdef get_connection(self):
return self.conref()
if self.conref is not None:
return self.conref()
else:
return None

def get_server_pid(self):
return self.backend_pid
Expand Down Expand Up @@ -867,6 +870,11 @@ cdef class BaseProtocol(CoreProtocol):
# terminated or due to another error;
# Throw an error in any awaiting waiter.
self.closing = True
# Cleanup the connection resources, including, possibly,
# releasing the pool holder.
con = self.get_connection()
if con is not None:
con._cleanup()
self._handle_waiter_on_connection_lost(exc)

cdef _write(self, buf):
Expand Down
25 changes: 25 additions & 0 deletions tests/test_pool.py
Expand Up @@ -881,6 +881,31 @@ async def test_pool_init_and_use_race(self):
await pool_task
await pool.close()

async def test_pool_remote_close(self):
pool = await self.create_pool(min_size=1, max_size=1)
backend_pid_fut = self.loop.create_future()

async def worker():
async with pool.acquire() as conn:
pool_backend_pid = await conn.fetchval(
'SELECT pg_backend_pid()')
backend_pid_fut.set_result(pool_backend_pid)
await asyncio.sleep(0.2, loop=self.loop)

task = self.loop.create_task(worker())
try:
conn = await self.connect()
backend_pid = await backend_pid_fut
await conn.execute('SELECT pg_terminate_backend($1)', backend_pid)
finally:
await conn.close()

await task

# Check that connection_lost has released the pool holder.
conn = await pool.acquire(timeout=0.1)
await pool.release(conn)


@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
class TestHotStandby(tb.ClusterTestCase):
Expand Down

0 comments on commit 7ba5dd4

Please sign in to comment.