From 002725b8858411a1aed9f31117d239d209e27abb Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 2 Apr 2025 09:32:55 -0700 Subject: [PATCH] Support `-L`/`--live-update` for `cocoindex server` subcommand. --- python/cocoindex/__init__.py | 3 ++- python/cocoindex/cli.py | 33 +++++++++++++++++---------------- python/cocoindex/flow.py | 20 +++++++++++++++++--- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index 27b93478..3b88969f 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -1,9 +1,10 @@ """ Cocoindex is a framework for building and running indexing pipelines. """ -from . import flow, functions, query, sources, storages, cli +from . import functions, query, sources, storages, cli from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions +from .flow import update_all_flows, FlowLiveUpdaterOptions from .llm import LlmSpec, LlmApiType from .vector import VectorSimilarityMetric from .lib import * diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index 3bff88d2..d0ba7dd2 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -52,19 +52,17 @@ def setup(delete_legacy_flows): help="Continuously watch changes from data sources and apply to the target index.") @click.option( "-q", "--quiet", is_flag=True, show_default=True, default=False, - help="Avoid printing anything to the output, e.g. statistics.") + help="Avoid printing anything to the standard output, e.g. statistics.") def update(flow_name: str | None, live: bool, quiet: bool): """ Update the index to reflect the latest data from data sources. """ - async def _update_all(): - async def _update_flow(fl: flow.Flow): - updater = flow.FlowLiveUpdater( - fl, - flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)) - await updater.wait() - await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name))) - asyncio.run(_update_all()) + options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet) + if flow_name is None: + asyncio.run(flow.update_all_flows(options)) + else: + updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options) + asyncio.run(updater.wait()) @cli.command() @click.argument("flow_name", type=str, required=False) @@ -99,13 +97,22 @@ def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = Tr "-c", "--cors-origin", type=str, default=_default_server_settings.cors_origin, help="The origin of the client (e.g. CocoInsight UI) to allow CORS from. " "e.g. `http://cocoindex.io` if you want to allow CocoInsight to access the server.") -def server(address: str, cors_origin: str | None): +@click.option( + "-L", "--live-update", is_flag=True, show_default=True, default=False, + help="Continuously watch changes from data sources and apply to the target index.") +@click.option( + "-q", "--quiet", is_flag=True, show_default=True, default=False, + help="Avoid printing anything to the standard output, e.g. statistics.") +def server(address: str, live_update: bool, quiet: bool, cors_origin: str | None): """ Start a HTTP server providing REST APIs. It will allow tools like CocoInsight to access the server. """ lib.start_server(lib.ServerSettings(address=address, cors_origin=cors_origin)) + if live_update: + options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet) + asyncio.run(flow.update_all_flows(options)) input("Press Enter to stop...") @@ -124,9 +131,3 @@ def _flow_name(name: str | None) -> str: def _flow_by_name(name: str | None) -> flow.Flow: return flow.flow_by_name(_flow_name(name)) - -def _flows_by_name(name: str | None) -> list[flow.Flow]: - if name is None: - return flow.flows() - else: - return [flow.flow_by_name(name)] diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 8f4ee45f..2bf91d48 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -4,6 +4,7 @@ from __future__ import annotations +import asyncio import re import inspect import datetime @@ -382,13 +383,13 @@ def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions): self._engine_live_updater = _engine.FlowLiveUpdater( fl._lazy_engine_flow(), _dump_engine_object(options)) - async def wait(self): + async def wait(self) -> None: """ Wait for the live updater to finish. """ - return await self._engine_live_updater.wait() + await self._engine_live_updater.wait() - def abort(self): + def abort(self) -> None: """ Abort the live updater. """ @@ -522,6 +523,19 @@ def ensure_all_flows_built() -> None: for fl in _flows.values(): fl.internal_flow() +async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]: + """ + Update all flows. + """ + ensure_all_flows_built() + async def _update_flow(fl: Flow) -> _engine.IndexUpdateInfo: + updater = FlowLiveUpdater(fl, options) + await updater.wait() + return updater.update_stats() + fls = flows() + all_stats = await asyncio.gather(*(_update_flow(fl) for fl in fls)) + return {fl.name: stats for fl, stats in zip(fls, all_stats)} + _transient_flow_name_builder = _NameBuilder() class TransientFlow: """