From 626f69e73330ce27e93ee927ed14c3479c2948f4 Mon Sep 17 00:00:00 2001 From: LJ Date: Thu, 3 Apr 2025 17:50:08 -0700 Subject: [PATCH] Make setup management more flexible and straightforward. --- python/cocoindex/cli.py | 78 +++++++++++++++++++++++++++++++------ python/cocoindex/setup.py | 15 +++---- src/builder/flow_builder.rs | 2 +- src/lib_context.rs | 6 +-- src/py/mod.rs | 29 ++++++++++---- src/setup/driver.rs | 41 +++++++++++-------- 6 files changed, 124 insertions(+), 47 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index d0ba7dd2d..97d6884ef 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -3,7 +3,7 @@ import datetime from . import flow, lib -from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes +from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes @click.group() def cli(): @@ -12,12 +12,42 @@ def cli(): """ @cli.command() -def ls(): +@click.option( + "-a", "--all", "show_all", is_flag=True, show_default=True, default=False, + help="Also show all flows with persisted setup, even if not defined in the current process.") +def ls(show_all: bool): """ - List all available flows. + List all flows. """ - for name in flow.flow_names(): - click.echo(name) + current_flow_names = [fl.name for fl in flow.flows()] + persisted_flow_names = flow_names_with_setup() + remaining_persisted_flow_names = set(persisted_flow_names) + + has_missing_setup = False + has_extra_setup = False + + for name in current_flow_names: + if name in remaining_persisted_flow_names: + remaining_persisted_flow_names.remove(name) + suffix = '' + else: + suffix = ' [+]' + has_missing_setup = True + click.echo(f'{name}{suffix}') + + if show_all: + for name in persisted_flow_names: + if name in remaining_persisted_flow_names: + click.echo(f'{name} [?]') + has_extra_setup = True + + if has_missing_setup or has_extra_setup: + click.echo('') + click.echo('Notes:') + if has_missing_setup: + click.echo(' [+]: Flows present in the current process, but missing setup.') + if has_extra_setup: + click.echo(' [?]: Flows with persisted setup, but not in the current process.') @cli.command() @click.argument("flow_name", type=str, required=False) @@ -28,17 +58,41 @@ def show(flow_name: str | None): click.echo(str(_flow_by_name(flow_name))) @cli.command() +def setup(): + """ + Check and apply backend setup changes for flows, including the internal and target storage + (to export). + """ + status_check = sync_setup() + click.echo(status_check) + if status_check.is_up_to_date(): + click.echo("No changes need to be pushed.") + return + if not click.confirm( + "Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False): + return + apply_setup_changes(status_check) + +@cli.command() +@click.argument("flow_name", type=str, nargs=-1) @click.option( - "-D", "--delete_legacy_flows", is_flag=True, show_default=True, default=False, - help="Also check / delete flows existing before but no longer exist.") -def setup(delete_legacy_flows): + "-a", "--all", "drop_all", is_flag=True, show_default=True, default=False, + help="Drop all flows with persisted setup, even if not defined in the current process.") +def drop(flow_name: tuple[str, ...], drop_all: bool): """ - Check and apply backend setup changes for flows, including the internal and target storage (to export). + Drop the backend for specified flows. + If no flow is specified, all flows defined in the current process will be dropped. """ - options = CheckSetupStatusOptions(delete_legacy_flows=delete_legacy_flows) - status_check = check_setup_status(options) - print(status_check) + if drop_all: + flow_names = flow_names_with_setup() + elif len(flow_name) == 0: + flow_names = [fl.name for fl in flow.flows()] + else: + flow_names = list(flow_name) + status_check = drop_setup(flow_names) + click.echo(status_check) if status_check.is_up_to_date(): + click.echo("No flows need to be dropped.") return if not click.confirm( "Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False): diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index b877e112e..529751a8b 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -1,15 +1,16 @@ -from dataclasses import dataclass - from . import flow from . import _engine -@dataclass -class CheckSetupStatusOptions: - delete_legacy_flows: bool +def sync_setup() -> _engine.SetupStatusCheck: + flow.ensure_all_flows_built() + return _engine.sync_setup() -def check_setup_status(options: CheckSetupStatusOptions) -> _engine.SetupStatusCheck: +def drop_setup(flow_names: list[str]) -> _engine.SetupStatusCheck: flow.ensure_all_flows_built() - return _engine.check_setup_status(vars(options)) + return _engine.drop_setup(flow_names) + +def flow_names_with_setup() -> list[str]: + return _engine.flow_names_with_setup() def apply_setup_changes(status_check: _engine.SetupStatusCheck): _engine.apply_setup_changes(status_check) diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index b87509b04..778b9f1af 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -340,7 +340,7 @@ impl FlowBuilder { pub fn new(name: &str) -> PyResult { let lib_context = get_lib_context().into_py_result()?; let existing_flow_ss = lib_context - .combined_setup_states + .all_setup_states .read() .unwrap() .flows diff --git a/src/lib_context.rs b/src/lib_context.rs index 7b3a248a2..9b71ece40 100644 --- a/src/lib_context.rs +++ b/src/lib_context.rs @@ -63,7 +63,7 @@ static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().unwrap pub struct LibContext { pub pool: PgPool, pub flows: Mutex>>, - pub combined_setup_states: RwLock>, + pub all_setup_states: RwLock>, } impl LibContext { @@ -94,14 +94,14 @@ pub fn create_lib_context(settings: settings::Settings) -> Result { pyo3_async_runtimes::tokio::init_with_runtime(get_runtime()).unwrap(); }); - let (pool, all_css) = get_runtime().block_on(async { + let (pool, all_setup_states) = get_runtime().block_on(async { let pool = PgPool::connect(&settings.database_url).await?; let existing_ss = setup::get_existing_setup_state(&pool).await?; anyhow::Ok((pool, existing_ss)) })?; Ok(LibContext { pool, - combined_setup_states: RwLock::new(all_css), + all_setup_states: RwLock::new(all_setup_states), flows: Mutex::new(BTreeMap::new()), }) } diff --git a/src/py/mod.rs b/src/py/mod.rs index 0b48f4037..12b5a1f85 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -278,17 +278,30 @@ impl SetupStatusCheck { } #[pyfunction] -fn check_setup_status( - options: Pythonized, -) -> PyResult { +fn sync_setup() -> PyResult { let lib_context = get_lib_context().into_py_result()?; let flows = lib_context.flows.lock().unwrap(); - let all_css = lib_context.combined_setup_states.read().unwrap(); - let setup_status = - setup::check_setup_status(&flows, &all_css, options.into_inner()).into_py_result()?; + let all_setup_states = lib_context.all_setup_states.read().unwrap(); + let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?; Ok(SetupStatusCheck(setup_status)) } +#[pyfunction] +fn drop_setup(flow_names: Vec) -> PyResult { + let lib_context = get_lib_context().into_py_result()?; + let all_setup_states = lib_context.all_setup_states.read().unwrap(); + let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?; + Ok(SetupStatusCheck(setup_status)) +} + +#[pyfunction] +fn flow_names_with_setup() -> PyResult> { + let lib_context = get_lib_context().into_py_result()?; + let all_setup_states = lib_context.all_setup_states.read().unwrap(); + let flow_names = all_setup_states.flows.keys().cloned().collect(); + Ok(flow_names) +} + #[pyfunction] fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatusCheck) -> PyResult<()> { py.allow_threads(|| { @@ -314,8 +327,10 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(start_server, m)?)?; m.add_function(wrap_pyfunction!(stop, m)?)?; m.add_function(wrap_pyfunction!(register_function_factory, m)?)?; - m.add_function(wrap_pyfunction!(check_setup_status, m)?)?; + m.add_function(wrap_pyfunction!(sync_setup, m)?)?; + m.add_function(wrap_pyfunction!(drop_setup, m)?)?; m.add_function(wrap_pyfunction!(apply_setup_changes, m)?)?; + m.add_function(wrap_pyfunction!(flow_names_with_setup, m)?)?; m.add_class::()?; m.add_class::()?; diff --git a/src/setup/driver.rs b/src/setup/driver.rs index b002e2b73..578a4c2da 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -285,16 +285,9 @@ pub fn check_flow_setup_status( }) } -#[derive(Debug, Deserialize, Default)] -pub struct CheckSetupStatusOptions { - /// If true, also check / clean up flows existing before but no longer exist. - pub delete_legacy_flows: bool, -} - -pub fn check_setup_status( +pub fn sync_setup( flows: &BTreeMap>, all_setup_state: &AllSetupState, - options: CheckSetupStatusOptions, ) -> Result { let mut flow_status_checks = BTreeMap::new(); for (flow_name, flow_context) in flows { @@ -304,19 +297,33 @@ pub fn check_setup_status( check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?, ); } - if options.delete_legacy_flows { - for (flow_name, existing_state) in all_setup_state.flows.iter() { - if !flows.contains_key(flow_name) { - flow_status_checks.insert( - flow_name.clone(), - check_flow_setup_status(None, Some(existing_state))?, - ); - } + Ok(AllSetupStatusCheck { + metadata_table: db_metadata::MetadataTableSetup { + metadata_table_missing: !all_setup_state.has_metadata_table, + }, + flows: flow_status_checks, + }) +} + +pub fn drop_setup( + flow_names: impl IntoIterator, + all_setup_state: &AllSetupState, +) -> Result { + if !all_setup_state.has_metadata_table { + api_bail!("CocoIndex metadata table is missing."); + } + let mut flow_status_checks = BTreeMap::new(); + for flow_name in flow_names.into_iter() { + if let Some(existing_state) = all_setup_state.flows.get(&flow_name) { + flow_status_checks.insert( + flow_name, + check_flow_setup_status(None, Some(existing_state))?, + ); } } Ok(AllSetupStatusCheck { metadata_table: db_metadata::MetadataTableSetup { - metadata_table_missing: !all_setup_state.has_metadata_table, + metadata_table_missing: false, }, flows: flow_status_checks, })