Description
elasticsearch-py version: 7.14.1
When using AsyncConnection, the connection pool does not work as expected: the number of established connections to port 9200 grows without any limit. This can be observed with:
watch 'netstat -anp | grep 9200 | grep ESTABLISHED | wc -l'
With the help of the aiohttp docs, I traced the bug to this code:
elasticsearch-py/elasticsearch/_async/http_aiohttp.py
Lines 370 to 372 in 5c82166
```python
connector=aiohttp.TCPConnector(
    limit=self._limit, use_dns_cache=True, ssl=self._ssl_context
),
```
This should use `limit_per_host` rather than `limit`. The `enable_cleanup_closed` parameter also deserves attention: when X-Pack is enabled, aiohttp has an issue with closing SSL connections.
https://docs.aiohttp.org/en/stable/client_reference.html?highlight=limit#baseconnector
limit - The total number for simultaneous connections. If limit is 0 the connector has no limit. The default limit size is 100.
limit_per_host - The limit for simultaneous connections to the same endpoint.
enable_cleanup_closed (bool) – some SSL servers do not properly complete SSL shutdown process, in that case asyncio leaks ssl connections. If this parameter is set to True, aiohttp additionally aborts underlining transport after 2 seconds. It is off by default.
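To make the difference between the two limits concrete, here is a minimal toy model built only on `asyncio` semaphores. It is not aiohttp code and does not reproduce aiohttp's internals; `simulate`, the host names, and the sleep duration are all hypothetical. It only illustrates the semantics quoted above: `limit` caps simultaneous connections across all endpoints, while `limit_per_host` caps them per endpoint.

```python
import asyncio
from collections import Counter
from contextlib import AsyncExitStack


async def simulate(hosts, requests_per_host, limit=0, limit_per_host=0):
    """Toy model: return (peak connections per host, peak connections in total).

    A value of 0 means "no cap", mirroring the documented parameters.
    """
    # Semaphores are created inside the coroutine so they bind to the running loop.
    total_sem = asyncio.Semaphore(limit) if limit else None
    host_sems = (
        {host: asyncio.Semaphore(limit_per_host) for host in hosts}
        if limit_per_host
        else {}
    )
    active = Counter()
    peak = Counter()
    totals = {"active": 0, "peak": 0}

    async def request(host):
        async with AsyncExitStack() as stack:
            # Take the per-host slot first, then the global slot (fixed order).
            if host in host_sems:
                await stack.enter_async_context(host_sems[host])
            if total_sem is not None:
                await stack.enter_async_context(total_sem)
            active[host] += 1
            totals["active"] += 1
            peak[host] = max(peak[host], active[host])
            totals["peak"] = max(totals["peak"], totals["active"])
            await asyncio.sleep(0.01)  # pretend the request is in flight
            active[host] -= 1
            totals["active"] -= 1

    await asyncio.gather(
        *(request(host) for host in hosts for _ in range(requests_per_host))
    )
    return dict(peak), totals["peak"]


if __name__ == "__main__":
    nodes = ["es-a:9200", "es-b:9200"]  # hypothetical endpoints
    print(asyncio.run(simulate(nodes, 10, limit=4)))
    print(asyncio.run(simulate(nodes, 10, limit_per_host=3)))
```

In this model a total cap bounds the sum over all endpoints, whereas a per-host cap bounds each endpoint separately, so the overall number of connections scales with the number of endpoints.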
POC:
```python
import asyncio

import aiohttp
import elasticsearch
from elasticsearch import AIOHttpConnection

# Assumption: ESClientResponse is importable from the module quoted above.
from elasticsearch._async.http_aiohttp import ESClientResponse


class KeepAliveAIOHttpConnection(AIOHttpConnection):
    async def _create_aiohttp_session(self):
        """Creates an aiohttp.ClientSession(). This is delayed until
        the first call to perform_request() so that AsyncTransport has
        a chance to set AIOHttpConnection.loop
        """
        if self.loop is None:
            self.loop = asyncio.get_running_loop()
        self.session = aiohttp.ClientSession(
            headers=self.headers,
            skip_auto_headers=("accept", "accept-encoding"),
            auto_decompress=True,
            loop=self.loop,
            cookie_jar=aiohttp.DummyCookieJar(),
            response_class=ESClientResponse,
            connector=aiohttp.TCPConnector(
                # Cap connections per endpoint instead of in total.
                limit_per_host=self._limit,
                use_dns_cache=True,
                ssl=self._ssl_context,
                # Abort SSL transports that never complete their shutdown.
                enable_cleanup_closed=True,
                keepalive_timeout=150,
            ),
        )


es = elasticsearch.AsyncElasticsearch(connection_class=KeepAliveAIOHttpConnection)
```
...