diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index b50ba7b01..60d489a08 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -885,10 +885,8 @@ impl AnalyzerContext<'_> { } }; - let target_id: i32 = 1; // TODO: Fill it with a meaningful value automatically let ((setup_key, desired_state), executor_fut) = export_factory.clone().build( export_op.name.clone(), - target_id, spec, key_fields_schema, value_fields_schema, @@ -902,44 +900,42 @@ impl AnalyzerContext<'_> { let existing_target_states = existing_target_states.get(&resource_id); let target_id = setup_state .map(|setup_state| -> Result { - let existing_target_ids = existing_target_states - .iter() - .flat_map(|v| v.iter()) - .map(|state| state.common.target_id) - .collect::>(); - let target_id = if existing_target_ids.len() == 1 { - existing_target_ids.into_iter().next().unwrap() + let mut compatible_target_ids = HashSet::>::new(); + let mut reusable_schema_version_ids = HashSet::>::new(); + for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) { + let compatibility = export_factory + .check_state_compatibility(&desired_state, &existing_state.state)?; + let compatible_target_id = + if compatibility != SetupStateCompatibility::NotCompatible { + reusable_schema_version_ids.insert( + (compatibility == SetupStateCompatibility::Compatible) + .then_some(existing_state.common.schema_version_id), + ); + Some(existing_state.common.target_id) + } else { + None + }; + compatible_target_ids.insert(compatible_target_id); + } + + let target_id = if compatible_target_ids.len() == 1 { + compatible_target_ids.into_iter().next().flatten() } else { - if existing_target_ids.len() > 1 { + if compatible_target_ids.len() > 1 { warn!("Multiple target states with the same key schema found"); } + None + }; + let target_id = target_id.unwrap_or_else(|| { setup_state.metadata.last_target_id += 1; setup_state.metadata.last_target_id - }; + }); let max_schema_version_id = existing_target_states .iter() .flat_map(|v| v.iter()) .map(|s| s.common.max_schema_version_id) .max() .unwrap_or(0); - let reusable_schema_version_ids = existing_target_states - .iter() - .flat_map(|v| v.iter()) - .map(|s| { - Ok({ - if export_factory.will_keep_all_existing_data( - &export_op.name, - target_id, - &desired_state, - &s.state, - )? { - Some(s.common.schema_version_id) - } else { - None - } - }) - }) - .collect::>>()?; let schema_version_id = if reusable_schema_version_ids.len() == 1 { reusable_schema_version_ids .into_iter() diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 53f966316..6b451874b 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -523,7 +523,6 @@ impl FlowBuilder { ) -> PyResult<()> { let common_scope = Self::minimum_common_scope(fields.iter().map(|(_, ds)| &ds.scope), None) .into_py_result()?; - let has_auto_uuid_field = auto_uuid_field.is_some(); let name = format!(".collect.{}", self.next_generated_op_id); self.next_generated_op_id += 1; self.do_in_scope( diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 28cbea6e7..e1b52506a 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -281,7 +281,6 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { fn build( self: Arc, name: String, - target_id: i32, spec: Self::Spec, key_fields_schema: Vec, value_fields_schema: Vec, @@ -301,13 +300,11 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { impl setup::ResourceSetupStatusCheck + 'static, >; - fn will_keep_all_existing_data( + fn check_state_compatibility( &self, - name: &str, - target_id: i32, desired_state: &Self::SetupState, existing_state: &Self::SetupState, - ) -> Result; + ) -> Result; fn register(self, registry: &mut ExecutorFactoryRegistry) -> Result<()> where @@ -384,7 +381,6 @@ impl ExportTargetFactory for T { fn build( self: Arc, name: String, - target_id: i32, spec: serde_json::Value, key_fields_schema: Vec, value_fields_schema: Vec, @@ -398,7 +394,6 @@ impl ExportTargetFactory for T { let ((setup_key, setup_state), executors) = StorageFactoryBase::build( self, name, - target_id, spec, key_fields_schema, value_fields_schema, @@ -438,17 +433,13 @@ impl ExportTargetFactory for T { )?)) } - fn will_keep_all_existing_data( + fn check_state_compatibility( &self, - name: &str, - target_id: i32, desired_state: &serde_json::Value, existing_state: &serde_json::Value, - ) -> Result { - let result = StorageFactoryBase::will_keep_all_existing_data( + ) -> Result { + let result = StorageFactoryBase::check_state_compatibility( self, - name, - target_id, &serde_json::from_value(desired_state.clone())?, &serde_json::from_value(existing_state.clone())?, )?; diff --git a/src/ops/interface.rs b/src/ops/interface.rs index e040b6cce..ac420a992 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -86,13 +86,24 @@ pub trait ExportTargetExecutor: Send + Sync { async fn apply_mutation(&self, mutation: ExportTargetMutation) -> Result<()>; } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SetupStateCompatibility { + /// The resource is fully compatible with the desired state. + /// This means the resource can be updated to the desired state without any loss of data. + Compatible, + /// The resource is partially compatible with the desired state. + /// This means some existing data will be lost after applying the setup change. + PartialCompatible, + /// The resource needs to be rebuilt + NotCompatible, +} + pub trait ExportTargetFactory { // The first field of the `input_schema` is the primary key field. // If it has struct type, it should be converted to composite primary key. fn build( self: Arc, name: String, - target_id: i32, spec: serde_json::Value, key_fields_schema: Vec, value_fields_schema: Vec, @@ -116,13 +127,11 @@ pub trait ExportTargetFactory { >, >; - fn will_keep_all_existing_data( + fn check_state_compatibility( &self, - name: &str, - target_id: i32, desired_state: &serde_json::Value, existing_state: &serde_json::Value, - ) -> Result; + ) -> Result; } #[derive(Clone)] diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index 3b63edfe7..efdab32dd 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -894,7 +894,6 @@ impl StorageFactoryBase for Arc { fn build( self: Arc, name: String, - target_id: i32, spec: Spec, key_fields_schema: Vec, value_fields_schema: Vec, @@ -906,9 +905,9 @@ impl StorageFactoryBase for Arc { )> { let table_id = TableId { database_url: spec.database_url.clone(), - table_name: spec.table_name.unwrap_or_else(|| { - format!("{}__{}__{}", context.flow_instance_name, name, target_id) - }), + table_name: spec + .table_name + .unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, name)), }; let setup_state = SetupState::new( &table_id, @@ -943,22 +942,30 @@ impl StorageFactoryBase for Arc { Ok(SetupStatusCheck::new(self.clone(), key, desired, existing)) } - fn will_keep_all_existing_data( + fn check_state_compatibility( &self, - _name: &str, - _target_id: i32, desired: &SetupState, existing: &SetupState, - ) -> Result { - let result = existing - .key_fields_schema - .iter() - .all(|(k, v)| desired.key_fields_schema.get(k) == Some(v)) + ) -> Result { + let is_key_identical = existing.key_fields_schema.len() == desired.key_fields_schema.len() && existing + .key_fields_schema + .iter() + .all(|(k, v)| desired.key_fields_schema.get(k) == Some(v)); + let compatibility = if is_key_identical { + let is_value_lossy = existing .value_fields_schema .iter() .any(|(k, v)| desired.value_fields_schema.get(k) != Some(v)); - Ok(result) + if is_value_lossy { + SetupStateCompatibility::PartialCompatible + } else { + SetupStateCompatibility::Compatible + } + } else { + SetupStateCompatibility::NotCompatible + }; + Ok(compatibility) } }