AsyncElasticsearch client closes event loop when used with PyTest after only 2 queries #2051

Closed
dancaugherty opened this issue Aug 22, 2022 · 5 comments


@dancaugherty

Elasticsearch version (bin/elasticsearch --version):
7.13.3, in its own container

elasticsearch-py version (elasticsearch.__versionstr__):
7.14.0

Please make sure the major version matches the Elasticsearch server you are running.

Description of the problem including expected versus actual behavior:

Under PyTest, the AsyncElasticsearch client inexplicably closes its event loop after two queries. The next query fails with ConnectionError(Event loop is closed).

Steps to reproduce:

  1. Create a client container (in this case, with uvicorn and FastAPI) with the following lines in requirements.txt:

         aiofiles==0.7.0
         aiohttp==3.7.4.post0
         elasticsearch[async]===7.14.0
         fastapi==0.79.0
         pytest-asyncio==0.15.1
         pytest==7.1.2
         starlette-exporter==0.8.1
         uvicorn[standard]===0.13.0

  2. Start ES in its own container and start the client in a separate container. Make sure the client app can connect.
  3. Write a PyTest test that invokes a FastAPI route on the client that eventually queries the ES container.
  4. Make more than two calls to search() or count() within PyTest. The third call fails.

For fun, I subclassed AsyncElasticsearch with some throwaway code to inspect the transport's internal event loop. The code is:

from functools import wraps

from elasticsearch import AsyncElasticsearch


def reveal_loop(func):
    """Print the call arguments and the transport's event-loop state before each call."""

    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        print({func.__name__: {"args": args, "kwargs": kwargs}}, "\n")
        # `transport.loop` is an internal attribute of the 7.x async transport;
        # it may be None until the transport has handled its first request.
        loop = getattr(self.transport, "loop", None)
        if loop is not None:
            print(loop.__dict__.items(), "\n")
            # is_closed() is the public way to read the loop's _closed flag.
            if loop.is_closed():
                print(f"\nCLOSED ({func.__name__})\n")
        return await func(self, *args, **kwargs)

    return wrapper


class NoisyAsync(AsyncElasticsearch):
    """Throwaway subclass that logs the loop state on count() and search()."""

    @reveal_loop
    async def count(self, *args, **kwargs):
        return await super().count(*args, **kwargs)

    @reveal_loop
    async def search(self, *args, **kwargs):
        return await super().search(*args, **kwargs)

The resulting output from a typical PyTest run was disappointing (irrelevant details redacted):

{'count': {'args': (), 'kwargs': {'index': 'blah blah blah' }}}

dict_items([('_timer_cancelled_count', 2), ('_closed', False), ('_stopping', False), ('_ready', deque([])), ('_scheduled', [<TimerHandle cancelled when=20202>, <TimerHandle cancelled when=20250.282332225>, <TimerHandle when=20207 _weakref_handle((<weakref at 0...x7fa43e943a30>, '_cleanup')) at /usr/local/lib/python3.9/site-packages/aiohttp/helpers.py:595>]), ('_default_executor', <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fa43e943160>), ('_internal_fds', 1), ('_thread_id', 140343566153472), ('_clock_resolution', 1e-09), ('_exception_handler', None), ('_debug', False), ('slow_callback_duration', 0.1), ('_current_handle', None), ('_task_factory', None), ('_coroutine_origin_tracking_enabled', False), ('_coroutine_origin_tracking_saved_depth', None), ('_asyncgens', {<weakref at 0x7fa43e621590; to 'async_generator' at 0x7fa43e65eca0 (contextmanager_in_threadpool)>, <weakref at 0x7fa43e621bd0; to 'async_generator' at 0x7fa45c93b550 (contextmanager_in_threadpool)>}), ('_asyncgens_shutdown_called', False), ('_executor_shutdown_called', False), ('_selector', <selectors.EpollSelector object at 0x7fa43e650310>), ('_ssock', <socket.socket fd=13, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0>), ('_csock', <socket.socket fd=17, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0>), ('_transports', <WeakValueDictionary at 0x7fa43e6507c0>), ('_signal_handlers', {})])

{'search': {'args': (), 'kwargs': {'index': 'blah blah blah' }}}

dict_items([('_timer_cancelled_count', 2), ('_closed', False), ('_stopping', False), ('_ready', deque([])), ('_scheduled', [<TimerHandle cancelled when=20202>, <TimerHandle cancelled when=20250.282332225>, <TimerHandle when=20207 _weakref_handle((<weakref at 0...x7fa43e943a30>, '_cleanup')) at /usr/local/lib/python3.9/site-packages/aiohttp/helpers.py:595>]), ('_default_executor', <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fa43e943160>), ('_internal_fds', 1), ('_thread_id', 140343566153472), ('_clock_resolution', 1e-09), ('_exception_handler', None), ('_debug', False), ('slow_callback_duration', 0.1), ('_current_handle', None), ('_task_factory', None), ('_coroutine_origin_tracking_enabled', False), ('_coroutine_origin_tracking_saved_depth', None), ('_asyncgens', {<weakref at 0x7fa43e621590; to 'async_generator' at 0x7fa43e65eca0 (contextmanager_in_threadpool)>, <weakref at 0x7fa43e621bd0; to 'async_generator' at 0x7fa45c93b550 (contextmanager_in_threadpool)>}), ('_asyncgens_shutdown_called', False), ('_executor_shutdown_called', False), ('_selector', <selectors.EpollSelector object at 0x7fa43e650310>), ('_ssock', <socket.socket fd=13, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0>), ('_csock', <socket.socket fd=17, family=AddressFamily.AF_UNIX, type=SocketKind.SOCK_STREAM, proto=0>), ('_transports', <WeakValueDictionary at 0x7fa43e6507c0>), ('_signal_handlers', {})])

{'count': {'args': (), 'kwargs': {'index': 'blah blah blah' }}}

dict_items([('_timer_cancelled_count', 1), ('_closed', True), ('_stopping', False), ('_ready', deque([])), ('_scheduled', []), ('_default_executor', None), ('_internal_fds', 0), ('_thread_id', None), ('_clock_resolution', 1e-09), ('_exception_handler', None), ('_debug', False), ('slow_callback_duration', 0.1), ('_current_handle', None), ('_task_factory', None), ('_coroutine_origin_tracking_enabled', False), ('_coroutine_origin_tracking_saved_depth', None), ('_asyncgens', set()), ('_asyncgens_shutdown_called', True), ('_executor_shutdown_called', True), ('_selector', None), ('_ssock', None), ('_csock', None), ('_transports', <WeakValueDictionary at 0x7fa43e6507c0>), ('_signal_handlers', {})])


CLOSED (count)

ConnectionError(Event loop is closed) caused by: RuntimeError(Event loop is closed)

It's not clear to me why the ES client would close the event loop after only two queries under PyTest, when it works fine for much longer in other (functional) testing and in local usage. Nor is it clear what, if anything, I can do about it from within PyTest.

Sorry if this is not the best of bug reports, but I'm stumped.

@sethmlarson
Contributor

This happens because pytest-asyncio creates a new asyncio event loop for each test execution, while the client caches the first event loop it finds within each AIOHttpNode instance. To work around this, you'll need to create a new instance of AsyncElasticsearch for each test execution rather than using a single global instance. Hope this helps!
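To make that explanation concrete, here is a minimal, standard-library-only sketch of the failure mode. `LoopCachingClient` is hypothetical: it only mimics a client that caches the first event loop it sees and is not how elasticsearch-py is actually implemented. Each `asyncio.run()` call stands in for one test, since it creates a fresh loop and closes it on exit, roughly like pytest-asyncio's default per-test loop.

```python
import asyncio


class LoopCachingClient:
    """Hypothetical stand-in for a client that caches the first event loop it sees."""

    def __init__(self):
        self.loop = None

    async def query(self):
        if self.loop is None:
            # Remember whichever loop is running on first use.
            self.loop = asyncio.get_running_loop()
        fut = self.loop.create_future()
        # Schedule work on the *cached* loop; call_soon() raises
        # RuntimeError("Event loop is closed") if that loop has been closed.
        self.loop.call_soon(fut.set_result, "ok")
        return await fut


# A single shared client, used across two "tests" (two asyncio.run() calls).
shared = LoopCachingClient()
first = asyncio.run(shared.query())  # "test 1": caches loop A, which then closes

try:
    asyncio.run(shared.query())  # "test 2": runs on loop B but targets closed loop A
    second_error = None
except RuntimeError as exc:
    second_error = str(exc)

# The suggested work-around: a new client per "test" caches that test's own loop.
fresh = [asyncio.run(LoopCachingClient().query()) for _ in range(2)]

print(first, second_error, fresh)
```

The shared instance fails on its second use only because its cached loop belongs to a test that has already finished; the fresh instances never see a stale loop.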

@djstrong

djstrong commented Jan 3, 2024

Any other solution? I can't create new AsyncElasticsearch for every test.

@pquentin
Member

pquentin commented Jan 3, 2024

> Any other solution? I can't create new AsyncElasticsearch for every test.

pytest-asyncio 0.23.0 made it much easier to share an event loop for all tests in a class, module, package, or session. See the how-to guides: https://pytest-asyncio.readthedocs.io/en/v0.23.0/how-to-guides/index.html.
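Why a shared loop helps can be sketched with the standard library alone, again assuming a hypothetical client (`LoopCachingClient`, not an elasticsearch-py class) that caches the first loop it runs on. When every "test" runs on one long-lived loop, which is roughly what a session-scoped loop in pytest-asyncio provides, the cached loop stays open between calls:

```python
import asyncio


class LoopCachingClient:
    """Hypothetical client that caches the event loop it first runs on."""

    def __init__(self):
        self.loop = None

    async def query(self, n):
        if self.loop is None:
            # Cache the running loop on first use.
            self.loop = asyncio.get_running_loop()
        fut = self.loop.create_future()
        # Would raise RuntimeError("Event loop is closed") if the cached loop were closed.
        self.loop.call_soon(fut.set_result, n)
        return await fut


shared_client = LoopCachingClient()

# One loop shared by all three "tests" instead of a fresh loop per test.
loop = asyncio.new_event_loop()
try:
    results = [loop.run_until_complete(shared_client.query(i)) for i in range(3)]
finally:
    # Close the shared loop only once, after every "test" has run.
    loop.close()

print(results)
```

The loop is closed only after the last call, so the single shared client never holds a reference to a closed loop while it is still in use.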

If that is somehow not an option, you could always use a function-scoped pytest fixture to instantiate a new AsyncElasticsearch for every test.

@djstrong

djstrong commented Jan 3, 2024

Thank you. I have a web-based API built with FastAPI, and AsyncElasticsearch is used deep inside some endpoints. I am doing system tests with FastAPI's TestClient.
Unfortunately, sharing an event loop as described in the link does not solve the problem.

@pquentin
Member

pquentin commented Jan 3, 2024

Can you please open a new issue with minimal working code that shows your specific problem and what you want to achieve? It will help me help you. Thanks!
