Skip to content

Commit

Permalink
Merge pull request #861 from KeepSafe/fix-conn-limit-leak
Browse files Browse the repository at this point in the history
Fix leak of connection slot during connection error
  • Loading branch information
fafhrd91 committed May 8, 2016
2 parents 8ca24a4 + 5ca2441 commit e07e5db
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 29 deletions.
63 changes: 36 additions & 27 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,27 +297,33 @@ def connect(self, req):
# This connection will now count towards the limit.
waiters.append(fut)

yield from fut

transport, proto = self._get(key)
if transport is None:
try:
if self._conn_timeout:
transport, proto = yield from asyncio.wait_for(
self._create_connection(req),
self._conn_timeout, loop=self._loop)
else:
transport, proto = yield from self._create_connection(req)

except asyncio.TimeoutError as exc:
raise ClientTimeoutError(
'Connection timeout to host {0[0]}:{0[1]} ssl:{0[2]}'
.format(key)) from exc
except OSError as exc:
raise ClientOSError(
exc.errno,
'Cannot connect to host {0[0]}:{0[1]} ssl:{0[2]} [{1}]'
.format(key, exc.strerror)) from exc
try:
if limit is not None:
yield from fut

transport, proto = self._get(key)
if transport is None:
try:
if self._conn_timeout:
transport, proto = yield from asyncio.wait_for(
self._create_connection(req),
self._conn_timeout, loop=self._loop)
else:
transport, proto = \
yield from self._create_connection(req)

except asyncio.TimeoutError as exc:
raise ClientTimeoutError(
'Connection timeout to host {0[0]}:{0[1]} ssl:{0[2]}'
.format(key)) from exc
except OSError as exc:
raise ClientOSError(
exc.errno,
'Cannot connect to host {0[0]}:{0[1]} ssl:{0[2]} [{1}]'
.format(key, exc.strerror)) from exc
except:
self._release_waiter(key)
raise

self._acquired[key].add(transport)
conn = Connection(self, key, req, transport, proto, self._loop)
Expand All @@ -344,6 +350,14 @@ def _get(self, key):
del self._conns[key]
return None, None

def _release_waiter(self, key):
waiters = self._waiters[key]
while waiters:
waiter = waiters.pop(0)
if not waiter.done():
waiter.set_result(None)
break

def _release(self, key, req, transport, protocol, *, should_close=False):
if self._closed:
# acquired connection is already released on connector closing
Expand All @@ -358,12 +372,7 @@ def _release(self, key, req, transport, protocol, *, should_close=False):
pass
else:
if self._limit is not None and len(acquired) < self._limit:
waiters = self._waiters[key]
while waiters:
waiter = waiters.pop(0)
if not waiter.done():
waiter.set_result(None)
break
self._release_waiter(key)

resp = req.response

Expand Down
21 changes: 19 additions & 2 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,11 +560,28 @@ class Req:
# limit exhausted
yield from asyncio.wait_for(conn.connect(Req), 0.01,
loop=self.loop)

connection.close()

self.loop.run_until_complete(go())

def test_connect_with_limit_release_waiters(self):

def check_with_exc(err):
conn = aiohttp.BaseConnector(limit=1, loop=self.loop)
conn._create_connection = unittest.mock.Mock()
conn._create_connection.return_value = \
asyncio.Future(loop=self.loop)
conn._create_connection.return_value.set_exception(err)

with self.assertRaises(Exception):
req = unittest.mock.Mock()
self.loop.run_until_complete(conn.connect(req))
key = (req.host, req.port, req.ssl)
self.assertFalse(conn._waiters[key])

check_with_exc(OSError(1, 'permission error'))
check_with_exc(RuntimeError())
check_with_exc(asyncio.TimeoutError())

def test_connect_with_limit_concurrent(self):

@asyncio.coroutine
Expand Down

0 comments on commit e07e5db

Please sign in to comment.