From 1ac3b8b16aed27eff29cca0704a8833044232965 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Sat, 4 Oct 2025 20:26:19 -0700 Subject: [PATCH] feat(attachment): update tracked states even if no update action needed --- src/execution/db_tracking_setup.rs | 4 ++++ src/setup/db_metadata.rs | 1 + src/setup/driver.rs | 18 +++++++++++++++--- src/setup/states.rs | 23 ++++++++++++++++++++++- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index fec3afca..83668b8b 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -103,6 +103,8 @@ pub struct TrackingTableSetupChange { pub legacy_source_state_table_names: BTreeSet, pub source_names_need_state_cleanup: BTreeMap>, + + has_state_change: bool, } impl TrackingTableSetupChange { @@ -136,6 +138,7 @@ impl TrackingTableSetupChange { legacy_source_state_table_names, min_existing_version_id, source_names_need_state_cleanup, + has_state_change: existing.has_state_diff(desired, |v| v), }) } else { None @@ -148,6 +151,7 @@ impl TrackingTableSetupChange { ResourceSetupInfo { key: (), state: self.desired_state.clone(), + has_tracked_state_change: self.has_state_change, description: "Internal Storage for Tracking".to_string(), setup_change: Some(self), legacy_key: None, diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index a1cd888f..ed62d589 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -323,6 +323,7 @@ impl MetadataTableSetup { ResourceSetupInfo { key: (), state: None, + has_tracked_state_change: self.metadata_table_missing, description: "CocoIndex Metadata Table".to_string(), setup_change: Some(self), legacy_key: None, diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 9b71a710..4bc239ae 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -309,6 +309,13 @@ async fn collect_attachments_setup_change( let mut attachments_change = AttachmentsSetupChange::default(); for (AttachmentSetupKey(kind, key), setup_state) in grouped_attachment_states.into_iter() { + let has_diff = setup_state + .existing + .has_state_diff(setup_state.desired.as_ref(), |s| s); + if !has_diff { + continue; + } + attachments_change.has_tracked_state_change = true; let factory = get_attachment_factory(&kind)?; let is_upsertion = setup_state.desired.is_some(); if let Some(action) = factory @@ -428,9 +435,13 @@ pub async fn diff_flow_setup_states( .await?; let desired_state = target_states_group.desired.clone(); - let target_state = target_states_group + let desired_target_state = target_states_group .desired .and_then(|state| (!state.common.setup_by_user).then_some(state.state)); + let has_tracked_state_change = target_states_group + .existing + .has_state_diff(desired_target_state.as_ref(), |s| &s.state) + || attachments_change.has_tracked_state_change; let existing_without_setup_by_user = CombinedState { current: target_states_group .existing @@ -449,7 +460,7 @@ pub async fn diff_flow_setup_states( .collect(), legacy_state_key: target_states_group.existing.legacy_state_key.clone(), }; - let never_setup_by_sys = target_state.is_none() + let never_setup_by_sys = desired_target_state.is_none() && existing_without_setup_by_user.current.is_none() && existing_without_setup_by_user.staging.is_empty(); let setup_change = if never_setup_by_sys { @@ -459,7 +470,7 @@ pub async fn diff_flow_setup_states( target_change: factory .diff_setup_states( &resource_id.key, - target_state, + desired_target_state, existing_without_setup_by_user, flow_instance_ctx.clone(), ) @@ -471,6 +482,7 @@ pub async fn diff_flow_setup_states( target_resources.push(ResourceSetupInfo { key: resource_id.clone(), state: desired_state, + has_tracked_state_change, description: factory.describe_resource(&resource_id.key)?, setup_change, legacy_key: target_states_group diff --git a/src/setup/states.rs b/src/setup/states.rs index defd4034..33ecafcc 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -106,6 +106,17 @@ impl CombinedState { .filter(|v| Some(*v) != desired_value) .collect() } + + pub fn has_state_diff(&self, state: Option<&S>, map_fn: impl Fn(&T) -> &S) -> bool + where + S: PartialEq, + { + if let Some(state) = state { + !self.always_exists_and(|s| map_fn(s) == state) + } else { + self.possible_versions().next().is_some() + } + } } impl Default for CombinedState { @@ -320,6 +331,7 @@ impl ResourceSetupChange for std::convert::Infallible { pub struct ResourceSetupInfo { pub key: K, pub state: Option, + pub has_tracked_state_change: bool, pub description: String, /// If `None`, the resource is managed by users. @@ -406,6 +418,7 @@ pub trait ObjectSetupChange { #[derive(Default)] pub struct AttachmentsSetupChange { + pub has_tracked_state_change: bool, pub deletes: Vec>, pub upserts: Vec>, } @@ -472,7 +485,15 @@ impl ObjectSetupChange for FlowSetupChange { } fn has_internal_changes(&self) -> bool { - return self.metadata_change.is_some(); + self.metadata_change.is_some() + || self + .tracking_table + .as_ref() + .map_or(false, |t| t.has_tracked_state_change) + || self + .target_resources + .iter() + .any(|target| target.has_tracked_state_change) } fn has_external_changes(&self) -> bool {