Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 66 additions & 12 deletions python/cocoindex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime

from . import flow, lib
from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes
from .setup import sync_setup, drop_setup, flow_names_with_setup, apply_setup_changes

@click.group()
def cli():
Expand All @@ -12,12 +12,42 @@ def cli():
"""

@cli.command()
def ls():
@click.option(
"-a", "--all", "show_all", is_flag=True, show_default=True, default=False,
help="Also show all flows with persisted setup, even if not defined in the current process.")
def ls(show_all: bool):
"""
List all available flows.
List all flows.
"""
for name in flow.flow_names():
click.echo(name)
current_flow_names = [fl.name for fl in flow.flows()]
persisted_flow_names = flow_names_with_setup()
remaining_persisted_flow_names = set(persisted_flow_names)

has_missing_setup = False
has_extra_setup = False

for name in current_flow_names:
if name in remaining_persisted_flow_names:
remaining_persisted_flow_names.remove(name)
suffix = ''
else:
suffix = ' [+]'
has_missing_setup = True
click.echo(f'{name}{suffix}')

if show_all:
for name in persisted_flow_names:
if name in remaining_persisted_flow_names:
click.echo(f'{name} [?]')
has_extra_setup = True

if has_missing_setup or has_extra_setup:
click.echo('')
click.echo('Notes:')
if has_missing_setup:
click.echo(' [+]: Flows present in the current process, but missing setup.')
if has_extra_setup:
click.echo(' [?]: Flows with persisted setup, but not in the current process.')

@cli.command()
@click.argument("flow_name", type=str, required=False)
Expand All @@ -28,17 +58,41 @@ def show(flow_name: str | None):
click.echo(str(_flow_by_name(flow_name)))

@cli.command()
def setup():
"""
Check and apply backend setup changes for flows, including the internal and target storage
(to export).
"""
status_check = sync_setup()
click.echo(status_check)
if status_check.is_up_to_date():
click.echo("No changes need to be pushed.")
return
if not click.confirm(
"Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False):
return
apply_setup_changes(status_check)

@cli.command()
@click.argument("flow_name", type=str, nargs=-1)
@click.option(
"-D", "--delete_legacy_flows", is_flag=True, show_default=True, default=False,
help="Also check / delete flows existing before but no longer exist.")
def setup(delete_legacy_flows):
"-a", "--all", "drop_all", is_flag=True, show_default=True, default=False,
help="Drop all flows with persisted setup, even if not defined in the current process.")
def drop(flow_name: tuple[str, ...], drop_all: bool):
"""
Check and apply backend setup changes for flows, including the internal and target storage (to export).
Drop the backend for specified flows.
If no flow is specified, all flows defined in the current process will be dropped.
"""
options = CheckSetupStatusOptions(delete_legacy_flows=delete_legacy_flows)
status_check = check_setup_status(options)
print(status_check)
if drop_all:
flow_names = flow_names_with_setup()
elif len(flow_name) == 0:
flow_names = [fl.name for fl in flow.flows()]
else:
flow_names = list(flow_name)
status_check = drop_setup(flow_names)
click.echo(status_check)
if status_check.is_up_to_date():
click.echo("No flows need to be dropped.")
return
if not click.confirm(
"Changes need to be pushed. Continue? [yes/N]", default=False, show_default=False):
Expand Down
15 changes: 8 additions & 7 deletions python/cocoindex/setup.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from dataclasses import dataclass

from . import flow
from . import _engine

@dataclass
class CheckSetupStatusOptions:
delete_legacy_flows: bool
def sync_setup() -> _engine.SetupStatusCheck:
flow.ensure_all_flows_built()
return _engine.sync_setup()

def check_setup_status(options: CheckSetupStatusOptions) -> _engine.SetupStatusCheck:
def drop_setup(flow_names: list[str]) -> _engine.SetupStatusCheck:
flow.ensure_all_flows_built()
return _engine.check_setup_status(vars(options))
return _engine.drop_setup(flow_names)

def flow_names_with_setup() -> list[str]:
return _engine.flow_names_with_setup()

def apply_setup_changes(status_check: _engine.SetupStatusCheck):
_engine.apply_setup_changes(status_check)
2 changes: 1 addition & 1 deletion src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl FlowBuilder {
pub fn new(name: &str) -> PyResult<Self> {
let lib_context = get_lib_context().into_py_result()?;
let existing_flow_ss = lib_context
.combined_setup_states
.all_setup_states
.read()
.unwrap()
.flows
Expand Down
6 changes: 3 additions & 3 deletions src/lib_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ static TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| Runtime::new().unwrap
pub struct LibContext {
pub pool: PgPool,
pub flows: Mutex<BTreeMap<String, Arc<FlowContext>>>,
pub combined_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
pub all_setup_states: RwLock<setup::AllSetupState<setup::ExistingMode>>,
}

impl LibContext {
Expand Down Expand Up @@ -94,14 +94,14 @@ pub fn create_lib_context(settings: settings::Settings) -> Result<LibContext> {
pyo3_async_runtimes::tokio::init_with_runtime(get_runtime()).unwrap();
});

let (pool, all_css) = get_runtime().block_on(async {
let (pool, all_setup_states) = get_runtime().block_on(async {
let pool = PgPool::connect(&settings.database_url).await?;
let existing_ss = setup::get_existing_setup_state(&pool).await?;
anyhow::Ok((pool, existing_ss))
})?;
Ok(LibContext {
pool,
combined_setup_states: RwLock::new(all_css),
all_setup_states: RwLock::new(all_setup_states),
flows: Mutex::new(BTreeMap::new()),
})
}
Expand Down
29 changes: 22 additions & 7 deletions src/py/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,30 @@ impl SetupStatusCheck {
}

#[pyfunction]
fn check_setup_status(
options: Pythonized<setup::CheckSetupStatusOptions>,
) -> PyResult<SetupStatusCheck> {
fn sync_setup() -> PyResult<SetupStatusCheck> {
let lib_context = get_lib_context().into_py_result()?;
let flows = lib_context.flows.lock().unwrap();
let all_css = lib_context.combined_setup_states.read().unwrap();
let setup_status =
setup::check_setup_status(&flows, &all_css, options.into_inner()).into_py_result()?;
let all_setup_states = lib_context.all_setup_states.read().unwrap();
let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?;
Ok(SetupStatusCheck(setup_status))
}

#[pyfunction]
fn drop_setup(flow_names: Vec<String>) -> PyResult<SetupStatusCheck> {
let lib_context = get_lib_context().into_py_result()?;
let all_setup_states = lib_context.all_setup_states.read().unwrap();
let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?;
Ok(SetupStatusCheck(setup_status))
}

#[pyfunction]
fn flow_names_with_setup() -> PyResult<Vec<String>> {
let lib_context = get_lib_context().into_py_result()?;
let all_setup_states = lib_context.all_setup_states.read().unwrap();
let flow_names = all_setup_states.flows.keys().cloned().collect();
Ok(flow_names)
}

#[pyfunction]
fn apply_setup_changes(py: Python<'_>, setup_status: &SetupStatusCheck) -> PyResult<()> {
py.allow_threads(|| {
Expand All @@ -314,8 +327,10 @@ fn cocoindex_engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(start_server, m)?)?;
m.add_function(wrap_pyfunction!(stop, m)?)?;
m.add_function(wrap_pyfunction!(register_function_factory, m)?)?;
m.add_function(wrap_pyfunction!(check_setup_status, m)?)?;
m.add_function(wrap_pyfunction!(sync_setup, m)?)?;
m.add_function(wrap_pyfunction!(drop_setup, m)?)?;
m.add_function(wrap_pyfunction!(apply_setup_changes, m)?)?;
m.add_function(wrap_pyfunction!(flow_names_with_setup, m)?)?;

m.add_class::<builder::flow_builder::FlowBuilder>()?;
m.add_class::<builder::flow_builder::DataCollector>()?;
Expand Down
41 changes: 24 additions & 17 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,9 @@ pub fn check_flow_setup_status(
})
}

#[derive(Debug, Deserialize, Default)]
pub struct CheckSetupStatusOptions {
/// If true, also check / clean up flows existing before but no longer exist.
pub delete_legacy_flows: bool,
}

pub fn check_setup_status(
pub fn sync_setup(
flows: &BTreeMap<String, Arc<FlowContext>>,
all_setup_state: &AllSetupState<ExistingMode>,
options: CheckSetupStatusOptions,
) -> Result<AllSetupStatusCheck> {
let mut flow_status_checks = BTreeMap::new();
for (flow_name, flow_context) in flows {
Expand All @@ -304,19 +297,33 @@ pub fn check_setup_status(
check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?,
);
}
if options.delete_legacy_flows {
for (flow_name, existing_state) in all_setup_state.flows.iter() {
if !flows.contains_key(flow_name) {
flow_status_checks.insert(
flow_name.clone(),
check_flow_setup_status(None, Some(existing_state))?,
);
}
Ok(AllSetupStatusCheck {
metadata_table: db_metadata::MetadataTableSetup {
metadata_table_missing: !all_setup_state.has_metadata_table,
},
flows: flow_status_checks,
})
}

pub fn drop_setup(
flow_names: impl IntoIterator<Item = String>,
all_setup_state: &AllSetupState<ExistingMode>,
) -> Result<AllSetupStatusCheck> {
if !all_setup_state.has_metadata_table {
api_bail!("CocoIndex metadata table is missing.");
}
let mut flow_status_checks = BTreeMap::new();
for flow_name in flow_names.into_iter() {
if let Some(existing_state) = all_setup_state.flows.get(&flow_name) {
flow_status_checks.insert(
flow_name,
check_flow_setup_status(None, Some(existing_state))?,
);
}
}
Ok(AllSetupStatusCheck {
metadata_table: db_metadata::MetadataTableSetup {
metadata_table_missing: !all_setup_state.has_metadata_table,
metadata_table_missing: false,
},
flows: flow_status_checks,
})
Expand Down