Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions python/cocoindex/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ def collect(self, **kwargs):

def export(self, name: str, target_spec: op.StorageSpec, /, *,
primary_key_fields: Sequence[str] | None = None,
vector_index: Sequence[tuple[str, vector.VectorSimilarityMetric]] = ()):
vector_index: Sequence[tuple[str, vector.VectorSimilarityMetric]] = (),
setup_by_user: bool = False):
"""
Export the collected data to the specified target.
"""
Expand All @@ -298,7 +299,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
for field_name, metric in vector_index]
self._flow_builder_state.engine_flow_builder.export(
name, _spec_kind(target_spec), _dump_engine_object(target_spec),
index_options, self._engine_data_collector)
index_options, self._engine_data_collector, setup_by_user)


_flow_name_builder = _NameBuilder()
Expand Down
1 change: 1 addition & 0 deletions src/base/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ pub struct ExportOpSpec {
pub collector_name: FieldName,
pub target: OpSpec,
pub index_options: IndexOptions,
pub setup_by_user: bool,
}

/// A reactive operation reacts on given input values.
Expand Down
14 changes: 10 additions & 4 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,10 +904,15 @@ impl AnalyzerContext<'_> {
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
let compatibility = export_factory.check_state_compatibility(
&setup_output.desired_setup_state,
&existing_state.state,
)?;
let compatibility =
if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
export_factory.check_state_compatibility(
&setup_output.desired_setup_state,
&existing_state.state,
)?
} else {
SetupStateCompatibility::NotCompatible
};
let compatible_target_id =
if compatibility != SetupStateCompatibility::NotCompatible {
reusable_schema_version_ids.insert(
Expand Down Expand Up @@ -962,6 +967,7 @@ impl AnalyzerContext<'_> {
target_id,
schema_version_id,
max_schema_version_id: max_schema_version_id.max(schema_version_id),
setup_by_user: export_op.spec.setup_by_user,
},
state: setup_output.desired_setup_state,
});
Expand Down
3 changes: 3 additions & 0 deletions src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,13 +579,15 @@ impl FlowBuilder {
Ok(())
}

#[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))]
pub fn export(
&mut self,
name: String,
kind: String,
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
index_options: py::Pythonized<spec::IndexOptions>,
input: &DataCollector,
setup_by_user: bool,
) -> PyResult<()> {
let spec = spec::OpSpec {
kind,
Expand All @@ -603,6 +605,7 @@ impl FlowBuilder {
collector_name: input.name.clone(),
target: spec,
index_options: index_options.into_inner(),
setup_by_user,
},
});
Ok(())
Expand Down
3 changes: 2 additions & 1 deletion src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
context: Arc<FlowInstanceContext>,
) -> Result<ExportTargetBuildOutput<Self>>;

/// This is only called for non-user-setup targets.
/// Not called when the target is set up by the user.
/// Returns an error if the target only supports being set up by the user.
fn check_setup_status(
&self,
key: Self::Key,
Expand Down
4 changes: 2 additions & 2 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ pub struct ExportTargetBuildOutput {
}

pub trait ExportTargetFactory {
// The first field of the `input_schema` is the primary key field.
// If it has struct type, it should be converted to composite primary key.
fn build(
self: Arc<Self>,
name: String,
Expand All @@ -174,6 +172,8 @@ pub trait ExportTargetFactory {
context: Arc<FlowInstanceContext>,
) -> Result<ExportTargetBuildOutput>;

/// Not called when the target is set up by the user.
/// Returns an error if the target only supports being set up by the user.
fn check_setup_status(
&self,
key: &serde_json::Value,
Expand Down
123 changes: 70 additions & 53 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use super::{
db_metadata, CombinedState, DesiredMode, ExistingMode, FlowSetupState, FlowSetupStatusCheck,
ObjectSetupStatusCheck, ObjectStatus, ResourceIdentifier, ResourceSetupStatusCheck,
SetupChangeType, StateChange, TargetResourceSetupStatusCheck, TargetSetupState,
TargetSetupStateCommon,
};
use super::{AllSetupState, AllSetupStatusCheck};
use crate::execution::db_tracking_setup;
Expand Down Expand Up @@ -166,9 +165,8 @@ fn to_object_status<A, B>(existing: Option<A>, desired: Option<B>) -> Result<Obj

#[derive(Debug, Default)]
struct GroupedResourceStates {
desired_common: Option<TargetSetupStateCommon>,
desired: Option<serde_json::Value>,
existing: CombinedState<serde_json::Value>,
desired: Option<TargetSetupState>,
existing: CombinedState<TargetSetupState>,
}

fn group_resource_states<'a>(
Expand All @@ -181,8 +179,7 @@ fn group_resource_states<'a>(
(
key,
GroupedResourceStates {
desired_common: Some(state.common.clone()),
desired: Some(state.state.clone()),
desired: Some(state.clone()),
existing: CombinedState::default(),
},
)
Expand All @@ -199,14 +196,13 @@ fn group_resource_states<'a>(
}
let entry = entry.or_default();
if let Some(current) = &state.current {
entry.existing.current = Some(current.state.clone());
entry.existing.current = Some(current.clone());
}
for s in state.staging.iter() {
match s {
StateChange::Upsert(v) => entry
.existing
.staging
.push(StateChange::Upsert(v.state.clone())),
StateChange::Upsert(v) => {
entry.existing.staging.push(StateChange::Upsert(v.clone()))
}
StateChange::Delete => entry.existing.staging.push(StateChange::Delete),
}
}
Expand Down Expand Up @@ -247,41 +243,72 @@ pub fn check_flow_setup_status(
.collect(),
);

let target_resources = {
let grouped_target_resources = group_resource_states(
desired_state.iter().flat_map(|d| d.targets.iter()),
existing_state.iter().flat_map(|e| e.targets.iter()),
)?;
let registry = executor_factory_registry();
grouped_target_resources
.into_iter()
.map(|(resource_id, v)| -> Result<_> {
let factory = registry.get(&resource_id.target_kind).ok_or_else(|| {
anyhow::anyhow!(
"Target resource type not found: {}",
resource_id.target_kind
)
})?;
let status_check = match factory {
ExecutorFactory::ExportTarget(factory) => {
factory.check_setup_status(&resource_id.key, v.desired, v.existing)?
let mut target_setup_state_updates = Vec::new();
let mut target_resources = Vec::new();

let grouped_target_resources = group_resource_states(
desired_state.iter().flat_map(|d| d.targets.iter()),
existing_state.iter().flat_map(|e| e.targets.iter()),
)?;
let registry = executor_factory_registry();
for (resource_id, v) in grouped_target_resources.into_iter() {
let factory = registry.get(&resource_id.target_kind).ok_or_else(|| {
anyhow::anyhow!(
"Target resource type not found: {}",
resource_id.target_kind
)
})?;
target_setup_state_updates.push((resource_id.clone(), v.desired.clone()));
let (desired_state, desired_common) = match v.desired {
Some(desired) => (
(!desired.common.setup_by_user).then_some(desired.state),
Some(desired.common),
),
None => (None, None),
};
let existing_without_setup_by_user = CombinedState {
current: v
.existing
.current
.and_then(|s| s.state_unless_setup_by_user()),
staging: v
.existing
.staging
.into_iter()
.filter_map(|s| match s {
StateChange::Upsert(s) => {
s.state_unless_setup_by_user().map(StateChange::Upsert)
}
_ => bail!("Unexpected factory type for {}", resource_id.target_kind),
};
Ok(TargetResourceSetupStatusCheck {
target_kind: resource_id.target_kind.clone(),
common: v.desired_common,
status_check,
StateChange::Delete => Some(StateChange::Delete),
})
})
.collect::<Result<Vec<_>>>()?
};
.collect(),
};
let never_setup_by_sys = desired_state.is_none()
&& existing_without_setup_by_user.current.is_none()
&& existing_without_setup_by_user.staging.is_empty();
if !never_setup_by_sys {
let status_check = match factory {
ExecutorFactory::ExportTarget(factory) => factory.check_setup_status(
&resource_id.key,
desired_state,
existing_without_setup_by_user,
)?,
_ => bail!("Unexpected factory type for {}", resource_id.target_kind),
};
target_resources.push(TargetResourceSetupStatusCheck {
target_kind: resource_id.target_kind.clone(),
common: desired_common,
status_check,
});
}
}
Ok(FlowSetupStatusCheck {
status: to_object_status(existing_state, desired_state)?,
seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version),
metadata_change,
tracking_table: tracking_table_change,
target_resources,
target_setup_state_updates,
})
}

Expand Down Expand Up @@ -392,25 +419,15 @@ pub async fn apply_changes(
.transpose()?,
);
}
for target_resource in &flow_status.target_resources {
for (resource_id, state_update) in &flow_status.target_setup_state_updates {
state_updates.insert(
db_metadata::ResourceTypeKey::new(
MetadataRecordType::Target(target_resource.target_kind.clone()).to_string(),
target_resource.status_check.key().clone(),
MetadataRecordType::Target(resource_id.target_kind.clone()).to_string(),
resource_id.key.clone(),
),
target_resource
.common
state_update
.as_ref()
.map(|c| {
serde_json::to_value(TargetSetupState {
common: c.clone(),
state: target_resource
.status_check
.desired_state()
.cloned()
.unwrap_or_default(),
})
})
.map(serde_json::to_value)
.transpose()?,
);
}
Expand Down
20 changes: 12 additions & 8 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,14 @@
/// - [resource: tracking table]
/// - Target
/// - [resource: target-specific stuff]
use anyhow::Result;
use axum::async_trait;
use crate::prelude::*;

use indenter::indented;
use indexmap::IndexMap;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::fmt::{Display, Write};
use std::hash::Hash;
use std::{collections::BTreeMap, fmt::Debug};

use super::db_metadata;
use crate::base::schema;
use crate::execution::db_tracking_setup;

const INDENT: &str = " ";
Expand Down Expand Up @@ -142,6 +137,8 @@ pub struct TargetSetupStateCommon {
pub target_id: i32,
pub schema_version_id: i32,
pub max_schema_version_id: i32,
#[serde(default)]
pub setup_by_user: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand All @@ -151,6 +148,12 @@ pub struct TargetSetupState {
pub state: serde_json::Value,
}

impl TargetSetupState {
    /// Consumes this setup state and yields its target-specific `state` value.
    ///
    /// Returns `None` when `common.setup_by_user` is `true`; otherwise returns
    /// `Some(state)`.
    pub fn state_unless_setup_by_user(self) -> Option<serde_json::Value> {
        let Self { common, state } = self;
        if common.setup_by_user {
            None
        } else {
            Some(state)
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct FlowSetupMetadata {
pub last_source_id: i32,
Expand Down Expand Up @@ -269,6 +272,7 @@ pub struct FlowSetupStatusCheck {

pub tracking_table: db_tracking_setup::TrackingTableSetupStatusCheck,
pub target_resources: Vec<TargetResourceSetupStatusCheck>,
pub target_setup_state_updates: Vec<(ResourceIdentifier, Option<TargetSetupState>)>,
}
impl ObjectSetupStatusCheck for FlowSetupStatusCheck {
fn status(&self) -> ObjectStatus {
Expand Down