From 0645b5dc83d0768709714caa33de26c612e82492 Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 2 Apr 2025 09:55:48 -0700 Subject: [PATCH] Change gdrive example to continously watch change in main. --- examples/gdrive_text_embedding/main.py | 30 ++++++++++++++------------ python/cocoindex/__init__.py | 2 +- python/cocoindex/flow.py | 13 ++++++++--- src/execution/live_updater.rs | 10 +++++++-- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/examples/gdrive_text_embedding/main.py b/examples/gdrive_text_embedding/main.py index e16c6b6e..1dddac24 100644 --- a/examples/gdrive_text_embedding/main.py +++ b/examples/gdrive_text_embedding/main.py @@ -51,21 +51,23 @@ def gdrive_text_embedding_flow(flow_builder: cocoindex.FlowBuilder, data_scope: @cocoindex.main_fn() def _run(): - # Run queries in a loop to demonstrate the query capabilities. - while True: - try: - query = input("Enter search query (or Enter to quit): ") - if query == '': + # Use a `FlowLiveUpdater` to keep the flow data updated. + with cocoindex.FlowLiveUpdater(gdrive_text_embedding_flow): + # Run queries in a loop to demonstrate the query capabilities. + while True: + try: + query = input("Enter search query (or Enter to quit): ") + if query == '': + break + results, _ = query_handler.search(query, 10) + print("\nSearch results:") + for result in results: + print(f"[{result.score:.3f}] {result.data['filename']}") + print(f" {result.data['text']}") + print("---") + print() + except KeyboardInterrupt: break - results, _ = query_handler.search(query, 10) - print("\nSearch results:") - for result in results: - print(f"[{result.score:.3f}] {result.data['filename']}") - print(f" {result.data['text']}") - print("---") - print() - except KeyboardInterrupt: - break if __name__ == "__main__": load_dotenv(override=True) diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index 3b88969f..98b8421b 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -4,7 +4,7 @@ from . import functions, query, sources, storages, cli from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions -from .flow import update_all_flows, FlowLiveUpdaterOptions +from .flow import update_all_flows, FlowLiveUpdater, FlowLiveUpdaterOptions from .llm import LlmSpec, LlmApiType from .vector import VectorSimilarityMetric from .lib import * diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 2bf91d48..75e4b9ff 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -370,7 +370,7 @@ class FlowLiveUpdaterOptions: """ Options for live updating a flow. """ - live_mode: bool = False + live_mode: bool = True print_stats: bool = False class FlowLiveUpdater: @@ -379,9 +379,16 @@ class FlowLiveUpdater: """ _engine_live_updater: _engine.FlowLiveUpdater - def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions): + def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions | None = None): self._engine_live_updater = _engine.FlowLiveUpdater( - fl._lazy_engine_flow(), _dump_engine_object(options)) + fl._lazy_engine_flow(), _dump_engine_object(options or FlowLiveUpdaterOptions())) + + def __enter__(self) -> FlowLiveUpdater: + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.abort() + asyncio.run(self.wait()) async def wait(self) -> None: """ diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 9794019d..fe3e30fb 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -177,8 +177,14 @@ impl FlowLiveUpdater { pub async fn wait(&mut self) -> Result<()> { while let Some(result) = self.tasks.join_next().await { - if let Err(e) = (|| anyhow::Ok(result??))() { - error!("{:?}", e.context("Error in applying changes from a source")); + match result { + Err(e) if !e.is_cancelled() => { + error!("{:?}", e); + } + Ok(Err(e)) => { + error!("{:?}", e.context("Error in applying changes from a source")); + } + _ => {} } } Ok(())