Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub struct TrackingTableSetupChange {
pub legacy_source_state_table_names: BTreeSet<String>,

pub source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,

has_state_change: bool,
}

impl TrackingTableSetupChange {
Expand Down Expand Up @@ -136,6 +138,7 @@ impl TrackingTableSetupChange {
legacy_source_state_table_names,
min_existing_version_id,
source_names_need_state_cleanup,
has_state_change: existing.has_state_diff(desired, |v| v),
})
} else {
None
Expand All @@ -148,6 +151,7 @@ impl TrackingTableSetupChange {
ResourceSetupInfo {
key: (),
state: self.desired_state.clone(),
has_tracked_state_change: self.has_state_change,
description: "Internal Storage for Tracking".to_string(),
setup_change: Some(self),
legacy_key: None,
Expand Down
1 change: 1 addition & 0 deletions src/setup/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ impl MetadataTableSetup {
ResourceSetupInfo {
key: (),
state: None,
has_tracked_state_change: self.metadata_table_missing,
description: "CocoIndex Metadata Table".to_string(),
setup_change: Some(self),
legacy_key: None,
Expand Down
18 changes: 15 additions & 3 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ async fn collect_attachments_setup_change(

let mut attachments_change = AttachmentsSetupChange::default();
for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() {
let has_diff = setup_state
.existing
.has_state_diff(setup_state.desired.as_ref(), |s| s);
if !has_diff {
continue;
}
attachments_change.has_tracked_state_change = true;
let factory = get_attachment_factory(&kind)?;
let is_upsertion = setup_state.desired.is_some();
if let Some(action) = factory
Expand Down Expand Up @@ -428,9 +435,13 @@ pub async fn diff_flow_setup_states(
.await?;

let desired_state = target_states_group.desired.clone();
let target_state = target_states_group
let desired_target_state = target_states_group
.desired
.and_then(|state| (!state.common.setup_by_user).then_some(state.state));
let has_tracked_state_change = target_states_group
.existing
.has_state_diff(desired_target_state.as_ref(), |s| &s.state)
|| attachments_change.has_tracked_state_change;
let existing_without_setup_by_user = CombinedState {
current: target_states_group
.existing
Expand All @@ -449,7 +460,7 @@ pub async fn diff_flow_setup_states(
.collect(),
legacy_state_key: target_states_group.existing.legacy_state_key.clone(),
};
let never_setup_by_sys = target_state.is_none()
let never_setup_by_sys = desired_target_state.is_none()
&& existing_without_setup_by_user.current.is_none()
&& existing_without_setup_by_user.staging.is_empty();
let setup_change = if never_setup_by_sys {
Expand All @@ -459,7 +470,7 @@ pub async fn diff_flow_setup_states(
target_change: factory
.diff_setup_states(
&resource_id.key,
target_state,
desired_target_state,
existing_without_setup_by_user,
flow_instance_ctx.clone(),
)
Expand All @@ -471,6 +482,7 @@ pub async fn diff_flow_setup_states(
target_resources.push(ResourceSetupInfo {
key: resource_id.clone(),
state: desired_state,
has_tracked_state_change,
description: factory.describe_resource(&resource_id.key)?,
setup_change,
legacy_key: target_states_group
Expand Down
23 changes: 22 additions & 1 deletion src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ impl<T> CombinedState<T> {
.filter(|v| Some(*v) != desired_value)
.collect()
}

pub fn has_state_diff<S>(&self, state: Option<&S>, map_fn: impl Fn(&T) -> &S) -> bool
where
S: PartialEq,
{
if let Some(state) = state {
!self.always_exists_and(|s| map_fn(s) == state)
} else {
self.possible_versions().next().is_some()
}
}
}

impl<T: Debug + Clone> Default for CombinedState<T> {
Expand Down Expand Up @@ -320,6 +331,7 @@ impl ResourceSetupChange for std::convert::Infallible {
pub struct ResourceSetupInfo<K, S, C: ResourceSetupChange> {
pub key: K,
pub state: Option<S>,
pub has_tracked_state_change: bool,
pub description: String,

/// If `None`, the resource is managed by users.
Expand Down Expand Up @@ -406,6 +418,7 @@ pub trait ObjectSetupChange {

#[derive(Default)]
pub struct AttachmentsSetupChange {
pub has_tracked_state_change: bool,
pub deletes: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
pub upserts: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
}
Expand Down Expand Up @@ -472,7 +485,15 @@ impl ObjectSetupChange for FlowSetupChange {
}

fn has_internal_changes(&self) -> bool {
return self.metadata_change.is_some();
self.metadata_change.is_some()
|| self
.tracking_table
.as_ref()
.map_or(false, |t| t.has_tracked_state_change)
|| self
.target_resources
.iter()
.any(|target| target.has_tracked_state_change)
}

fn has_external_changes(&self) -> bool {
Expand Down