Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""
Cocoindex is a framework for building and running indexing pipelines.
"""
from . import flow, functions, query, sources, storages, cli
from . import functions, query, sources, storages, cli
from .flow import FlowBuilder, DataScope, DataSlice, Flow, flow_def
from .flow import EvaluateAndDumpOptions, GeneratedField, SourceRefreshOptions
from .flow import update_all_flows, FlowLiveUpdaterOptions
from .llm import LlmSpec, LlmApiType
from .vector import VectorSimilarityMetric
from .lib import *
Expand Down
33 changes: 17 additions & 16 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,17 @@ def setup(delete_legacy_flows):
help="Continuously watch changes from data sources and apply to the target index.")
@click.option(
"-q", "--quiet", is_flag=True, show_default=True, default=False,
help="Avoid printing anything to the output, e.g. statistics.")
help="Avoid printing anything to the standard output, e.g. statistics.")
def update(flow_name: str | None, live: bool, quiet: bool):
"""
Update the index to reflect the latest data from data sources.
"""
async def _update_all():
async def _update_flow(fl: flow.Flow):
updater = flow.FlowLiveUpdater(
fl,
flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet))
await updater.wait()
await asyncio.gather(*(_update_flow(fl) for fl in _flows_by_name(flow_name)))
asyncio.run(_update_all())
options = flow.FlowLiveUpdaterOptions(live_mode=live, print_stats=not quiet)
if flow_name is None:
asyncio.run(flow.update_all_flows(options))
else:
updater = flow.FlowLiveUpdater(_flow_by_name(flow_name), options)
asyncio.run(updater.wait())

@cli.command()
@click.argument("flow_name", type=str, required=False)
Expand Down Expand Up @@ -99,13 +97,22 @@ def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = Tr
"-c", "--cors-origin", type=str, default=_default_server_settings.cors_origin,
help="The origin of the client (e.g. CocoInsight UI) to allow CORS from. "
"e.g. `http://cocoindex.io` if you want to allow CocoInsight to access the server.")
def server(address: str, cors_origin: str | None):
@click.option(
"-L", "--live-update", is_flag=True, show_default=True, default=False,
help="Continuously watch changes from data sources and apply to the target index.")
@click.option(
"-q", "--quiet", is_flag=True, show_default=True, default=False,
help="Avoid printing anything to the standard output, e.g. statistics.")
def server(address: str, live_update: bool, quiet: bool, cors_origin: str | None):
"""
Start a HTTP server providing REST APIs.

It will allow tools like CocoInsight to access the server.
"""
lib.start_server(lib.ServerSettings(address=address, cors_origin=cors_origin))
if live_update:
options = flow.FlowLiveUpdaterOptions(live_mode=True, print_stats=not quiet)
asyncio.run(flow.update_all_flows(options))
input("Press Enter to stop...")


Expand All @@ -124,9 +131,3 @@ def _flow_name(name: str | None) -> str:

def _flow_by_name(name: str | None) -> flow.Flow:
    """Resolve `name` through `_flow_name` and return the flow registered under it."""
    resolved_name = _flow_name(name)
    return flow.flow_by_name(resolved_name)

def _flows_by_name(name: str | None) -> list[flow.Flow]:
    """
    Return the flows selected by `name`.

    When `name` is None the result is whatever `flow.flows()` returns; otherwise
    it is a one-element list holding `flow.flow_by_name(name)`.
    """
    return flow.flows() if name is None else [flow.flow_by_name(name)]
20 changes: 17 additions & 3 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

import asyncio
import re
import inspect
import datetime
Expand Down Expand Up @@ -382,13 +383,13 @@ def __init__(self, fl: Flow, options: FlowLiveUpdaterOptions):
self._engine_live_updater = _engine.FlowLiveUpdater(
fl._lazy_engine_flow(), _dump_engine_object(options))

async def wait(self):
async def wait(self) -> None:
"""
Wait for the live updater to finish.
"""
return await self._engine_live_updater.wait()
await self._engine_live_updater.wait()

def abort(self):
def abort(self) -> None:
"""
Abort the live updater.
"""
Expand Down Expand Up @@ -522,6 +523,19 @@ def ensure_all_flows_built() -> None:
for fl in _flows.values():
fl.internal_flow()

async def update_all_flows(options: FlowLiveUpdaterOptions) -> dict[str, _engine.IndexUpdateInfo]:
    """
    Run an update for every flow returned by `flows()`.

    All flows are built first via `ensure_all_flows_built()`. One
    `FlowLiveUpdater` is then created per flow with the shared `options`, and
    the updaters are awaited together with `asyncio.gather`.

    Returns a dict mapping each flow's name to the value of that updater's
    `update_stats()` taken after its `wait()` has returned.
    """
    ensure_all_flows_built()

    async def _run_one(target: Flow) -> _engine.IndexUpdateInfo:
        live_updater = FlowLiveUpdater(target, options)
        await live_updater.wait()
        return live_updater.update_stats()

    targets = flows()
    pending = [_run_one(target) for target in targets]
    # gather() returns results in the order the awaitables were passed,
    # so they can be paired with `targets` positionally.
    results = await asyncio.gather(*pending)
    return dict(zip((target.name for target in targets), results))

_transient_flow_name_builder = _NameBuilder()
class TransientFlow:
"""
Expand Down