diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 9c960c1d..6f4e4960 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -4,6 +4,7 @@ from . import flow, lib from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes +from .runtime import execution_context @click.group() def cli(): @@ -113,11 +114,13 @@ def update(flow_name: str | None, live: bool, quiet: bool): Update the index to reflect the latest data from data sources. """ options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet) - if flow_name is None: - asyncio.run(flow.update_all_flows(options)) - else: - updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options) - asyncio.run(updater.wait()) + async def _update(): + if flow_name is None: + await flow.update_all_flows(options) + else: + updater = await flow.FlowLiveUpdater.create(_flow_by_name(flow_name), options) + await updater.wait() + execution_context.run(_update()) @cli.command() @click.argument("flow_name", type=str, required=False) @@ -167,7 +170,7 @@ def server(address: str, live_update: bool, quiet: bool, cors_origin: str | None lib.start_server(lib.ServerSettings(address=address, cors_origin=cors_origin)) if live_update: options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet) - asyncio.run(flow.update_all_flows(options)) + execution_context.run(flow.update_all_flows(options)) input("Press Enter to stop...") diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 71e824c0..292d61f6 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -369,9 +369,22 @@ class FlowLiveUpdater: """ _engine_live_updater: _engine.FlowLiveUpdater - def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None): - self._engine_live_updater = _engine.FlowLiveUpdater( - fl._lazy_engine_flow(), dump_engine_object(options or FlowLiveUpdaterOptions())) + def __init__(self, arg: Flow | _engine.FlowLiveUpdater, options: FlowLiveUpdaterOptions | None = None): + if isinstance(arg, _engine.FlowLiveUpdater): + self._engine_live_updater = arg + else: + self._engine_live_updater = execution_context.run(_engine.FlowLiveUpdater( + arg.internal_flow(), dump_engine_object(options or FlowLiveUpdaterOptions()))) + + @staticmethod + async def create(fl: Flow, options: FlowLiveUpdaterOptions | None = None) -> FlowLiveUpdater: + """ + Create a live updater for a flow. + """ + engine_live_updater = await _engine.FlowLiveUpdater.create( + await fl.ainternal_flow(), + dump_engine_object(options or FlowLiveUpdaterOptions())) + return FlowLiveUpdater(engine_live_updater) def __enter__(self) -> FlowLiveUpdater: return self @@ -450,7 +463,7 @@ async def update(self) -> _engine.IndexUpdateInfo: Update the index defined by the flow. Once the function returns, the indice is fresh up to the moment when the function is called. """ - updater = FlowLiveUpdater(self, FlowLiveUpdaterOptions(live_mode=False)) + updater = await FlowLiveUpdater.create(self, FlowLiveUpdaterOptions(live_mode=False)) await updater.wait() return updater.update_stats() @@ -466,6 +479,12 @@ def internal_flow(self) -> _engine.Flow: """ return self._lazy_engine_flow() + async def ainternal_flow(self) -> _engine.Flow: + """ + Get the engine flow. The async version. + """ + return await asyncio.to_thread(self.internal_flow) + def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow: """ Create a flow without really building it yet. @@ -523,17 +542,23 @@ def ensure_all_flows_built() -> None: """ Ensure all flows are built. """ - with _flows_lock: - for fl in _flows.values(): - fl.internal_flow() + for fl in flows(): + fl.internal_flow() + +async def aensure_all_flows_built() -> None: + """ + Ensure all flows are built. + """ + for fl in flows(): + await fl.ainternal_flow() async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]: """ Update all flows. """ - ensure_all_flows_built() + await aensure_all_flows_built() async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo: - updater = FlowLiveUpdater(fl, options) + updater = await FlowLiveUpdater.create(fl, options) await updater.wait() return updater.update_stats() fls = flows() diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py index 391a4708..56fcd118 100644 --- a/python/cocoindex/lib.py +++ b/python/cocoindex/lib.py @@ -101,7 +101,8 @@ async def _inner(*args, **kwargs): try: if _should_run_cli(): # Schedule to a separate thread as it invokes nested event loop. - return await asyncio.to_thread(_run_cli) + # return await asyncio.to_thread(_run_cli) + return _run_cli() return await fn(*args, **kwargs) finally: stop() diff --git a/src/py/mod.rs b/src/py/mod.rs index 42672ebf..35eda644 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -97,24 +97,21 @@ pub struct FlowLiveUpdater(pub Arc, + #[staticmethod] + pub fn create<'py>( + py: Python<'py>, flow: &Flow, options: Pythonized, - ) -> PyResult { - py.allow_threads(|| { - let live_updater = get_runtime() - .block_on(async { - let live_updater = execution::FlowLiveUpdater::start( - flow.0.clone(), - &get_lib_context()?.pool, - options.into_inner(), - ) - .await?; - anyhow::Ok(live_updater) - }) - .into_py_result()?; + ) -> PyResult> { + let flow = flow.0.clone(); + future_into_py(py, async move { + let live_updater = execution::FlowLiveUpdater::start( + flow, + &get_lib_context().into_py_result()?.pool, + options.into_inner(), + ) + .await + .into_py_result()?; Ok(Self(Arc::new(tokio::sync::RwLock::new(live_updater)))) }) }