From 5adf79f165fdccd83d3b090f57762421275131aa Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 19 Apr 2025 23:51:18 -0700 Subject: [PATCH] fix(py-async): minor robust enhancement for awaiting live updater --- examples/gdrive_text_embedding/main.py | 2 +- python/cocoindex/flow.py | 8 ++++---- python/cocoindex/runtime.py | 16 ++++++++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/examples/gdrive_text_embedding/main.py b/examples/gdrive_text_embedding/main.py index d86249a3..546e8538 100644 --- a/examples/gdrive_text_embedding/main.py +++ b/examples/gdrive_text_embedding/main.py @@ -1,5 +1,6 @@ from dotenv import load_dotenv +import asyncio import cocoindex import datetime import os @@ -73,5 +74,4 @@ async def _run(): if __name__ == "__main__": load_dotenv(override=True) - import asyncio asyncio.run(_run()) diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 7e10e69f..71e824c0 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -19,7 +19,7 @@ from . import op from .convert import dump_engine_object from .typing import encode_enriched_type -from .runtime import op_execution_context +from .runtime import execution_context class _NameBuilder: _existing_names: set[str] @@ -378,7 +378,7 @@ def __enter__(self) -> FlowLiveUpdater: def __exit__(self, exc_type, exc_value, traceback): self.abort() - asyncio.run(self.wait()) + execution_context.run(self.wait()) async def __aenter__(self) -> FlowLiveUpdater: return self @@ -476,7 +476,7 @@ def _create_engine_flow() -> _engine.Flow: root_scope = DataScope( flow_builder_state, flow_builder_state.engine_flow_builder.root_scope()) fl_def(FlowBuilder(flow_builder_state), root_scope) - return flow_builder_state.engine_flow_builder.build_flow(op_execution_context.event_loop) + return flow_builder_state.engine_flow_builder.build_flow(execution_context.event_loop) return Flow(_create_engine_flow) @@ -572,7 +572,7 @@ def __init__( flow_builder_state.engine_flow_builder.set_direct_output( _data_slice_state(output).engine_data_slice) self._engine_flow = flow_builder_state.engine_flow_builder.build_transient_flow( - op_execution_context.event_loop) + execution_context.event_loop) def __str__(self): return str(self._engine_flow) diff --git a/python/cocoindex/runtime.py b/python/cocoindex/runtime.py index 48bf5f57..9f19f2d8 100644 --- a/python/cocoindex/runtime.py +++ b/python/cocoindex/runtime.py @@ -1,7 +1,12 @@ +""" +This module provides a standalone execution runtime for executing coroutines in a thread-safe +manner. +""" + import threading import asyncio - -class _OpExecutionContext: +from typing import Coroutine +class _ExecutionContext: _lock: threading.Lock _event_loop: asyncio.AbstractEventLoop | None = None @@ -14,8 +19,11 @@ def event_loop(self) -> asyncio.AbstractEventLoop: with self._lock: if self._event_loop is None: self._event_loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._event_loop) threading.Thread(target=self._event_loop.run_forever, daemon=True).start() return self._event_loop -op_execution_context = _OpExecutionContext() + def run(self, coro: Coroutine): + """Run a coroutine in the event loop, blocking until it finishes. Return its result.""" + return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result() + +execution_context = _ExecutionContext()