diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index d9a5bd33..0a2a8289 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -67,19 +67,25 @@ impl TrackingTableSetupStatusCheck { desired: Option<&TrackingTableSetupState>, existing: &CombinedState, source_ids_to_delete: Vec, - ) -> Self { - Self { - desired_state: desired.cloned(), - legacy_table_names: existing - .legacy_values(desired, |v| &v.table_name) - .into_iter() - .cloned() - .collect(), - min_existing_version_id: existing - .always_exists() - .then(|| existing.possible_versions().map(|v| v.version_id).min()) - .flatten(), - source_ids_to_delete, + ) -> Option { + let legacy_table_names = existing + .legacy_values(desired, |v| &v.table_name) + .into_iter() + .cloned() + .collect(); + let min_existing_version_id = existing + .always_exists() + .then(|| existing.possible_versions().map(|v| v.version_id).min()) + .flatten(); + if desired.is_some() || min_existing_version_id.is_some() { + Some(Self { + desired_state: desired.cloned(), + legacy_table_names, + min_existing_version_id, + source_ids_to_delete, + }) + } else { + None } } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index 615bed95..96f53d5a 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -308,7 +308,7 @@ pub fn check_flow_setup_status( status: to_object_status(existing_state, desired_state)?, seen_flow_metadata_version: existing_state.and_then(|s| s.seen_flow_metadata_version), metadata_change, - tracking_table: tracking_table_change.into_setup_info(), + tracking_table: tracking_table_change.map(|c| c.into_setup_info()), target_resources, unknown_resources, }) @@ -412,25 +412,25 @@ pub async fn apply_changes( .transpose()?, ); } - if flow_status - .tracking_table - .status_check - .as_ref() - .map(|c| c.change_type() != SetupChangeType::NoChange) - .unwrap_or_default() - { - state_updates.insert( - db_metadata::ResourceTypeKey::new( - MetadataRecordType::TrackingTable.to_string(), - serde_json::Value::Null, - ), - flow_status - .tracking_table - .state - .as_ref() - .map(serde_json::to_value) - .transpose()?, - ); + if let Some(tracking_table) = &flow_status.tracking_table { + if tracking_table + .status_check + .as_ref() + .map(|c| c.change_type() != SetupChangeType::NoChange) + .unwrap_or_default() + { + state_updates.insert( + db_metadata::ResourceTypeKey::new( + MetadataRecordType::TrackingTable.to_string(), + serde_json::Value::Null, + ), + tracking_table + .state + .as_ref() + .map(serde_json::to_value) + .transpose()?, + ); + } } for target_resource in &flow_status.target_resources { state_updates.insert( @@ -454,10 +454,8 @@ pub async fn apply_changes( ) .await?; - maybe_update_resource_setup(write, &flow_status.tracking_table).await?; - - for target_resource in &flow_status.target_resources { - maybe_update_resource_setup(write, target_resource).await?; + if let Some(tracking_table) = &flow_status.tracking_table { + maybe_update_resource_setup(write, tracking_table).await?; } let is_deletion = flow_status.status == ObjectStatus::Deleted; diff --git a/src/setup/states.rs b/src/setup/states.rs index a0ef9a3b..e4e810f7 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -323,7 +323,7 @@ pub struct FlowSetupStatusCheck { pub metadata_change: Option>, pub tracking_table: - ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupStatusCheck>, + Option>, pub target_resources: Vec< ResourceSetupInfo>, >, @@ -338,7 +338,10 @@ impl ObjectSetupStatusCheck for FlowSetupStatusCheck { fn is_up_to_date(&self) -> bool { self.metadata_change.is_none() - && self.tracking_table.is_up_to_date() + && self + .tracking_table + .as_ref() + .is_none_or(|t| t.is_up_to_date()) && self .target_resources .iter() @@ -392,17 +395,19 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> { write!( f, "{} {}\n", - ObjectSetupStatusCode(flow_ssc).to_string().color(AnsiColors::Cyan), + ObjectSetupStatusCode(flow_ssc) + .to_string() + .color(AnsiColors::Cyan), format!("Flow: {}", self.0) )?; let mut f = indented(f).with_str(INDENT); - write!(f, "{}", flow_ssc.tracking_table)?; - + if let Some(tracking_table) = &flow_ssc.tracking_table { + write!(f, "{}", tracking_table)?; + } for target_resource in &flow_ssc.target_resources { write!(f, "{}", target_resource)?; } - for resource in &flow_ssc.unknown_resources { writeln!(f, "[ UNKNOWN ] {resource}")?; }