diff --git a/src/builder/analyzed_flow.rs b/src/builder/analyzed_flow.rs index 1485b3a8..4f1f8df2 100644 --- a/src/builder/analyzed_flow.rs +++ b/src/builder/analyzed_flow.rs @@ -21,13 +21,13 @@ impl AnalyzedFlow { flow_instance: crate::base::spec::FlowInstanceSpec, existing_flow_ss: Option<&setup::FlowSetupState>, registry: &ExecutorFactoryRegistry, - auth_registry: Arc, + auth_registry: &Arc, ) -> Result { let ctx = analyzer::build_flow_instance_context(&flow_instance.name, auth_registry); let (data_schema, execution_plan_fut, desired_state) = analyzer::analyze_flow(&flow_instance, &ctx, existing_flow_ss, registry)?; let setup_status_check = - setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss)?; + setup::check_flow_setup_status(Some(&desired_state), existing_flow_ss, auth_registry)?; let execution_plan = if setup_status_check.is_up_to_date() { Some( async move { @@ -73,7 +73,7 @@ impl AnalyzedTransientFlow { pub async fn from_transient_flow( transient_flow: spec::TransientFlowSpec, registry: &ExecutorFactoryRegistry, - auth_registry: Arc, + auth_registry: &Arc, ) -> Result { let ctx = analyzer::build_flow_instance_context(&transient_flow.name, auth_registry); let (output_type, data_schema, execution_plan_fut) = diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 0aabedf7..3f244a5b 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -1029,11 +1029,11 @@ impl AnalyzerContext<'_> { pub fn build_flow_instance_context( flow_inst_name: &str, - auth_registry: Arc, + auth_registry: &Arc, ) -> Arc { Arc::new(FlowInstanceContext { flow_instance_name: flow_inst_name.to_string(), - auth_registry, + auth_registry: auth_registry.clone(), }) } diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index a60b1f32..3c57596c 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -347,8 +347,7 @@ impl FlowBuilder { .get(name) .cloned(); let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new())); - let flow_inst_context = - build_flow_instance_context(name, lib_context.auth_registry.clone()); + let flow_inst_context = build_flow_instance_context(name, &lib_context.auth_registry); let result = Self { lib_context, flow_inst_context, @@ -650,7 +649,7 @@ impl FlowBuilder { spec, self.existing_flow_ss.as_ref(), &crate::ops::executor_factory_registry(), - self.lib_context.auth_registry.clone(), + &self.lib_context.auth_registry, )) }) .into_py_result()?; @@ -691,7 +690,7 @@ impl FlowBuilder { get_runtime().block_on(super::AnalyzedTransientFlow::from_transient_flow( spec, &crate::ops::executor_factory_registry(), - self.lib_context.auth_registry.clone(), + &self.lib_context.auth_registry, )) }) .into_py_result()?; diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs index 97152e06..0a1aa611 100644 --- a/src/ops/factory_bases.rs +++ b/src/ops/factory_bases.rs @@ -295,6 +295,7 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static { key: Self::Key, desired_state: Option, existing_states: setup::CombinedState, + auth_registry: &Arc, ) -> Result + 'static>; fn check_state_compatibility( @@ -402,6 +403,7 @@ impl ExportTargetFactory for T { key: &serde_json::Value, desired_state: Option, existing_states: setup::CombinedState, + auth_registry: &Arc, ) -> Result< Box< dyn setup::ResourceSetupStatusCheck + Send + Sync, @@ -412,8 +414,13 @@ impl ExportTargetFactory for T { .map(|v| serde_json::from_value(v.clone())) .transpose()?; let existing_states = from_json_combined_state(existing_states)?; - let status_check = - StorageFactoryBase::check_setup_status(self, key, desired_state, existing_states)?; + let status_check = StorageFactoryBase::check_setup_status( + self, + key, + desired_state, + existing_states, + auth_registry, + )?; Ok(Box::new(ResourceSetupStatusCheckWrapper::::new( Box::new(status_check), )?)) diff --git a/src/ops/interface.rs b/src/ops/interface.rs index a487c894..cbac8756 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -181,6 +181,7 @@ pub trait ExportTargetFactory { key: &serde_json::Value, desired_state: Option, existing_states: setup::CombinedState, + auth_registry: &Arc, ) -> Result< Box< dyn setup::ResourceSetupStatusCheck + Send + Sync, diff --git a/src/ops/registration.rs b/src/ops/registration.rs index 5d7678b9..5251c08a 100644 --- a/src/ops/registration.rs +++ b/src/ops/registration.rs @@ -14,6 +14,9 @@ fn register_executor_factories(registry: &mut ExecutorFactoryRegistry) -> Result Arc::new(storages::postgres::Factory::default()).register(registry)?; + let neo4j_pool = Arc::new(storages::neo4j::GraphPool::default()); + storages::neo4j::RelationshipFactory::new(neo4j_pool).register(registry)?; + Ok(()) } diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index c5ff293d..76e91712 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -1,7 +1,5 @@ -use std::convert::Infallible; - use crate::prelude::*; -use crate::setup::ResourceSetupStatusCheck; +use crate::setup::{ResourceSetupStatusCheck, SetupChangeType}; use crate::{ops::sdk::*, setup::CombinedState}; use neo4rs::{BoltType, ConfigBuilder, Graph}; @@ -25,8 +23,8 @@ pub struct NodeSpec { #[derive(Debug, Deserialize)] pub struct RelationshipSpec { - connection: Neo4jConnectionSpec, - relationship_label: String, + connection: AuthEntryReference, + relationship: String, source_node: NodeSpec, target_node: NodeSpec, } @@ -34,7 +32,6 @@ pub struct RelationshipSpec { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] struct GraphKey { uri: String, - user: String, db: String, } @@ -42,7 +39,6 @@ impl GraphKey { fn from_spec(spec: &Neo4jConnectionSpec) -> Self { Self { uri: spec.uri.clone(), - user: spec.user.clone(), db: spec.db.clone().unwrap_or_else(|| DEFAULT_DB.to_string()), } } @@ -50,19 +46,20 @@ impl GraphKey { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct GraphRelationship { - graph: GraphKey, + connection: AuthEntryReference, relationship: String, } impl GraphRelationship { fn from_spec(spec: &RelationshipSpec) -> Self { Self { - graph: GraphKey::from_spec(&spec.connection), - relationship: spec.relationship_label.clone(), + connection: spec.connection.clone(), + relationship: spec.relationship.clone(), } } } +#[derive(Default)] pub struct GraphPool { graphs: Mutex>>>>, } @@ -248,7 +245,7 @@ impl RelationshipStorageExecutor { ) -> Self { let delete_cypher = format!( r#" -OPTIONAL MATCH (old_src)-[old_rel:{rel_label} {{{rel_key_field_name}: ${REL_ID_PARAM}}}]->(old_tgt) +OPTIONAL MATCH (old_src)-[old_rel:{rel_type} {{{rel_key_field_name}: ${REL_ID_PARAM}}}]->(old_tgt) DELETE old_rel @@ -271,7 +268,7 @@ CALL {{ RETURN 0 AS _2 }} "#, - rel_label = spec.relationship_label, + rel_type = spec.relationship, rel_key_field_name = key_field.name, ); @@ -284,14 +281,14 @@ CALL {{ r#" MERGE (new_src:{src_node_label} {{{src_node_key_field_name}: ${SRC_ID_PARAM}}}) MERGE (new_tgt:{tgt_node_label} {{{tgt_node_key_field_name}: ${TGT_ID_PARAM}}}) -MERGE (new_src)-[new_rel:{rel_label} {{id: ${REL_ID_PARAM}}}]->(new_tgt) +MERGE (new_src)-[new_rel:{rel_type} {{id: ${REL_ID_PARAM}}}]->(new_tgt) {optional_set_rel_props} "#, src_node_label = spec.source_node.label, src_node_key_field_name = spec.source_node.field_name, tgt_node_label = spec.target_node.label, tgt_node_key_field_name = spec.target_node.field_name, - rel_label = spec.relationship_label, + rel_type = spec.relationship, ); Self { graph, @@ -357,16 +354,327 @@ impl ExportTargetExecutor for RelationshipStorageExecutor { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeSetupState { + label: String, + key_field_name: String, + key_constraint_name: String, +} + +impl NodeSetupState { + fn from_spec(spec: &NodeSpec) -> Self { + Self { + label: spec.label.clone(), + key_field_name: spec.field_name.clone(), + key_constraint_name: format!("n__{}__{}", spec.label, spec.field_name), + } + } + + fn is_compatible(&self, other: &Self) -> bool { + self.label == other.label && self.key_field_name == other.key_field_name + } +} #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RelationshipSetupState { key_field_name: String, + key_constraint_name: String, + src_node: NodeSetupState, + tgt_node: NodeSetupState, +} + +impl RelationshipSetupState { + fn from_spec(spec: &RelationshipSpec, key_field_name: String) -> Self { + Self { + key_field_name, + key_constraint_name: format!("r__{}__key", spec.relationship), + src_node: NodeSetupState::from_spec(&spec.source_node), + tgt_node: NodeSetupState::from_spec(&spec.target_node), + } + } + + fn is_compatible(&self, other: &Self) -> bool { + self.key_field_name == other.key_field_name + && self.src_node.is_compatible(&other.src_node) + && self.tgt_node.is_compatible(&other.tgt_node) + } +} + +#[derive(Debug)] +struct DataClearAction { + rel_type: String, + node_labels: IndexSet, } +#[derive(Debug)] +struct KeyConstraint { + label: String, + field_name: String, +} + +impl KeyConstraint { + fn from_node_setup_state(state: &NodeSetupState) -> Self { + Self { + label: state.label.clone(), + field_name: state.key_field_name.clone(), + } + } +} + +#[derive(Derivative)] +#[derivative(Debug)] +struct SetupStatusCheck { + #[derivative(Debug = "ignore")] + graph_pool: Arc, + conn_spec: Neo4jConnectionSpec, + + key: GraphRelationship, + desired_state: Option, + + data_clear: Option, + rel_constraint_to_delete: IndexSet, + rel_constraint_to_create: IndexMap, + node_constraint_to_delete: IndexSet, + node_constraint_to_create: IndexMap, + + change_type: SetupChangeType, +} + +impl SetupStatusCheck { + fn new( + key: GraphRelationship, + graph_pool: Arc, + conn_spec: Neo4jConnectionSpec, + desired_state: Option, + existing: CombinedState, + ) -> Self { + let data_clear = existing + .current + .as_ref() + .filter(|existing_current| { + !desired_state + .as_ref() + .map(|desired| desired.is_compatible(existing_current)) + .unwrap_or(false) + }) + .map(|existing_current| DataClearAction { + rel_type: key.relationship.clone(), + node_labels: std::iter::once(existing_current.src_node.label.clone()) + .chain(std::iter::once(existing_current.tgt_node.label.clone())) + .collect(), + }); + + let mut old_rel_constraints = IndexSet::new(); + let mut old_node_constraints = IndexSet::new(); + for existing_version in existing.possible_versions() { + old_rel_constraints.insert(existing_version.key_constraint_name.clone()); + old_node_constraints.insert(existing_version.src_node.key_constraint_name.clone()); + old_node_constraints.insert(existing_version.tgt_node.key_constraint_name.clone()); + } + + let mut rel_constraint_to_create = IndexMap::new(); + let mut node_constraint_to_create = IndexMap::new(); + if let Some(desired_state) = &desired_state { + let rel_constraint = KeyConstraint { + label: key.relationship.clone(), + field_name: desired_state.key_field_name.clone(), + }; + old_rel_constraints.swap_remove(&desired_state.key_constraint_name); + if !existing + .current + .as_ref() + .map(|c| rel_constraint.field_name == c.key_field_name) + .unwrap_or(false) + { + rel_constraint_to_create + .insert(desired_state.key_constraint_name.clone(), rel_constraint); + } + + old_node_constraints.swap_remove(&desired_state.src_node.key_constraint_name); + if !existing + .current + .as_ref() + .map(|c| { + c.src_node.is_compatible(&desired_state.src_node) + || c.tgt_node.is_compatible(&desired_state.tgt_node) + }) + .unwrap_or(false) + { + node_constraint_to_create.insert( + desired_state.src_node.key_constraint_name.clone(), + KeyConstraint::from_node_setup_state(&desired_state.src_node), + ); + } + + old_node_constraints.swap_remove(&desired_state.tgt_node.key_constraint_name); + if !existing + .current + .as_ref() + .map(|c| c.src_node.is_compatible(&desired_state.src_node)) + .unwrap_or(false) + { + node_constraint_to_create.insert( + desired_state.tgt_node.key_constraint_name.clone(), + KeyConstraint::from_node_setup_state(&desired_state.tgt_node), + ); + } + } + + let rel_constraint_to_delete = old_rel_constraints; + let node_constraint_to_delete = old_node_constraints; + + let change_type = if data_clear.is_none() + && rel_constraint_to_delete.is_empty() + && rel_constraint_to_create.is_empty() + && node_constraint_to_delete.is_empty() + && node_constraint_to_create.is_empty() + { + SetupChangeType::NoChange + } else if data_clear.is_none() + && rel_constraint_to_delete.is_empty() + && node_constraint_to_delete.is_empty() + { + SetupChangeType::Create + } else if rel_constraint_to_create.is_empty() && node_constraint_to_create.is_empty() { + SetupChangeType::Delete + } else { + SetupChangeType::Update + }; + + Self { + graph_pool, + conn_spec, + key, + desired_state, + data_clear, + rel_constraint_to_delete, + rel_constraint_to_create, + node_constraint_to_delete, + node_constraint_to_create, + change_type, + } + } +} + +#[async_trait] +impl ResourceSetupStatusCheck for SetupStatusCheck { + fn describe_resource(&self) -> String { + format!("Neo4j relationship {}", self.key.relationship) + } + + fn key(&self) -> &GraphRelationship { + &self.key + } + + fn desired_state(&self) -> Option<&RelationshipSetupState> { + self.desired_state.as_ref() + } + + fn describe_changes(&self) -> Vec { + let mut result = vec![]; + if let Some(data_clear) = &self.data_clear { + result.push(format!( + "Clear data for relationship {}; nodes {})", + data_clear.rel_type, + data_clear.node_labels.iter().join(", "), + )); + } + for name in &self.rel_constraint_to_delete { + result.push(format!("Delete relationship constraint {}", name)); + } + for (name, rel_constraint) in self.rel_constraint_to_create.iter() { + result.push(format!( + "Create UNIQUE CONSTRAINT {} ON RELATIONSHIP {} (key: {})", + name, rel_constraint.label, rel_constraint.field_name, + )); + } + for name in &self.node_constraint_to_delete { + result.push(format!("Delete node constraint {}", name)); + } + for (name, node_constraint) in self.node_constraint_to_create.iter() { + result.push(format!( + "Create UNIQUE CONSTRAINT {} ON NODE {} (key: {})", + name, node_constraint.label, node_constraint.field_name, + )); + } + result + } + + fn change_type(&self) -> SetupChangeType { + self.change_type + } + + async fn apply_change(&self) -> Result<()> { + let graph = self.graph_pool.get_graph(&self.conn_spec).await?; + + if let Some(data_clear) = &self.data_clear { + let delete_rel_query = neo4rs::query(&format!( + r#" + CALL {{ + MATCH ()-[r:{rel_type}]->() + WITH r + DELETE r + }} IN TRANSACTIONS + "#, + rel_type = data_clear.rel_type + )); + graph.run(delete_rel_query).await?; + + for node_label in &data_clear.node_labels { + let delete_node_query = neo4rs::query(&format!( + r#" + CALL {{ + MATCH (n:{node_label}) + WHERE NOT (n)--() + DELETE n + }} IN TRANSACTIONS + "#, + node_label = node_label + )); + graph.run(delete_node_query).await?; + } + } + + for name in + (self.rel_constraint_to_delete.iter()).chain(self.node_constraint_to_delete.iter()) + { + graph + .run(neo4rs::query(&format!("DROP CONSTRAINT {name}"))) + .await?; + } + + for (name, constraint) in self.node_constraint_to_create.iter() { + graph + .run(neo4rs::query(&format!( + "CREATE CONSTRAINT {name} IF NOT EXISTS FOR (n:{label}) REQUIRE n.{field_name} IS UNIQUE", + label = constraint.label, + field_name = constraint.field_name + ))) + .await?; + } + + for (name, constraint) in self.rel_constraint_to_create.iter() { + graph + .run(neo4rs::query(&format!( + "CREATE CONSTRAINT {name} IF NOT EXISTS FOR ()-[e:{label}]-() REQUIRE e.{field_name} IS UNIQUE", + label = constraint.label, + field_name = constraint.field_name + ))) + .await?; + } + Ok(()) + } +} /// Factory for Neo4j relationships pub struct RelationshipFactory { graph_pool: Arc, } +impl RelationshipFactory { + pub fn new(graph_pool: Arc) -> Self { + Self { graph_pool } + } +} + impl StorageFactoryBase for RelationshipFactory { type Spec = RelationshipSpec; type SetupState = RelationshipSetupState; @@ -383,7 +691,7 @@ impl StorageFactoryBase for RelationshipFactory { key_fields_schema: Vec, value_fields_schema: Vec, _storage_options: IndexOptions, - _context: Arc, + context: Arc, ) -> Result> { let setup_key = GraphRelationship::from_spec(&spec); let key_field_schema = { @@ -392,10 +700,8 @@ impl StorageFactoryBase for RelationshipFactory { } key_fields_schema.into_iter().next().unwrap() }; - let desired_setup_state = RelationshipSetupState { - key_field_name: key_field_schema.name.clone(), - }; - + let desired_setup_state = + RelationshipSetupState::from_spec(&spec, key_field_schema.name.clone()); let mut src_field_info = None; let mut tgt_field_info = None; let mut rel_value_fields_info = vec![]; @@ -418,8 +724,11 @@ impl StorageFactoryBase for RelationshipFactory { let tgt_field_info = tgt_field_info.ok_or_else(|| { anyhow::anyhow!("Target key field {} not found", spec.target_node.field_name) })?; + let conn_spec = context + .auth_registry + .get::(&spec.connection)?; let executor = async move { - let graph = self.graph_pool.get_graph(&spec.connection).await?; + let graph = self.graph_pool.get_graph(&conn_spec).await?; let executor = Arc::new(RelationshipStorageExecutor::new( graph, spec, @@ -440,12 +749,20 @@ impl StorageFactoryBase for RelationshipFactory { fn check_setup_status( &self, - _key: GraphRelationship, - _desired: Option, - _existing: CombinedState, + key: GraphRelationship, + desired: Option, + existing: CombinedState, + auth_registry: &Arc, ) -> Result + 'static> { - Err(anyhow::anyhow!("Not supported")) as Result + let conn_spec = auth_registry.get::(&key.connection)?; + Ok(SetupStatusCheck::new( + key, + self.graph_pool.clone(), + conn_spec, + desired, + existing, + )) } fn check_state_compatibility( @@ -453,7 +770,7 @@ impl StorageFactoryBase for RelationshipFactory { desired: &RelationshipSetupState, existing: &RelationshipSetupState, ) -> Result { - let compatibility = if desired.key_field_name == existing.key_field_name { + let compatibility = if desired.is_compatible(existing) { SetupStateCompatibility::Compatible } else { SetupStateCompatibility::NotCompatible diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs index c50eb587..5acd228c 100644 --- a/src/ops/storages/postgres.rs +++ b/src/ops/storages/postgres.rs @@ -957,6 +957,7 @@ impl StorageFactoryBase for Arc { key: TableId, desired: Option, existing: setup::CombinedState, + _auth_registry: &Arc, ) -> Result + 'static> { Ok(SetupStatusCheck::new(self.clone(), key, desired, existing)) } diff --git a/src/prelude.rs b/src/prelude.rs index 66fb9931..9fa14e3b 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -30,3 +30,5 @@ pub(crate) use crate::{api_bail, api_error}; pub(crate) use anyhow::{anyhow, bail}; pub(crate) use async_stream::{stream, try_stream}; pub(crate) use log::{debug, error, info, trace, warn}; + +pub(crate) use derivative::Derivative; diff --git a/src/py/mod.rs b/src/py/mod.rs index 12b5a1f8..dfa20f2b 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -282,7 +282,8 @@ fn sync_setup() -> PyResult { let lib_context = get_lib_context().into_py_result()?; let flows = lib_context.flows.lock().unwrap(); let all_setup_states = lib_context.all_setup_states.read().unwrap(); - let setup_status = setup::sync_setup(&flows, &all_setup_states).into_py_result()?; + let setup_status = setup::sync_setup(&flows, &all_setup_states, &lib_context.auth_registry) + .into_py_result()?; Ok(SetupStatusCheck(setup_status)) } @@ -290,7 +291,8 @@ fn sync_setup() -> PyResult { fn drop_setup(flow_names: Vec) -> PyResult { let lib_context = get_lib_context().into_py_result()?; let all_setup_states = lib_context.all_setup_states.read().unwrap(); - let setup_status = setup::drop_setup(flow_names, &all_setup_states).into_py_result()?; + let setup_status = setup::drop_setup(flow_names, &all_setup_states, &lib_context.auth_registry) + .into_py_result()?; Ok(SetupStatusCheck(setup_status)) } diff --git a/src/setup/driver.rs b/src/setup/driver.rs index f456527c..711f7cf2 100644 --- a/src/setup/driver.rs +++ b/src/setup/driver.rs @@ -213,6 +213,7 @@ fn group_resource_states<'a>( pub fn check_flow_setup_status( desired_state: Option<&FlowSetupState>, existing_state: Option<&FlowSetupState>, + auth_registry: &Arc, ) -> Result { let metadata_change = diff_state( existing_state.map(|e| &e.metadata), @@ -288,6 +289,7 @@ pub fn check_flow_setup_status( &resource_id.key, desired_state, existing_without_setup_by_user, + auth_registry, )?, _ => bail!("Unexpected factory type for {}", resource_id.target_kind), }; @@ -307,13 +309,18 @@ pub fn check_flow_setup_status( pub fn sync_setup( flows: &BTreeMap>, all_setup_state: &AllSetupState, + auth_registry: &Arc, ) -> Result { let mut flow_status_checks = BTreeMap::new(); for (flow_name, flow_context) in flows { let existing_state = all_setup_state.flows.get(flow_name); flow_status_checks.insert( flow_name.clone(), - check_flow_setup_status(Some(&flow_context.flow.desired_state), existing_state)?, + check_flow_setup_status( + Some(&flow_context.flow.desired_state), + existing_state, + auth_registry, + )?, ); } Ok(AllSetupStatusCheck { @@ -327,6 +334,7 @@ pub fn sync_setup( pub fn drop_setup( flow_names: impl IntoIterator, all_setup_state: &AllSetupState, + auth_registry: &Arc, ) -> Result { if !all_setup_state.has_metadata_table { api_bail!("CocoIndex metadata table is missing."); @@ -336,7 +344,7 @@ pub fn drop_setup( if let Some(existing_state) = all_setup_state.flows.get(&flow_name) { flow_status_checks.insert( flow_name, - check_flow_setup_status(None, Some(existing_state))?, + check_flow_setup_status(None, Some(existing_state), auth_registry)?, ); } } diff --git a/src/setup/states.rs b/src/setup/states.rs index df849704..e476bdf4 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -230,9 +230,7 @@ where fn change_type(&self) -> SetupChangeType; - async fn apply_change(&self) -> Result<()> { - Ok(()) - } + async fn apply_change(&self) -> Result<()>; } #[async_trait]