diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs
index d96518c6..e21ea73a 100644
--- a/src/builder/analyzer.rs
+++ b/src/builder/analyzer.rs
@@ -1,6 +1,4 @@
-use std::collections::{BTreeMap, HashSet};
-use std::sync::Mutex;
-use std::{collections::HashMap, future::Future, sync::Arc};
+use crate::prelude::*;
 
 use super::plan::*;
 use crate::execution::db_tracking_setup;
@@ -11,16 +9,12 @@ use crate::setup::{
 };
 use crate::utils::fingerprint::Fingerprinter;
 use crate::{
-    api_bail, api_error,
-    base::{schema::*, spec::*, value},
+    base::{schema::*, spec::*},
     ops::{interface::*, registry::*},
     utils::immutable::RefList,
 };
-use anyhow::{anyhow, bail, Context, Result};
 use futures::future::{try_join3, BoxFuture};
 use futures::{future::try_join_all, FutureExt};
-use indexmap::IndexMap;
-use log::{trace, warn};
 
 #[derive(Debug)]
 pub(super) enum ValueTypeBuilder {
@@ -595,6 +589,14 @@ fn add_collector(
     })
 }
 
+struct ExportDataFieldsInfo {
+    local_collector_ref: AnalyzedLocalCollectorReference,
+    primary_key_def: AnalyzedPrimaryKeyDef,
+    primary_key_type: ValueType,
+    value_fields_idx: Vec<u32>,
+    value_stable: bool,
+}
+
 impl AnalyzerContext<'_> {
     pub(super) fn analyze_import_op(
         &self,
@@ -816,194 +818,227 @@ impl AnalyzerContext<'_> {
         Ok(result_fut)
     }
 
-    pub(super) fn analyze_export_op(
+    fn analyze_export_op(
         &self,
-        scope: &mut DataScopeBuilder,
-        export_op: NamedSpec<ExportOpSpec>,
-        export_factory: Arc<dyn ExportTargetFactory>,
-        setup_state: Option<&mut FlowSetupState<DesiredMode>>,
+        export_op: &NamedSpec<ExportOpSpec>,
+        export_factory: &dyn ExportTargetFactory,
+        data_coll_output: ExportDataCollectionBuildOutput,
+        data_fields_info: ExportDataFieldsInfo,
+        flow_setup_state: &mut FlowSetupState<DesiredMode>,
         existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
     ) -> Result<impl Future<Output = Result<AnalyzedExportOp>> + Send> {
-        let export_target = export_op.spec.target;
-        let spec = serde_json::Value::Object(export_target.spec.clone());
-        let (local_collector_ref, collector_schema) =
-            scope.consume_collector(&export_op.spec.collector_name)?;
-        let (
-            key_fields_schema,
-            value_fields_schema,
-            primary_key_def,
-            primary_key_type,
-            value_fields_idx,
-        ) = match &export_op.spec.index_options.primary_key_fields {
-            Some(fields) => {
-                let pk_fields_idx = fields
-                    .iter()
-                    .map(|f| {
-                        collector_schema
-                            .fields
-                            .iter()
-                            .position(|field| &field.name == f)
-                            .map(|idx| idx as u32)
-                            .ok_or_else(|| anyhow!("field not found: {}", f))
-                    })
-                    .collect::<Result<Vec<u32>>>()?;
-
-                let key_fields_schema = pk_fields_idx
-                    .iter()
-                    .map(|idx| collector_schema.fields[*idx as usize].clone())
-                    .collect::<Vec<_>>();
-                let primary_key_type = if pk_fields_idx.len() == 1 {
-                    key_fields_schema[0].value_type.typ.clone()
-                } else {
-                    ValueType::Struct(StructSchema {
-                        fields: Arc::from(key_fields_schema.clone()),
-                        description: None,
-                    })
-                };
-                let mut value_fields_schema: Vec<FieldSchema> = vec![];
-                let mut value_fields_idx = vec![];
-                for (idx, field) in collector_schema.fields.iter().enumerate() {
-                    if !pk_fields_idx.contains(&(idx as u32)) {
-                        value_fields_schema.push(field.clone());
-                        value_fields_idx.push(idx as u32);
-                    }
-                }
-                (
-                    key_fields_schema,
-                    value_fields_schema,
-                    AnalyzedPrimaryKeyDef::Fields(pk_fields_idx),
-                    primary_key_type,
-                    value_fields_idx,
-                )
-            }
-            None => {
-                // TODO: Support auto-generate primary key
-                api_bail!("Primary key fields must be specified")
-            }
-        };
-
-        let setup_output = export_factory.clone().build(
-            export_op.name.clone(),
-            spec,
-            key_fields_schema,
-            value_fields_schema,
-            export_op.spec.index_options,
-            self.flow_ctx.clone(),
-        )?;
         let resource_id = ResourceIdentifier {
-            key: setup_output.setup_key.clone(),
-            target_kind: export_target.kind.clone(),
+            key: data_coll_output.setup_key.clone(),
+            target_kind: export_op.spec.target.kind.clone(),
         };
         let existing_target_states = existing_target_states.get(&resource_id);
-        let target_id = setup_state
-            .map(|setup_state| -> Result<i32> {
-                let mut compatible_target_ids = HashSet::<Option<i32>>::new();
-                let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
-                for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
-                    let compatibility =
-                        if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
-                            export_factory.check_state_compatibility(
-                                &setup_output.desired_setup_state,
-                                &existing_state.state,
-                            )?
-                        } else {
-                            SetupStateCompatibility::NotCompatible
-                        };
-                    let compatible_target_id =
-                        if compatibility != SetupStateCompatibility::NotCompatible {
-                            reusable_schema_version_ids.insert(
-                                (compatibility == SetupStateCompatibility::Compatible)
-                                    .then_some(existing_state.common.schema_version_id),
-                            );
-                            Some(existing_state.common.target_id)
-                        } else {
-                            None
-                        };
-                    compatible_target_ids.insert(compatible_target_id);
-                }
-
-                let target_id = if compatible_target_ids.len() == 1 {
-                    compatible_target_ids.into_iter().next().flatten()
-                } else {
-                    if compatible_target_ids.len() > 1 {
-                        warn!("Multiple target states with the same key schema found");
-                    }
-                    None
-                };
-                let target_id = target_id.unwrap_or_else(|| {
-                    setup_state.metadata.last_target_id += 1;
-                    setup_state.metadata.last_target_id
-                });
-                let max_schema_version_id = existing_target_states
-                    .iter()
-                    .flat_map(|v| v.iter())
-                    .map(|s| s.common.max_schema_version_id)
-                    .max()
-                    .unwrap_or(0);
-                let schema_version_id = if reusable_schema_version_ids.len() == 1 {
-                    reusable_schema_version_ids
-                        .into_iter()
-                        .next()
-                        .unwrap()
-                        .unwrap_or(max_schema_version_id + 1)
+        let mut compatible_target_ids = HashSet::<Option<i32>>::new();
+        let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
+        for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
+            let compatibility =
+                if export_op.spec.setup_by_user == existing_state.common.setup_by_user {
+                    export_factory.check_state_compatibility(
+                        &data_coll_output.desired_setup_state,
+                        &existing_state.state,
+                    )?
                 } else {
-                    max_schema_version_id + 1
+                    SetupStateCompatibility::NotCompatible
                 };
-                match setup_state.targets.entry(resource_id) {
-                    indexmap::map::Entry::Occupied(entry) => {
-                        api_bail!(
-                            "Target resource already exists: kind = {}, key = {}",
-                            entry.key().target_kind,
-                            entry.key().key
-                        );
-                    }
-                    indexmap::map::Entry::Vacant(entry) => {
-                        entry.insert(TargetSetupState {
-                            common: TargetSetupStateCommon {
-                                target_id,
-                                schema_version_id,
-                                max_schema_version_id: max_schema_version_id.max(schema_version_id),
-                                setup_by_user: export_op.spec.setup_by_user,
-                            },
-                            state: setup_output.desired_setup_state,
-                        });
-                    }
-                }
-                Ok(target_id)
-            })
-            .transpose()?;
+            let compatible_target_id = if compatibility != SetupStateCompatibility::NotCompatible {
+                reusable_schema_version_ids.insert(
+                    (compatibility == SetupStateCompatibility::Compatible)
+                        .then_some(existing_state.common.schema_version_id),
+                );
+                Some(existing_state.common.target_id)
+            } else {
+                None
+            };
+            compatible_target_ids.insert(compatible_target_id);
+        }
 
-        let value_stable = collector_schema
-            .auto_uuid_field_idx
-            .map(|uuid_idx| match &primary_key_def {
-                AnalyzedPrimaryKeyDef::Fields(fields) => fields.contains(&uuid_idx),
-            })
-            .unwrap_or(false);
+        let target_id = if compatible_target_ids.len() == 1 {
+            compatible_target_ids.into_iter().next().flatten()
+        } else {
+            if compatible_target_ids.len() > 1 {
+                warn!("Multiple target states with the same key schema found");
+            }
+            None
+        };
+        let target_id = target_id.unwrap_or_else(|| {
+            flow_setup_state.metadata.last_target_id += 1;
+            flow_setup_state.metadata.last_target_id
+        });
+        let max_schema_version_id = existing_target_states
+            .iter()
+            .flat_map(|v| v.iter())
+            .map(|s| s.common.max_schema_version_id)
+            .max()
+            .unwrap_or(0);
+        let schema_version_id = if reusable_schema_version_ids.len() == 1 {
+            reusable_schema_version_ids
+                .into_iter()
+                .next()
+                .unwrap()
+                .unwrap_or(max_schema_version_id + 1)
+        } else {
+            max_schema_version_id + 1
+        };
+        match flow_setup_state.targets.entry(resource_id) {
+            indexmap::map::Entry::Occupied(entry) => {
+                api_bail!(
+                    "Target resource already exists: kind = {}, key = {}",
+                    entry.key().target_kind,
+                    entry.key().key
+                );
+            }
+            indexmap::map::Entry::Vacant(entry) => {
+                entry.insert(TargetSetupState {
+                    common: TargetSetupStateCommon {
+                        target_id,
+                        schema_version_id,
+                        max_schema_version_id: max_schema_version_id.max(schema_version_id),
+                        setup_by_user: export_op.spec.setup_by_user,
+                    },
+                    state: data_coll_output.desired_setup_state,
+                });
+            }
+        }
 
+        let op_name = export_op.name.clone();
         Ok(async move {
-            trace!("Start building executor for export op `{}`", export_op.name);
-            let executors = setup_output
+            trace!("Start building executor for export op `{op_name}`");
+            let executors = data_coll_output
                 .executors
                 .await
-                .with_context(|| format!("Analyzing export op: {}", export_op.name))?;
-            trace!(
-                "Finished building executor for export op `{}`",
-                export_op.name
-            );
-            let name = export_op.name;
+                .with_context(|| format!("Analyzing export op: {op_name}"))?;
+            trace!("Finished building executor for export op `{op_name}`");
             Ok(AnalyzedExportOp {
-                name,
-                target_id: target_id.unwrap_or_default(),
-                input: local_collector_ref,
+                name: op_name,
+                target_id,
+                input: data_fields_info.local_collector_ref,
                 export_context: executors.export_context,
                 query_target: executors.query_target,
-                primary_key_def,
-                primary_key_type,
-                value_fields: value_fields_idx,
-                value_stable,
+                primary_key_def: data_fields_info.primary_key_def,
+                primary_key_type: data_fields_info.primary_key_type,
+                value_fields: data_fields_info.value_fields_idx,
+                value_stable: data_fields_info.value_stable,
             })
         })
     }
 
+    fn analyze_export_op_group(
+        &self,
+        scope: &mut DataScopeBuilder,
+        flow_inst: &FlowInstanceSpec,
+        export_op_group: &AnalyzedExportTargetOpGroup,
+        flow_setup_state: &mut FlowSetupState<DesiredMode>,
+        existing_target_states: &HashMap<&ResourceIdentifier, Vec<&TargetSetupState>>,
+    ) -> Result<Vec<impl Future<Output = Result<AnalyzedExportOp>> + Send>> {
+        let mut collection_specs = Vec::<interface::ExportDataCollectionSpec>::new();
+        let mut data_fields_infos = Vec::<ExportDataFieldsInfo>::new();
+        for idx in export_op_group.op_idx.iter() {
+            let export_op = &flow_inst.export_ops[*idx];
+            let (local_collector_ref, collector_schema) =
+                scope.consume_collector(&export_op.spec.collector_name)?;
+            let (key_fields_schema, value_fields_schema, data_collection_info) =
+                match &export_op.spec.index_options.primary_key_fields {
+                    Some(fields) => {
+                        let pk_fields_idx = fields
+                            .iter()
+                            .map(|f| {
+                                collector_schema
+                                    .fields
+                                    .iter()
+                                    .position(|field| &field.name == f)
+                                    .map(|idx| idx as u32)
+                                    .ok_or_else(|| anyhow!("field not found: {}", f))
+                            })
+                            .collect::<Result<Vec<u32>>>()?;
+
+                        let key_fields_schema = pk_fields_idx
+                            .iter()
+                            .map(|idx| collector_schema.fields[*idx as usize].clone())
+                            .collect::<Vec<_>>();
+                        let primary_key_type = if pk_fields_idx.len() == 1 {
+                            key_fields_schema[0].value_type.typ.clone()
+                        } else {
+                            ValueType::Struct(StructSchema {
+                                fields: Arc::from(key_fields_schema.clone()),
+                                description: None,
+                            })
+                        };
+                        let mut value_fields_schema: Vec<FieldSchema> = vec![];
+                        let mut value_fields_idx = vec![];
+                        for (idx, field) in collector_schema.fields.iter().enumerate() {
+                            if !pk_fields_idx.contains(&(idx as u32)) {
+                                value_fields_schema.push(field.clone());
+                                value_fields_idx.push(idx as u32);
+                            }
+                        }
+                        let value_stable = collector_schema
+                            .auto_uuid_field_idx
+                            .as_ref()
+                            .map(|uuid_idx| pk_fields_idx.contains(uuid_idx))
+                            .unwrap_or(false);
+                        (
+                            key_fields_schema,
+                            value_fields_schema,
+                            ExportDataFieldsInfo {
+                                local_collector_ref,
+                                primary_key_def: AnalyzedPrimaryKeyDef::Fields(pk_fields_idx),
+                                primary_key_type,
+                                value_fields_idx,
+                                value_stable,
+                            },
+                        )
+                    }
+                    None => {
+                        // TODO: Support auto-generate primary key
+                        api_bail!("Primary key fields must be specified")
+                    }
+                };
+            collection_specs.push(interface::ExportDataCollectionSpec {
+                name: export_op.name.clone(),
+                spec: serde_json::Value::Object(export_op.spec.target.spec.clone()),
+                key_fields_schema,
+                value_fields_schema,
+                index_options: export_op.spec.index_options.clone(),
+            });
+            data_fields_infos.push(data_collection_info);
+        }
+        let (data_collections_output, _) = export_op_group.target_factory.clone().build(
+            collection_specs,
+            vec![],
+            self.flow_ctx.clone(),
+        )?;
+        if data_collections_output.len() != data_fields_infos.len() {
+            api_bail!(
+                "Data collection output length mismatch: expect {}, got {}",
+                data_fields_infos.len(),
+                data_collections_output.len()
+            );
+        }
+
+        let result = export_op_group
+            .op_idx
+            .iter()
+            .zip(data_collections_output.into_iter())
+            .zip(data_fields_infos.into_iter())
+            .map(|((idx, data_coll_output), data_fields_info)| {
+                let export_op = &flow_inst.export_ops[*idx];
+                Ok(self.analyze_export_op(
+                    export_op,
+                    export_op_group.target_factory.as_ref(),
+                    data_coll_output,
+                    data_fields_info,
+                    flow_setup_state,
+                    existing_target_states,
+                )?)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(result)
+    }
+
     fn analyze_op_scope(
         &self,
         scope: &mut ExecutionScope<'_>,
@@ -1128,7 +1163,6 @@ pub fn analyze_flow(
     )?;
 
     let mut target_groups = IndexMap::<String, AnalyzedExportTargetOpGroup>::new();
-    let mut export_ops_futs = vec![];
     for (idx, export_op) in flow_inst.export_ops.iter().enumerate() {
         let target_kind = export_op.spec.target.kind.clone();
         let export_factory = match registry.get(&target_kind) {
@@ -1140,13 +1174,6 @@ pub fn analyze_flow(
                 ))
             }
         };
-        export_ops_futs.push(analyzer_ctx.analyze_export_op(
-            root_exec_scope.data,
-            export_op.clone(),
-            export_factory.clone(),
-            Some(&mut setup_state),
-            &target_states_by_name_type,
-        )?);
         target_groups
             .entry(target_kind)
             .or_insert_with(|| AnalyzedExportTargetOpGroup {
@@ -1157,6 +1184,17 @@ pub fn analyze_flow(
             .push(idx);
     }
 
+    let mut export_ops_futs = vec![];
+    for group in target_groups.values() {
+        export_ops_futs.extend(analyzer_ctx.analyze_export_op_group(
+            root_exec_scope.data,
+            flow_inst,
+            group,
+            &mut setup_state,
+            &target_states_by_name_type,
+        )?);
+    }
+
     let tracking_table_setup = setup_state.tracking_table.clone();
     let data_schema = root_data_scope.into_data_schema()?;
     let logic_fingerprint = Fingerprinter::default()
diff --git a/src/ops/factory_bases.rs b/src/ops/factory_bases.rs
index 2e509ab0..4b8385de 100644
--- a/src/ops/factory_bases.rs
+++ b/src/ops/factory_bases.rs
@@ -269,15 +269,23 @@ pub struct TypedExportTargetExecutors<F: StorageFactoryBase + ?Sized> {
     pub query_target: Option<Arc<dyn QueryTarget>>,
 }
 
-pub struct TypedExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
+pub struct TypedExportDataCollectionBuildOutput<F: StorageFactoryBase + ?Sized> {
     pub executors: BoxFuture<'static, Result<TypedExportTargetExecutors<F>>>,
     pub setup_key: F::Key,
     pub desired_setup_state: F::SetupState,
 }
+pub struct TypedExportDataCollectionSpec<F: StorageFactoryBase + ?Sized> {
+    pub name: String,
+    pub spec: F::Spec,
+    pub key_fields_schema: Vec<FieldSchema>,
+    pub value_fields_schema: Vec<FieldSchema>,
+    pub index_options: IndexOptions,
+}
 
 #[async_trait]
 pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
     type Spec: DeserializeOwned + Send + Sync;
+    type DeclarationSpec: DeserializeOwned + Send + Sync;
     type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
     type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
     type ExportContext: Send + Sync + 'static;
@@ -286,13 +294,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
 
     fn build(
         self: Arc<Self>,
-        name: String,
-        spec: Self::Spec,
-        key_fields_schema: Vec<FieldSchema>,
-        value_fields_schema: Vec<FieldSchema>,
-        storage_options: IndexOptions,
+        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
+        declarations: Vec<Self::DeclarationSpec>,
         context: Arc<FlowInstanceContext>,
-    ) -> Result<TypedExportTargetBuildOutput<Self>>;
+    ) -> Result<(
+        Vec<TypedExportDataCollectionBuildOutput<Self>>,
+        Vec<(Self::Key, Self::SetupState)>,
+    )>;
 
     /// Will not be called if it's setup by user.
     /// It returns an error if the target only supports setup by user.
@@ -332,35 +340,56 @@ impl<T: StorageFactoryBase> ExportTargetFactory for T {
     fn build(
         self: Arc<Self>,
-        name: String,
-        spec: serde_json::Value,
-        key_fields_schema: Vec<FieldSchema>,
-        value_fields_schema: Vec<FieldSchema>,
-        storage_options: IndexOptions,
+        data_collections: Vec<interface::ExportDataCollectionSpec>,
+        declarations: Vec<serde_json::Value>,
         context: Arc<FlowInstanceContext>,
-    ) -> Result<interface::ExportTargetBuildOutput> {
-        let spec: T::Spec = serde_json::from_value(spec)?;
-        let build_output = StorageFactoryBase::build(
+    ) -> Result<(
+        Vec<interface::ExportDataCollectionBuildOutput>,
+        Vec<(serde_json::Value, serde_json::Value)>,
+    )> {
+        let (data_coll_output, decl_output) = StorageFactoryBase::build(
             self,
-            name,
-            spec,
-            key_fields_schema,
-            value_fields_schema,
-            storage_options,
+            data_collections
+                .into_iter()
+                .map(|d| {
+                    anyhow::Ok(TypedExportDataCollectionSpec {
+                        name: d.name,
+                        spec: serde_json::from_value(d.spec)?,
+                        key_fields_schema: d.key_fields_schema,
+                        value_fields_schema: d.value_fields_schema,
+                        index_options: d.index_options,
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?,
+            declarations
+                .into_iter()
+                .map(|d| anyhow::Ok(serde_json::from_value(d)?))
+                .collect::<Result<Vec<_>>>()?,
             context,
         )?;
-        let executors = async move {
-            let executors = build_output.executors.await?;
-            Ok(interface::ExportTargetExecutors {
-                export_context: executors.export_context,
-                query_target: executors.query_target,
+
+        let data_coll_output = data_coll_output
+            .into_iter()
+            .map(|d| {
+                Ok(interface::ExportDataCollectionBuildOutput {
+                    executors: async move {
+                        let executors = d.executors.await?;
+                        Ok(interface::ExportTargetExecutors {
+                            export_context: executors.export_context,
+                            query_target: executors.query_target,
+                        })
+                    }
+                    .boxed(),
+                    setup_key: serde_json::to_value(d.setup_key)?,
+                    desired_setup_state: serde_json::to_value(d.desired_setup_state)?,
+                })
             })
-        };
-        Ok(interface::ExportTargetBuildOutput {
-            setup_key: serde_json::to_value(build_output.setup_key)?,
-            desired_setup_state: serde_json::to_value(build_output.desired_setup_state)?,
-            executors: executors.boxed(),
-        })
+            .collect::<Result<Vec<_>>>()?;
+        let decl_output = decl_output
+            .into_iter()
+            .map(|(key, state)| Ok((serde_json::to_value(key)?, serde_json::to_value(state)?)))
+            .collect::<Result<Vec<_>>>()?;
+        Ok((data_coll_output, decl_output))
     }
 
     fn check_setup_status(
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index 67aec53e..0f12da50 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -162,23 +162,31 @@ pub struct ExportTargetExecutors {
     pub export_context: Arc<dyn Any + Send + Sync>,
     pub query_target: Option<Arc<dyn QueryTarget>>,
 }
 
-pub struct ExportTargetBuildOutput {
+pub struct ExportDataCollectionBuildOutput {
    pub executors: BoxFuture<'static, Result<ExportTargetExecutors>>,
     pub setup_key: serde_json::Value,
     pub desired_setup_state: serde_json::Value,
 }
+pub struct ExportDataCollectionSpec {
+    pub name: String,
+    pub spec: serde_json::Value,
+    pub key_fields_schema: Vec<FieldSchema>,
+    pub value_fields_schema: Vec<FieldSchema>,
+    pub index_options: IndexOptions,
+}
+
 #[async_trait]
 pub trait ExportTargetFactory: Send + Sync {
     fn build(
         self: Arc<Self>,
-        name: String,
-        spec: serde_json::Value,
-        key_fields_schema: Vec<FieldSchema>,
-        value_fields_schema: Vec<FieldSchema>,
-        storage_options: IndexOptions,
+        data_collections: Vec<ExportDataCollectionSpec>,
+        declarations: Vec<serde_json::Value>,
         context: Arc<FlowInstanceContext>,
-    ) -> Result<ExportTargetBuildOutput>;
+    ) -> Result<(
+        Vec<ExportDataCollectionBuildOutput>,
+        Vec<(serde_json::Value, serde_json::Value)>,
+    )>;
 
     /// Will not be called if it's setup by user.
     /// It returns an error if the target only supports setup by user.
diff --git a/src/ops/sdk.rs b/src/ops/sdk.rs
index 803c67ba..f0a36b9f 100644
--- a/src/ops/sdk.rs
+++ b/src/ops/sdk.rs
@@ -11,7 +11,7 @@ pub use crate::base::spec::*;
 pub use crate::base::value::*;
 
 // Disambiguate the ExportTargetBuildOutput type.
-pub use super::factory_bases::TypedExportTargetBuildOutput;
+pub use super::factory_bases::TypedExportDataCollectionBuildOutput;
 /// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
 pub trait TypeCore {
     fn into_type(self) -> ValueType;
diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs
index b07887d1..098b5f15 100644
--- a/src/ops/storages/neo4j.rs
+++ b/src/ops/storages/neo4j.rs
@@ -1122,6 +1122,7 @@ impl<'a> DependentNodeLabelAnalyzer<'a> {
 #[async_trait]
 impl StorageFactoryBase for Factory {
     type Spec = Spec;
+    type DeclarationSpec = ();
     type SetupState = RelationshipSetupState;
     type Key = GraphElement;
     type ExportContext = ExportContext;
@@ -1132,82 +1133,89 @@ impl StorageFactoryBase for Factory {
 
     fn build(
         self: Arc<Self>,
-        _name: String,
-        spec: Spec,
-        key_fields_schema: Vec<FieldSchema>,
-        value_fields_schema: Vec<FieldSchema>,
-        index_options: IndexOptions,
+        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
+        _declarations: Vec<()>,
         context: Arc<FlowInstanceContext>,
-    ) -> Result<TypedExportTargetBuildOutput<Self>> {
-        let setup_key = GraphElement::from_spec(&spec);
-
-        let (value_fields_info, rel_end_label_info) = match &spec.mapping {
-            GraphElementMapping::Node(_) => (
-                value_fields_schema
-                    .into_iter()
-                    .enumerate()
-                    .map(|(field_idx, field_schema)| AnalyzedGraphFieldMapping {
-                        field_idx,
-                        field_name: field_schema.name.clone(),
-                        value_type: field_schema.value_type.typ.clone(),
-                    })
-                    .collect(),
-                None,
-            ),
-            GraphElementMapping::Relationship(rel_spec) => {
-                let mut src_label_analyzer =
-                    DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
-                let mut tgt_label_analyzer =
-                    DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.target)?;
-                let mut value_fields_info = vec![];
-                for (field_idx, field_schema) in value_fields_schema.iter().enumerate() {
-                    if !src_label_analyzer.process_field(field_idx, field_schema)
-                        && !tgt_label_analyzer.process_field(field_idx, field_schema)
-                    {
-                        value_fields_info.push(AnalyzedGraphFieldMapping {
-                            field_idx,
-                            field_name: field_schema.name.clone(),
-                            value_type: field_schema.value_type.typ.clone(),
-                        });
+    ) -> Result<(
+        Vec<TypedExportDataCollectionBuildOutput<Self>>,
+        Vec<(GraphElement, RelationshipSetupState)>,
+    )> {
+        let data_coll_output = data_collections
+            .into_iter()
+            .map(|d| {
+                let setup_key = GraphElement::from_spec(&d.spec);
+
+                let (value_fields_info, rel_end_label_info) = match &d.spec.mapping {
+                    GraphElementMapping::Node(_) => (
+                        d.value_fields_schema
+                            .into_iter()
+                            .enumerate()
+                            .map(|(field_idx, field_schema)| AnalyzedGraphFieldMapping {
+                                field_idx,
+                                field_name: field_schema.name.clone(),
+                                value_type: field_schema.value_type.typ.clone(),
+                            })
+                            .collect(),
+                        None,
+                    ),
+                    GraphElementMapping::Relationship(rel_spec) => {
+                        let mut src_label_analyzer =
+                            DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
+                        let mut tgt_label_analyzer =
+                            DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.target)?;
+                        let mut value_fields_info = vec![];
+                        for (field_idx, field_schema) in d.value_fields_schema.iter().enumerate() {
+                            if !src_label_analyzer.process_field(field_idx, field_schema)
+                                && !tgt_label_analyzer.process_field(field_idx, field_schema)
+                            {
+                                value_fields_info.push(AnalyzedGraphFieldMapping {
+                                    field_idx,
+                                    field_name: field_schema.name.clone(),
+                                    value_type: field_schema.value_type.typ.clone(),
+                                });
+                            }
+                        }
+                        let src_label_info = src_label_analyzer.build()?;
+                        let tgt_label_info = tgt_label_analyzer.build()?;
+                        (value_fields_info, Some((src_label_info, tgt_label_info)))
+                    }
-                    }
-                }
-                let src_label_info = src_label_analyzer.build()?;
-                let tgt_label_info = tgt_label_analyzer.build()?;
-                (value_fields_info, Some((src_label_info, tgt_label_info)))
-            }
-        };
-
-        let desired_setup_state = RelationshipSetupState::new(
-            &spec,
-            key_fields_schema.iter().map(|f| f.name.clone()).collect(),
-            &index_options,
-            &value_fields_info,
-            rel_end_label_info.as_ref(),
-        )?;
+                };
 
-        let conn_spec = context
-            .auth_registry
-            .get::<ConnectionSpec>(&spec.connection)?;
-        let executors = async move {
-            let graph = self.graph_pool.get_graph(&conn_spec).await?;
-            let executor = Arc::new(ExportContext::new(
-                graph,
-                spec,
-                key_fields_schema,
-                value_fields_info,
-                rel_end_label_info,
-            )?);
-            Ok(TypedExportTargetExecutors {
-                export_context: executor,
-                query_target: None,
+                let desired_setup_state = RelationshipSetupState::new(
+                    &d.spec,
+                    d.key_fields_schema.iter().map(|f| f.name.clone()).collect(),
+                    &d.index_options,
+                    &value_fields_info,
+                    rel_end_label_info.as_ref(),
+                )?;
+
+                let conn_spec = context
+                    .auth_registry
+                    .get::<ConnectionSpec>(&d.spec.connection)?;
+                let factory = self.clone();
+                let executors = async move {
+                    let graph = factory.graph_pool.get_graph(&conn_spec).await?;
+                    let executor = Arc::new(ExportContext::new(
+                        graph,
+                        d.spec,
+                        d.key_fields_schema,
+                        value_fields_info,
+                        rel_end_label_info,
+                    )?);
+                    Ok(TypedExportTargetExecutors {
+                        export_context: executor,
+                        query_target: None,
+                    })
+                }
+                .boxed();
+                Ok(TypedExportDataCollectionBuildOutput {
+                    executors,
+                    setup_key,
+                    desired_setup_state,
+                })
             })
-        }
-        .boxed();
-        Ok(TypedExportTargetBuildOutput {
-            executors,
-            setup_key,
-            desired_setup_state,
-        })
+            .collect::<Result<Vec<_>>>()?;
+        Ok((data_coll_output, vec![]))
     }
 
     fn check_setup_status(
diff --git a/src/ops/storages/postgres.rs b/src/ops/storages/postgres.rs
index ebbea877..3af2e655 100644
--- a/src/ops/storages/postgres.rs
+++ b/src/ops/storages/postgres.rs
@@ -6,6 +6,7 @@ use crate::service::error::{shared_ok, SharedError, SharedResultExt};
 use crate::setup;
 use crate::utils::db::ValidIdentifier;
 use async_trait::async_trait;
+use bytes::Bytes;
 use derivative::Derivative;
 use futures::future::{BoxFuture, Shared};
 use futures::FutureExt;
@@ -17,7 +18,6 @@ use sqlx::postgres::PgRow;
 use sqlx::{PgPool, Row};
 use std::ops::Bound;
 use uuid::Uuid;
-use bytes::Bytes;
 
 #[derive(Debug, Deserialize)]
 pub struct Spec {
@@ -856,8 +856,7 @@ impl setup::ResourceSetupStatusCheck for SetupStatusCheck {
                 TableUpsertionAction::Create { keys, values } => {
                     let mut fields = (keys
                         .iter()
-                        .map(|(k, v)| format!("{} {} NOT NULL", k, to_column_type_sql(v)))
-                    )
+                        .map(|(k, v)| format!("{} {} NOT NULL", k, to_column_type_sql(v))))
                     .chain(
                         values
                             .iter()
@@ -906,6 +905,7 @@ impl setup::ResourceSetupStatusCheck for SetupStatusCheck {
 #[async_trait]
 impl StorageFactoryBase for Arc<Factory> {
     type Spec = Spec;
+    type DeclarationSpec = ();
     type SetupState = SetupState;
     type Key = TableId;
     type ExportContext = ExportContext;
@@ -916,47 +916,55 @@ impl StorageFactoryBase for Arc<Factory> {
 
     fn build(
         self: Arc<Self>,
-        name: String,
-        spec: Spec,
-        key_fields_schema: Vec<FieldSchema>,
-        value_fields_schema: Vec<FieldSchema>,
-        storage_options: IndexOptions,
+        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
+        _declarations: Vec<()>,
         context: Arc<FlowInstanceContext>,
-    ) -> Result<TypedExportTargetBuildOutput<Self>> {
-        let table_id = TableId {
-            database_url: spec.database_url.clone(),
-            table_name: spec
-                .table_name
-                .unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, name)),
-        };
-        let setup_state = SetupState::new(
-            &table_id,
-            &key_fields_schema,
-            &value_fields_schema,
-            &storage_options,
-        );
-        let table_name = table_id.table_name.clone();
-        let export_context = Arc::new(ExportContext::new(
-            spec.database_url.clone(),
-            table_name,
-            key_fields_schema,
-            value_fields_schema,
-        )?);
-        let executors = async move {
-            let query_target = Arc::new(PostgresQueryTarget {
-                db_pool: self.get_db_pool(&spec.database_url).await?,
-                context: export_context.clone(),
-            });
-            Ok(TypedExportTargetExecutors {
-                export_context: export_context.clone(),
-                query_target: Some(query_target as Arc<dyn QueryTarget>),
+    ) -> Result<(
+        Vec<TypedExportDataCollectionBuildOutput<Self>>,
+        Vec<(TableId, SetupState)>,
+    )> {
+        let data_coll_output = data_collections
+            .into_iter()
+            .map(|d| {
+                let table_id = TableId {
+                    database_url: d.spec.database_url.clone(),
+                    table_name: d
+                        .spec
+                        .table_name
+                        .unwrap_or_else(|| format!("{}__{}", context.flow_instance_name, d.name)),
+                };
+                let setup_state = SetupState::new(
+                    &table_id,
+                    &d.key_fields_schema,
+                    &d.value_fields_schema,
+                    &d.index_options,
+                );
+                let table_name = table_id.table_name.clone();
+                let export_context = Arc::new(ExportContext::new(
+                    d.spec.database_url.clone(),
+                    table_name,
+                    d.key_fields_schema,
+                    d.value_fields_schema,
+                )?);
+                let factory = self.clone();
+                let executors = async move {
+                    let query_target = Arc::new(PostgresQueryTarget {
+                        db_pool: factory.get_db_pool(&d.spec.database_url).await?,
+                        context: export_context.clone(),
+                    });
+                    Ok(TypedExportTargetExecutors {
+                        export_context: export_context.clone(),
+                        query_target: Some(query_target as Arc<dyn QueryTarget>),
+                    })
+                };
+                Ok(TypedExportDataCollectionBuildOutput {
+                    setup_key: table_id,
+                    desired_setup_state: setup_state,
+                    executors: executors.boxed(),
+                })
             })
-        };
-        Ok(TypedExportTargetBuildOutput {
-            setup_key: table_id,
-            desired_setup_state: setup_state,
-            executors: executors.boxed(),
-        })
+            .collect::<Result<Vec<_>>>()?;
+        Ok((data_coll_output, vec![]))
     }
 
     fn check_setup_status(
diff --git a/src/ops/storages/qdrant.rs b/src/ops/storages/qdrant.rs
index 1a48f477..5cbf1a27 100644
--- a/src/ops/storages/qdrant.rs
+++ b/src/ops/storages/qdrant.rs
@@ -3,7 +3,6 @@ use std::convert::Infallible;
 use std::fmt::Display;
 use std::sync::Arc;
 
-use crate::base::spec::*;
 use crate::ops::sdk::*;
 use crate::setup;
 use anyhow::{bail, Result};
@@ -329,6 +328,7 @@ impl Display for CollectionId {
 #[async_trait]
 impl StorageFactoryBase for Arc<Factory> {
     type Spec = Spec;
+    type DeclarationSpec = ();
     type SetupState = ();
     type Key = String;
     type ExportContext = ExportContext;
@@ -339,41 +339,47 @@ impl StorageFactoryBase for Arc<Factory> {
 
     fn build(
         self: Arc<Self>,
-        _name: String,
-        spec: Spec,
-        key_fields_schema: Vec<FieldSchema>,
-        value_fields_schema: Vec<FieldSchema>,
-        _storage_options: IndexOptions,
+        data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
+        _declarations: Vec<()>,
         _context: Arc<FlowInstanceContext>,
-    ) -> Result<TypedExportTargetBuildOutput<Self>> {
-        if key_fields_schema.len() != 1 {
-            api_bail!(
-                "Expected one primary key field for the point ID. Got {}.",
-                key_fields_schema.len()
-            )
-        }
-
-        let collection_name = spec.collection_name.clone();
+    ) -> Result<(
+        Vec<TypedExportDataCollectionBuildOutput<Self>>,
+        Vec<(String, ())>,
+    )> {
+        let data_coll_output = data_collections
+            .into_iter()
+            .map(|d| {
+                if d.key_fields_schema.len() != 1 {
+                    api_bail!(
+                        "Expected one primary key field for the point ID. Got {}.",
Got {}.", + d.key_fields_schema.len() + ) + } - let export_context = Arc::new(ExportContext::new( - spec.grpc_url, - spec.collection_name.clone(), - spec.api_key, - key_fields_schema, - value_fields_schema, - )?); - let query_target = export_context.clone(); - let executors = async move { - Ok(TypedExportTargetExecutors { - export_context, - query_target: Some(query_target as Arc), + let collection_name = d.spec.collection_name.clone(); + + let export_context = Arc::new(ExportContext::new( + d.spec.grpc_url, + d.spec.collection_name.clone(), + d.spec.api_key, + d.key_fields_schema, + d.value_fields_schema, + )?); + let query_target = export_context.clone(); + let executors = async move { + Ok(TypedExportTargetExecutors { + export_context, + query_target: Some(query_target as Arc), + }) + }; + Ok(TypedExportDataCollectionBuildOutput { + executors: executors.boxed(), + setup_key: collection_name, + desired_setup_state: (), + }) }) - }; - Ok(TypedExportTargetBuildOutput { - executors: executors.boxed(), - setup_key: collection_name, - desired_setup_state: (), - }) + .collect::>>()?; + Ok((data_coll_output, vec![])) } fn check_setup_status( diff --git a/src/prelude.rs b/src/prelude.rs index 976bd3f2..f19c8a4c 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,6 +1,6 @@ #![allow(unused_imports)] -pub(crate) use anyhow::Result; +pub(crate) use anyhow::{Context, Result}; pub(crate) use async_trait::async_trait; pub(crate) use chrono::{DateTime, Utc}; pub(crate) use futures::{