From dbadd212492d4e43c1eb8bfb3efe5f7ba761df1b Mon Sep 17 00:00:00 2001 From: jakelorocco Date: Fri, 17 Oct 2025 09:21:59 -0400 Subject: [PATCH 1/3] feat: ollama generate_from_raw uses existing event loop --- mellea/backends/ollama.py | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/mellea/backends/ollama.py b/mellea/backends/ollama.py index f58d6513..86c4509b 100644 --- a/mellea/backends/ollama.py +++ b/mellea/backends/ollama.py @@ -23,6 +23,7 @@ get_current_event_loop, send_to_queue, ) +from mellea.helpers.event_loop_helper import _run_async_in_thread from mellea.helpers.fancy_logger import FancyLogger from mellea.stdlib.base import ( CBlock, @@ -404,28 +405,26 @@ def _generate_from_raw( # See https://github.com/ollama/ollama/blob/main/docs/faq.md#how-does-ollama-handle-concurrent-requests. prompts = [self.formatter.print(action) for action in actions] - async def get_response(coroutines): + async def get_response(): + # Run async so that we can make use of Ollama's concurrency. + coroutines: list[Coroutine[Any, Any, ollama.GenerateResponse]] = [] + for prompt in prompts: + co = self._async_client.generate( + model=self._get_ollama_model_id(), + prompt=prompt, + raw=True, + think=model_opts.get(ModelOption.THINKING, None), + format=format.model_json_schema() if format is not None else None, + options=self._make_backend_specific_and_remove(model_opts), + ) + coroutines.append(co) + responses = await asyncio.gather(*coroutines, return_exceptions=True) return responses - async_client = ollama.AsyncClient(self._base_url) - # Run async so that we can make use of Ollama's concurrency. - coroutines = [] - for prompt in prompts: - co = async_client.generate( - model=self._get_ollama_model_id(), - prompt=prompt, - raw=True, - think=model_opts.get(ModelOption.THINKING, None), - format=format.model_json_schema() if format is not None else None, - options=self._make_backend_specific_and_remove(model_opts), - ) - coroutines.append(co) - - # Revisit this once we start using async elsewhere. Only one asyncio event - # loop can be running in a given thread. - responses: list[ollama.GenerateResponse | BaseException] = asyncio.run( - get_response(coroutines) + # Run in the same event_loop like other Mellea async code called from a sync function. + responses: list[ollama.GenerateResponse | BaseException] = _run_async_in_thread( + get_response() ) results = [] From f4dc004b3fd4f0e497ecba3bcdb62103f13bd0d5 Mon Sep 17 00:00:00 2001 From: jakelorocco Date: Mon, 20 Oct 2025 09:39:24 -0400 Subject: [PATCH 2/3] fix: add blocking prevention mech --- mellea/helpers/event_loop_helper.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mellea/helpers/event_loop_helper.py b/mellea/helpers/event_loop_helper.py index 2c97085d..7ff66dc0 100644 --- a/mellea/helpers/event_loop_helper.py +++ b/mellea/helpers/event_loop_helper.py @@ -5,6 +5,8 @@ from collections.abc import Coroutine from typing import Any, TypeVar +from mellea.helpers.async_helpers import get_current_event_loop + R = TypeVar("R") @@ -52,6 +54,9 @@ async def finalize_tasks(): def __call__(self, co: Coroutine[Any, Any, R]) -> R: """Runs the coroutine in the event loop.""" + if self._event_loop == get_current_event_loop(): + # If this gets called from the same event loop, launch in a separate thread to prevent blocking. + return _EventLoopHandler()(co) return asyncio.run_coroutine_threadsafe(co, self._event_loop).result() From 013dd91d4d57d04fe616110903450436ef89dfe5 Mon Sep 17 00:00:00 2001 From: jakelorocco Date: Wed, 22 Oct 2025 09:14:26 -0400 Subject: [PATCH 3/3] fix: test issues with cache --- test/backends/test_ollama.py | 3 +-- test/backends/test_openai_ollama.py | 2 +- test/backends/test_watsonx.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/test/backends/test_ollama.py b/test/backends/test_ollama.py index 6731f2fc..4887c542 100644 --- a/test/backends/test_ollama.py +++ b/test/backends/test_ollama.py @@ -193,8 +193,7 @@ async def get_client_async(): fourth_client = asyncio.run(get_client_async()) assert fourth_client in backend._client_cache.cache.values() - assert second_client not in backend._client_cache.cache.values() - + assert len(backend._client_cache.cache.values()) == 2 if __name__ == "__main__": pytest.main([__file__]) diff --git a/test/backends/test_openai_ollama.py b/test/backends/test_openai_ollama.py index 02bf6d86..b2883e4e 100644 --- a/test/backends/test_openai_ollama.py +++ b/test/backends/test_openai_ollama.py @@ -206,7 +206,7 @@ async def get_client_async(): fourth_client = asyncio.run(get_client_async()) assert fourth_client in backend._client_cache.cache.values() - assert second_client not in backend._client_cache.cache.values() + assert len(backend._client_cache.cache.values()) == 2 if __name__ == "__main__": import pytest diff --git a/test/backends/test_watsonx.py b/test/backends/test_watsonx.py index 274374a5..a0c43fd1 100644 --- a/test/backends/test_watsonx.py +++ b/test/backends/test_watsonx.py @@ -167,7 +167,7 @@ async def get_client_async(): fourth_client = asyncio.run(get_client_async()) assert fourth_client in backend._client_cache.cache.values() - assert second_client not in backend._client_cache.cache.values() + assert len(backend._client_cache.cache.values()) == 2 if __name__ == "__main__": import pytest