From d20dec1c149a20a7f64477739f7a40dadf323076 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Wed, 16 Jul 2025 14:16:21 -0700 Subject: [PATCH] fix(annotation): make `execution_context.run()` have speicfic annotation --- examples/amazon_s3_embedding/main.py | 11 ++++++++++- python/cocoindex/flow.py | 7 ++----- python/cocoindex/runtime.py | 7 +++++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/examples/amazon_s3_embedding/main.py b/examples/amazon_s3_embedding/main.py index fd23654d9..48789dd1d 100644 --- a/examples/amazon_s3_embedding/main.py +++ b/examples/amazon_s3_embedding/main.py @@ -101,7 +101,16 @@ def _main() -> None: pool = ConnectionPool(os.getenv("COCOINDEX_DATABASE_URL")) amazon_s3_text_embedding_flow.setup() - with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow): + with cocoindex.FlowLiveUpdater(amazon_s3_text_embedding_flow) as updater: + updater.abort() + updater.wait() + + while True: + updates = updater.next_status_updates() + print(f"Updates: {updates}") + if not updates.active_sources: + break + # Run queries in a loop to demonstrate the query capabilities. while True: query = input("Enter search query (or Enter to quit): ") diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index d85198703..28791bf24 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -879,10 +879,7 @@ def update_all_flows( """ Update all flows. """ - return cast( - dict[str, _engine.IndexUpdateInfo], - execution_context.run(update_all_flows_async(options)), - ) + return execution_context.run(update_all_flows_async(options)) async def update_all_flows_async( @@ -1037,7 +1034,7 @@ def eval(self, *args: Any, **kwargs: Any) -> T: """ Evaluate the transform flow. """ - return cast(T, execution_context.run(self.eval_async(*args, **kwargs))) + return execution_context.run(self.eval_async(*args, **kwargs)) async def eval_async(self, *args: Any, **kwargs: Any) -> T: """ diff --git a/python/cocoindex/runtime.py b/python/cocoindex/runtime.py index c98288029..b52a11d03 100644 --- a/python/cocoindex/runtime.py +++ b/python/cocoindex/runtime.py @@ -5,7 +5,10 @@ import threading import asyncio -from typing import Any, Coroutine +from typing import Any, Coroutine, TypeVar + + +T = TypeVar("T") class _ExecutionContext: @@ -26,7 +29,7 @@ def event_loop(self) -> asyncio.AbstractEventLoop: ).start() return self._event_loop - def run(self, coro: Coroutine[Any, Any, Any]) -> Any: + def run(self, coro: Coroutine[Any, Any, T]) -> T: """Run a coroutine in the event loop, blocking until it finishes. Return its result.""" return asyncio.run_coroutine_threadsafe(coro, self.event_loop).result()