From b68729d089fa2a587fa63a316283c955ddcdbe1f Mon Sep 17 00:00:00 2001 From: LJ Date: Sun, 6 Apr 2025 14:03:12 -0700 Subject: [PATCH] Implement `ResourceSetupStatusCheck` for `Infallible`. --- src/execution/db_tracking_setup.rs | 9 ++-- src/ops/factory_bases.rs | 26 ++++----- src/ops/interface.rs | 4 +- src/ops/storages/postgres.rs | 12 ++--- src/prelude.rs | 1 + src/setup/db_metadata.rs | 9 ++-- src/setup/driver.rs | 23 ++++---- src/setup/states.rs | 86 +++++++++++++++++++++++------- 8 files changed, 97 insertions(+), 73 deletions(-) diff --git a/src/execution/db_tracking_setup.rs b/src/execution/db_tracking_setup.rs index 62554b19..14bb4e4c 100644 --- a/src/execution/db_tracking_setup.rs +++ b/src/execution/db_tracking_setup.rs @@ -82,19 +82,16 @@ impl TrackingTableSetupStatusCheck { } #[async_trait] -impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck { - type Key = (); - type State = TrackingTableSetupState; - +impl ResourceSetupStatusCheck<(), TrackingTableSetupState> for TrackingTableSetupStatusCheck { fn describe_resource(&self) -> String { "Tracking Table".to_string() } - fn key(&self) -> &Self::Key { + fn key(&self) -> &() { &() } - fn desired_state(&self) -> Option<&Self::State> { + fn desired_state(&self) -> Option<&TrackingTableSetupState> { self.desired_state.as_ref() } diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index fcee2ae4..97152e06 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -295,9 +295,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { key: Self::Key, desired_state: Option, existing_states: setup::CombinedState, - ) -> Result< - impl setup::ResourceSetupStatusCheck + 'static, - >; + ) -> Result + 'static>; fn check_state_compatibility( &self, @@ -317,17 +315,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { } struct ResourceSetupStatusCheckWrapper { - inner: - Box + Send + Sync>, + inner: Box + Send + Sync>, key_json: serde_json::Value, state_json: Option, } impl ResourceSetupStatusCheckWrapper { fn new( - inner: Box< - dyn setup::ResourceSetupStatusCheck + Send + Sync, - >, + inner: Box + Send + Sync>, ) -> Result { Ok(Self { key_json: serde_json::to_value(inner.key())?, @@ -347,19 +342,18 @@ impl Debug for ResourceSetupStatusCheckWrapper { } #[async_trait] -impl setup::ResourceSetupStatusCheck for ResourceSetupStatusCheckWrapper { - type Key = serde_json::Value; - type State = serde_json::Value; - +impl setup::ResourceSetupStatusCheck + for ResourceSetupStatusCheckWrapper +{ fn describe_resource(&self) -> String { self.inner.describe_resource() } - fn key(&self) -> &Self::Key { + fn key(&self) -> &serde_json::Value { &self.key_json } - fn desired_state(&self) -> Option<&Self::State> { + fn desired_state(&self) -> Option<&serde_json::Value> { self.state_json.as_ref() } @@ -410,9 +404,7 @@ impl ExportTargetFactory for T { existing_states: setup::CombinedState, ) -> Result< Box< - dyn setup::ResourceSetupStatusCheck - + Send - + Sync, + dyn setup::ResourceSetupStatusCheck + Send + Sync, >, > { let key: T::Key = serde_json::from_value(key.clone())?; diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 59c10f40..90b9356e 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -182,9 +182,7 @@ pub trait ExportTargetFactory { existing_states: setup::CombinedState, ) -> Result< Box< - dyn setup::ResourceSetupStatusCheck - + Send - + Sync, + dyn setup::ResourceSetupStatusCheck + Send + Sync, >, >; diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 61b4926e..c50eb587 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -736,19 +736,16 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String } #[async_trait] -impl setup::ResourceSetupStatusCheck for SetupStatusCheck { - type Key = TableId; - type State = SetupState; - +impl setup::ResourceSetupStatusCheck for SetupStatusCheck { fn describe_resource(&self) -> String { format!("Postgres table {}", self.table_id) } - fn key(&self) -> &Self::Key { + fn key(&self) -> &TableId { &self.table_id } - fn desired_state(&self) -> Option<&Self::State> { + fn desired_state(&self) -> Option<&SetupState> { self.desired_state.as_ref() } @@ -960,8 +957,7 @@ impl StorageFactoryBase for Arc { key: TableId, desired: Option, existing: setup::CombinedState, - ) -> Result + 'static> - { + ) -> Result + 'static> { Ok(SetupStatusCheck::new(self.clone(), key, desired, existing)) } diff --git a/src/prelude.rs b/src/prelude.rs index 7a8a7539..afcfd590 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -10,6 +10,7 @@ pub(crate) use itertools::Itertools; pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize}; pub(crate) use std::borrow::Cow; pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +pub(crate) use std::hash::Hash; pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak}; pub(crate) use crate::base::{schema, spec, value}; diff --git a/src/setup/db_metadata.rs b/src/setup/db_metadata.rs index 97008d79..fa8ceefe 100644 --- a/src/setup/db_metadata.rs +++ b/src/setup/db_metadata.rs @@ -295,15 +295,12 @@ pub struct MetadataTableSetup { } #[async_trait] -impl ResourceSetupStatusCheck for MetadataTableSetup { - type Key = (); - type State = (); - - fn key(&self) -> &Self::Key { +impl ResourceSetupStatusCheck<(), ()> for MetadataTableSetup { + fn key(&self) -> &() { &() } - fn desired_state(&self) -> Option<&Self::State> { + fn desired_state(&self) -> Option<&()> { Some(&()) } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index a97cbf5f..f456527c 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -259,13 +259,9 @@ pub fn check_flow_setup_status( ) })?; target_setup_state_updates.push((resource_id.clone(), v.desired.clone())); - let (desired_state, desired_common) = match v.desired { - Some(desired) => ( - (!desired.common.setup_by_user).then_some(desired.state), - Some(desired.common), - ), - None => (None, None), - }; + let desired_state = v + .desired + .and_then(|state| (!state.common.setup_by_user).then_some(state.state)); let existing_without_setup_by_user = CombinedState { current: v .existing @@ -295,11 +291,7 @@ pub fn check_flow_setup_status( )?, _ => bail!("Unexpected factory type for {}", resource_id.target_kind), }; - target_resources.push(TargetResourceSetupStatusCheck { - target_kind: resource_id.target_kind.clone(), - common: desired_common, - status_check, - }); + target_resources.push(TargetResourceSetupStatusCheck { status_check }); } } Ok(FlowSetupStatusCheck { @@ -356,9 +348,12 @@ pub fn drop_setup( }) } -async fn maybe_update_resource_setup( +async fn maybe_update_resource_setup< + K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash, + S: Debug + Clone + Serialize + DeserializeOwned, +>( write: &mut impl std::io::Write, - resource: &(impl ResourceSetupStatusCheck + ?Sized), + resource: &(impl ResourceSetupStatusCheck + ?Sized), ) -> Result<()> { if resource.change_type() != SetupChangeType::NoChange { writeln!(write, "{}:", resource.describe_resource(),)?; diff --git a/src/setup/states.rs b/src/setup/states.rs index f4db45db..df849704 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -17,6 +17,7 @@ use indenter::indented; use std::fmt::Debug; use std::fmt::{Display, Write}; use std::hash::Hash; +use std::marker::PhantomData; use super::db_metadata; use crate::execution::db_tracking_setup; @@ -214,15 +215,16 @@ pub enum SetupChangeType { } #[async_trait] -pub trait ResourceSetupStatusCheck: Debug + Send + Sync { - type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash; - type State: Debug + Clone + Serialize + DeserializeOwned; - +pub trait ResourceSetupStatusCheck: Debug + Send + Sync +where + K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash, + S: Debug + Clone + Serialize + DeserializeOwned, +{ fn describe_resource(&self) -> String; - fn key(&self) -> &Self::Key; + fn key(&self) -> &K; - fn desired_state(&self) -> Option<&Self::State>; + fn desired_state(&self) -> Option<&S>; fn describe_changes(&self) -> Vec; @@ -233,6 +235,37 @@ pub trait ResourceSetupStatusCheck: Debug + Send + Sync { } } +#[async_trait] +impl ResourceSetupStatusCheck for std::convert::Infallible +where + K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash, + S: Debug + Clone + Serialize + DeserializeOwned, +{ + fn describe_resource(&self) -> String { + unreachable!() + } + + fn key(&self) -> &K { + unreachable!() + } + + fn desired_state(&self) -> Option<&S> { + unreachable!() + } + + fn describe_changes(&self) -> Vec { + unreachable!() + } + + fn change_type(&self) -> SetupChangeType { + unreachable!() + } + + async fn apply_change(&self) -> Result<()> { + unreachable!() + } +} + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum ObjectStatus { Invalid, @@ -248,18 +281,17 @@ pub trait ObjectSetupStatusCheck { #[derive(Debug)] pub struct TargetResourceSetupStatusCheck { - pub target_kind: String, - pub common: Option, - pub status_check: Box< - dyn ResourceSetupStatusCheck - + Send - + Sync, - >, + pub status_check: + Box + Send + Sync>, } impl std::fmt::Display for TargetResourceSetupStatusCheck { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", FormattedResourceSetup(self.status_check.as_ref())) + write!( + f, + "{}", + FormattedResourceSetup(self.status_check.as_ref(), PhantomData::default()) + ) } } @@ -326,10 +358,18 @@ impl std::fmt::Display } } -pub struct FormattedResourceSetup<'a, Check: ResourceSetupStatusCheck + ?Sized>(&'a Check); +pub struct FormattedResourceSetup< + 'a, + K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash, + S: Debug + Clone + Serialize + DeserializeOwned, + Check: ResourceSetupStatusCheck + ?Sized, +>(&'a Check, PhantomData<(K, S)>); -impl std::fmt::Display - for FormattedResourceSetup<'_, Change> +impl std::fmt::Display for FormattedResourceSetup<'_, K, S, Check> +where + K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash, + S: Debug + Clone + Serialize + DeserializeOwned, + Check: ResourceSetupStatusCheck + ?Sized, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let status_code = match self.0.change_type() { @@ -377,7 +417,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> { )?; let mut f = indented(f).with_str(INDENT); - write!(f, "{}", FormattedResourceSetup(&flow_ssc.tracking_table))?; + write!( + f, + "{}", + FormattedResourceSetup(&flow_ssc.tracking_table, PhantomData::default()) + )?; for target_resource in &flow_ssc.target_resources { writeln!(f, "{}", target_resource)?; @@ -389,7 +433,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> { impl std::fmt::Display for AllSetupStatusCheck { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", FormattedResourceSetup(&self.metadata_table))?; + write!( + f, + "{}", + FormattedResourceSetup(&self.metadata_table, PhantomData::default()) + )?; for (flow_name, flow_status) in &self.flows { write!( f,