diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 031f4187..0c94e0c6 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -286,7 +286,8 @@ def collect(self, **kwargs): def export(self, name: str, target_spec: op.StorageSpec, /, *, primary_key_fields: Sequence[str] | None = None, - vector_index: Sequence[tuple[str, vector.VectorSimilarityMetric]] = ()): + vector_index: Sequence[tuple[str, vector.VectorSimilarityMetric]] = (), + setup_by_user: bool = False): """ Export the collected data to the specified target. """ @@ -298,7 +299,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *, for field_name, metric in vector_index] self._flow_builder_state.engine_flow_builder.export( name, _spec_kind(target_spec), _dump_engine_object(target_spec), - index_options, self._engine_data_collector) + index_options, self._engine_data_collector, setup_by_user) _flow_name_builder = _NameBuilder() diff --git a/src/base/spec.rs b/src/base/spec.rs index 020be201..1d8b344f 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -231,6 +231,7 @@ pub struct ExportOpSpec { pub collector_name: FieldName, pub target: OpSpec, pub index_options: IndexOptions, + pub setup_by_user: bool, } /// A reactive operation reacts on given input values. diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 5a7f1efc..973407ef 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -904,10 +904,15 @@ impl AnalyzerContext<'_> { let mut compatible_target_ids = HashSet::>::new(); let mut reusable_schema_version_ids = HashSet::>::new(); for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) { - let compatibility = export_factory.check_state_compatibility( - &setup_output.desired_setup_state, - &existing_state.state, - )?; + let compatibility = + if export_op.spec.setup_by_user == existing_state.common.setup_by_user { + export_factory.check_state_compatibility( + &setup_output.desired_setup_state, + &existing_state.state, + )? + } else { + SetupStateCompatibility::NotCompatible + }; let compatible_target_id = if compatibility != SetupStateCompatibility::NotCompatible { reusable_schema_version_ids.insert( @@ -962,6 +967,7 @@ impl AnalyzerContext<'_> { target_id, schema_version_id, max_schema_version_id: max_schema_version_id.max(schema_version_id), + setup_by_user: export_op.spec.setup_by_user, }, state: setup_output.desired_setup_state, }); diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 778b9f1a..2735a95e 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -579,6 +579,7 @@ impl FlowBuilder { Ok(()) } + #[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))] pub fn export( &mut self, name: String, @@ -586,6 +587,7 @@ impl FlowBuilder { op_spec: py::Pythonized>, index_options: py::Pythonized, input: &DataCollector, + setup_by_user: bool, ) -> PyResult<()> { let spec = spec::OpSpec { kind, @@ -603,6 +605,7 @@ impl FlowBuilder { collector_name: input.name.clone(), target: spec, index_options: index_options.into_inner(), + setup_by_user, }, }); Ok(()) diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index db5e06d0..fcee2ae4 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -288,7 +288,8 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { context: Arc, ) -> Result>; - /// This is only called for non-user-setup targets. + /// Will not be called if it's setup by user. + /// It returns an error if the target only supports setup by user. fn check_setup_status( &self, key: Self::Key, diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 9353538c..f217d056 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -162,8 +162,6 @@ pub struct ExportTargetBuildOutput { } pub trait ExportTargetFactory { - // The first field of the `input_schema` is the primary key field. - // If it has struct type, it should be converted to composite primary key. fn build( self: Arc, name: String, @@ -174,6 +172,8 @@ pub trait ExportTargetFactory { context: Arc, ) -> Result; + /// Will not be called if it's setup by user. + /// It returns an error if the target only supports setup by user. fn check_setup_status( &self, key: &serde_json::Value, diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 578a4c2d..a97cbf5f 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -14,7 +14,6 @@ use super::{ db_metadata, CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatusCheck, ObjectSetupStatusCheck, ObjectStatus, ResourceIdentifier, ResourceSetupStatusCheck, SetupChangeType, StateChange, TargetResourceSetupStatusCheck, TargetSetupState, - TargetSetupStateCommon, }; use super::{AllSetupState, AllSetupStatusCheck}; use crate::execution::db_tracking_setup; @@ -166,9 +165,8 @@ fn to_object_status(existing: Option, desired: Option) -> Result, - desired: Option, - existing: CombinedState, + desired: Option, + existing: CombinedState, } fn group_resource_states<'a>( @@ -181,8 +179,7 @@ fn group_resource_states<'a>( ( key, GroupedResourceStates { - desired_common: Some(state.common.clone()), - desired: Some(state.state.clone()), + desired: Some(state.clone()), existing: CombinedState::default(), }, ) @@ -199,14 +196,13 @@ fn group_resource_states<'a>( } let entry = entry.or_default(); if let Some(current) = &state.current { - entry.existing.current = Some(current.state.clone()); + entry.existing.current = Some(current.clone()); } for s in state.staging.iter() { match s { - StateChange::Upsert(v) => entry - .existing - .staging - .push(StateChange::Upsert(v.state.clone())), + StateChange::Upsert(v) => { + entry.existing.staging.push(StateChange::Upsert(v.clone())) + } StateChange::Delete => entry.existing.staging.push(StateChange::Delete), } } @@ -247,41 +243,72 @@ pub fn check_flow_setup_status( .collect(), ); - let target_resources = { - let grouped_target_resources = group_resource_states( - desired_state.iter().flat_map(|d| d.targets.iter()), - existing_state.iter().flat_map(|e| e.targets.iter()), - )?; - let registry = executor_factory_registry(); - grouped_target_resources - .into_iter() - .map(|(resource_id, v)| -> Result<_> { - let factory = registry.get(&resource_id.target_kind).ok_or_else(|| { - anyhow::anyhow!( - "Target resource type not found: {}", - resource_id.target_kind - ) - })?; - let status_check = match factory { - ExecutorFactory::ExportTarget(factory) => { - factory.check_setup_status(&resource_id.key, v.desired, v.existing)? + let mut target_setup_state_updates = Vec::new(); + let mut target_resources = Vec::new(); + + let grouped_target_resources = group_resource_states( + desired_state.iter().flat_map(|d| d.targets.iter()), + existing_state.iter().flat_map(|e| e.targets.iter()), + )?; + let registry = executor_factory_registry(); + for (resource_id, v) in grouped_target_resources.into_iter() { + let factory = registry.get(&resource_id.target_kind).ok_or_else(|| { + anyhow::anyhow!( + "Target resource type not found: {}", + resource_id.target_kind + ) + })?; + target_setup_state_updates.push((resource_id.clone(), v.desired.clone())); + let (desired_state, desired_common) = match v.desired { + Some(desired) => ( + (!desired.common.setup_by_user).then_some(desired.state), + Some(desired.common), + ), + None => (None, None), + }; + let existing_without_setup_by_user = CombinedState { + current: v + .existing + .current + .and_then(|s| s.state_unless_setup_by_user()), + staging: v + .existing + .staging + .into_iter() + .filter_map(|s| match s { + StateChange::Upsert(s) => { + s.state_unless_setup_by_user().map(StateChange::Upsert) } - _ => bail!("Unexpected factory type for {}", resource_id.target_kind), - }; - Ok(TargetResourceSetupStatusCheck { - target_kind: resource_id.target_kind.clone(), - common: v.desired_common, - status_check, + StateChange::Delete => Some(StateChange::Delete), }) - }) - .collect::>>()? - }; + .collect(), + }; + let never_setup_by_sys = desired_state.is_none() + && existing_without_setup_by_user.current.is_none() + && existing_without_setup_by_user.staging.is_empty(); + if !never_setup_by_sys { + let status_check = match factory { + ExecutorFactory::ExportTarget(factory) => factory.check_setup_status( + &resource_id.key, + desired_state, + existing_without_setup_by_user, + )?, + _ => bail!("Unexpected factory type for {}", resource_id.target_kind), + }; + target_resources.push(TargetResourceSetupStatusCheck { + target_kind: resource_id.target_kind.clone(), + common: desired_common, + status_check, + }); + } + } Ok(FlowSetupStatusCheck { status: to_object_status(existing_state, desired_state)?, seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version), metadata_change, tracking_table: tracking_table_change, target_resources, + target_setup_state_updates, }) } @@ -392,25 +419,15 @@ pub async fn apply_changes( .transpose()?, ); } - for target_resource in &flow_status.target_resources { + for (resource_id, state_update) in &flow_status.target_setup_state_updates { state_updates.insert( db_metadata::ResourceTypeKey::new( - MetadataRecordType::Target(target_resource.target_kind.clone()).to_string(), - target_resource.status_check.key().clone(), + MetadataRecordType::Target(resource_id.target_kind.clone()).to_string(), + resource_id.key.clone(), ), - target_resource - .common + state_update .as_ref() - .map(|c| { - serde_json::to_value(TargetSetupState { - common: c.clone(), - state: target_resource - .status_check - .desired_state() - .cloned() - .unwrap_or_default(), - }) - }) + .map(serde_json::to_value) .transpose()?, ); } diff --git a/src/setup/states.rs b/src/setup/states.rs index 7dce70f0..f4db45db 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -11,19 +11,14 @@ /// - [resource: tracking table] /// - Target /// - [resource: target-specific stuff] -use anyhow::Result; -use axum::async_trait; +use crate::prelude::*; + use indenter::indented; -use indexmap::IndexMap; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -use std::collections::BTreeSet; +use std::fmt::Debug; use std::fmt::{Display, Write}; use std::hash::Hash; -use std::{collections::BTreeMap, fmt::Debug}; use super::db_metadata; -use crate::base::schema; use crate::execution::db_tracking_setup; const INDENT: &str = " "; @@ -142,6 +137,8 @@ pub struct TargetSetupStateCommon { pub target_id: i32, pub schema_version_id: i32, pub max_schema_version_id: i32, + #[serde(default)] + pub setup_by_user: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -151,6 +148,12 @@ pub struct TargetSetupState { pub state: serde_json::Value, } +impl TargetSetupState { + pub fn state_unless_setup_by_user(self) -> Option { + (!self.common.setup_by_user).then_some(self.state) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] pub struct FlowSetupMetadata { pub last_source_id: i32, @@ -269,6 +272,7 @@ pub struct FlowSetupStatusCheck { pub tracking_table: db_tracking_setup::TrackingTableSetupStatusCheck, pub target_resources: Vec, + pub target_setup_state_updates: Vec<(ResourceIdentifier, Option)>, } impl ObjectSetupStatusCheck for FlowSetupStatusCheck { fn status(&self) -> ObjectStatus {