Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,16 @@ impl TrackingTableSetupStatusCheck {
}

#[async_trait]
impl ResourceSetupStatusCheck for TrackingTableSetupStatusCheck {
type Key = ();
type State = TrackingTableSetupState;

impl ResourceSetupStatusCheck<(), TrackingTableSetupState> for TrackingTableSetupStatusCheck {
fn describe_resource(&self) -> String {
"Tracking Table".to_string()
}

fn key(&self) -> &Self::Key {
fn key(&self) -> &() {
&()
}

fn desired_state(&self) -> Option<&Self::State> {
fn desired_state(&self) -> Option<&TrackingTableSetupState> {
self.desired_state.as_ref()
}

Expand Down
26 changes: 9 additions & 17 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
key: Self::Key,
desired_state: Option<Self::SetupState>,
existing_states: setup::CombinedState<Self::SetupState>,
) -> Result<
impl setup::ResourceSetupStatusCheck<Key = Self::Key, State = Self::SetupState> + 'static,
>;
) -> Result<impl setup::ResourceSetupStatusCheck<Self::Key, Self::SetupState> + 'static>;

fn check_state_compatibility(
&self,
Expand All @@ -317,17 +315,14 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
}

struct ResourceSetupStatusCheckWrapper<T: StorageFactoryBase> {
inner:
Box<dyn setup::ResourceSetupStatusCheck<Key = T::Key, State = T::SetupState> + Send + Sync>,
inner: Box<dyn setup::ResourceSetupStatusCheck<T::Key, T::SetupState> + Send + Sync>,
key_json: serde_json::Value,
state_json: Option<serde_json::Value>,
}

impl<T: StorageFactoryBase> ResourceSetupStatusCheckWrapper<T> {
fn new(
inner: Box<
dyn setup::ResourceSetupStatusCheck<Key = T::Key, State = T::SetupState> + Send + Sync,
>,
inner: Box<dyn setup::ResourceSetupStatusCheck<T::Key, T::SetupState> + Send + Sync>,
) -> Result<Self> {
Ok(Self {
key_json: serde_json::to_value(inner.key())?,
Expand All @@ -347,19 +342,18 @@ impl<T: StorageFactoryBase> Debug for ResourceSetupStatusCheckWrapper<T> {
}

#[async_trait]
impl<T: StorageFactoryBase> setup::ResourceSetupStatusCheck for ResourceSetupStatusCheckWrapper<T> {
type Key = serde_json::Value;
type State = serde_json::Value;

impl<T: StorageFactoryBase> setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value>
for ResourceSetupStatusCheckWrapper<T>
{
fn describe_resource(&self) -> String {
self.inner.describe_resource()
}

fn key(&self) -> &Self::Key {
fn key(&self) -> &serde_json::Value {
&self.key_json
}

fn desired_state(&self) -> Option<&Self::State> {
fn desired_state(&self) -> Option<&serde_json::Value> {
self.state_json.as_ref()
}

Expand Down Expand Up @@ -410,9 +404,7 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
existing_states: setup::CombinedState<serde_json::Value>,
) -> Result<
Box<
dyn setup::ResourceSetupStatusCheck<Key = serde_json::Value, State = serde_json::Value>
+ Send
+ Sync,
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
>,
> {
let key: T::Key = serde_json::from_value(key.clone())?;
Expand Down
4 changes: 1 addition & 3 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ pub trait ExportTargetFactory {
existing_states: setup::CombinedState<serde_json::Value>,
) -> Result<
Box<
dyn setup::ResourceSetupStatusCheck<Key = serde_json::Value, State = serde_json::Value>
+ Send
+ Sync,
dyn setup::ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync,
>,
>;

Expand Down
12 changes: 4 additions & 8 deletions src/ops/storages/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,19 +736,16 @@ fn describe_index_spec(index_name: &str, index_spec: &VectorIndexDef) -> String
}

#[async_trait]
impl setup::ResourceSetupStatusCheck for SetupStatusCheck {
type Key = TableId;
type State = SetupState;

impl setup::ResourceSetupStatusCheck<TableId, SetupState> for SetupStatusCheck {
fn describe_resource(&self) -> String {
format!("Postgres table {}", self.table_id)
}

fn key(&self) -> &Self::Key {
fn key(&self) -> &TableId {
&self.table_id
}

fn desired_state(&self) -> Option<&Self::State> {
fn desired_state(&self) -> Option<&SetupState> {
self.desired_state.as_ref()
}

Expand Down Expand Up @@ -960,8 +957,7 @@ impl StorageFactoryBase for Arc<Factory> {
key: TableId,
desired: Option<SetupState>,
existing: setup::CombinedState<SetupState>,
) -> Result<impl setup::ResourceSetupStatusCheck<Key = TableId, State = SetupState> + 'static>
{
) -> Result<impl setup::ResourceSetupStatusCheck<TableId, SetupState> + 'static> {
Ok(SetupStatusCheck::new(self.clone(), key, desired, existing))
}

Expand Down
1 change: 1 addition & 0 deletions src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) use itertools::Itertools;
pub(crate) use serde::{de::DeserializeOwned, Deserialize, Serialize};
pub(crate) use std::borrow::Cow;
pub(crate) use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
pub(crate) use std::hash::Hash;
pub(crate) use std::sync::{Arc, LazyLock, Mutex, OnceLock, RwLock, Weak};

pub(crate) use crate::base::{schema, spec, value};
Expand Down
9 changes: 3 additions & 6 deletions src/setup/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,12 @@ pub struct MetadataTableSetup {
}

#[async_trait]
impl ResourceSetupStatusCheck for MetadataTableSetup {
type Key = ();
type State = ();

fn key(&self) -> &Self::Key {
impl ResourceSetupStatusCheck<(), ()> for MetadataTableSetup {
fn key(&self) -> &() {
&()
}

fn desired_state(&self) -> Option<&Self::State> {
fn desired_state(&self) -> Option<&()> {
Some(&())
}

Expand Down
23 changes: 9 additions & 14 deletions src/setup/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,9 @@ pub fn check_flow_setup_status(
)
})?;
target_setup_state_updates.push((resource_id.clone(), v.desired.clone()));
let (desired_state, desired_common) = match v.desired {
Some(desired) => (
(!desired.common.setup_by_user).then_some(desired.state),
Some(desired.common),
),
None => (None, None),
};
let desired_state = v
.desired
.and_then(|state| (!state.common.setup_by_user).then_some(state.state));
let existing_without_setup_by_user = CombinedState {
current: v
.existing
Expand Down Expand Up @@ -295,11 +291,7 @@ pub fn check_flow_setup_status(
)?,
_ => bail!("Unexpected factory type for {}", resource_id.target_kind),
};
target_resources.push(TargetResourceSetupStatusCheck {
target_kind: resource_id.target_kind.clone(),
common: desired_common,
status_check,
});
target_resources.push(TargetResourceSetupStatusCheck { status_check });
}
}
Ok(FlowSetupStatusCheck {
Expand Down Expand Up @@ -356,9 +348,12 @@ pub fn drop_setup(
})
}

async fn maybe_update_resource_setup(
async fn maybe_update_resource_setup<
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
S: Debug + Clone + Serialize + DeserializeOwned,
>(
write: &mut impl std::io::Write,
resource: &(impl ResourceSetupStatusCheck + ?Sized),
resource: &(impl ResourceSetupStatusCheck<K, S> + ?Sized),
) -> Result<()> {
if resource.change_type() != SetupChangeType::NoChange {
writeln!(write, "{}:", resource.describe_resource(),)?;
Expand Down
86 changes: 67 additions & 19 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use indenter::indented;
use std::fmt::Debug;
use std::fmt::{Display, Write};
use std::hash::Hash;
use std::marker::PhantomData;

use super::db_metadata;
use crate::execution::db_tracking_setup;
Expand Down Expand Up @@ -214,15 +215,16 @@ pub enum SetupChangeType {
}

#[async_trait]
pub trait ResourceSetupStatusCheck: Debug + Send + Sync {
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash;
type State: Debug + Clone + Serialize + DeserializeOwned;

pub trait ResourceSetupStatusCheck<K, S>: Debug + Send + Sync
where
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
S: Debug + Clone + Serialize + DeserializeOwned,
{
fn describe_resource(&self) -> String;

fn key(&self) -> &Self::Key;
fn key(&self) -> &K;

fn desired_state(&self) -> Option<&Self::State>;
fn desired_state(&self) -> Option<&S>;

fn describe_changes(&self) -> Vec<String>;

Expand All @@ -233,6 +235,37 @@ pub trait ResourceSetupStatusCheck: Debug + Send + Sync {
}
}

#[async_trait]
impl<K, S> ResourceSetupStatusCheck<K, S> for std::convert::Infallible
where
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
S: Debug + Clone + Serialize + DeserializeOwned,
{
fn describe_resource(&self) -> String {
unreachable!()
}

fn key(&self) -> &K {
unreachable!()
}

fn desired_state(&self) -> Option<&S> {
unreachable!()
}

fn describe_changes(&self) -> Vec<String> {
unreachable!()
}

fn change_type(&self) -> SetupChangeType {
unreachable!()
}

async fn apply_change(&self) -> Result<()> {
unreachable!()
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ObjectStatus {
Invalid,
Expand All @@ -248,18 +281,17 @@ pub trait ObjectSetupStatusCheck {

#[derive(Debug)]
pub struct TargetResourceSetupStatusCheck {
pub target_kind: String,
pub common: Option<TargetSetupStateCommon>,
pub status_check: Box<
dyn ResourceSetupStatusCheck<Key = serde_json::Value, State = serde_json::Value>
+ Send
+ Sync,
>,
pub status_check:
Box<dyn ResourceSetupStatusCheck<serde_json::Value, serde_json::Value> + Send + Sync>,
}

impl std::fmt::Display for TargetResourceSetupStatusCheck {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", FormattedResourceSetup(self.status_check.as_ref()))
write!(
f,
"{}",
FormattedResourceSetup(self.status_check.as_ref(), PhantomData::default())
)
}
}

Expand Down Expand Up @@ -326,10 +358,18 @@ impl<StatusCheck: ObjectSetupStatusCheck> std::fmt::Display
}
}

pub struct FormattedResourceSetup<'a, Check: ResourceSetupStatusCheck + ?Sized>(&'a Check);
pub struct FormattedResourceSetup<
'a,
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
S: Debug + Clone + Serialize + DeserializeOwned,
Check: ResourceSetupStatusCheck<K, S> + ?Sized,
>(&'a Check, PhantomData<(K, S)>);

impl<Change: ResourceSetupStatusCheck + ?Sized> std::fmt::Display
for FormattedResourceSetup<'_, Change>
impl<K, S, Check> std::fmt::Display for FormattedResourceSetup<'_, K, S, Check>
where
K: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash,
S: Debug + Clone + Serialize + DeserializeOwned,
Check: ResourceSetupStatusCheck<K, S> + ?Sized,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status_code = match self.0.change_type() {
Expand Down Expand Up @@ -377,7 +417,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> {
)?;

let mut f = indented(f).with_str(INDENT);
write!(f, "{}", FormattedResourceSetup(&flow_ssc.tracking_table))?;
write!(
f,
"{}",
FormattedResourceSetup(&flow_ssc.tracking_table, PhantomData::default())
)?;

for target_resource in &flow_ssc.target_resources {
writeln!(f, "{}", target_resource)?;
Expand All @@ -389,7 +433,11 @@ impl std::fmt::Display for FormattedFlowSetupStatusCheck<'_> {

impl std::fmt::Display for AllSetupStatusCheck {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", FormattedResourceSetup(&self.metadata_table))?;
write!(
f,
"{}",
FormattedResourceSetup(&self.metadata_table, PhantomData::default())
)?;
for (flow_name, flow_status) in &self.flows {
write!(
f,
Expand Down