Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
408 changes: 223 additions & 185 deletions src/builder/analyzer.rs

Large diffs are not rendered by default.

91 changes: 60 additions & 31 deletions src/ops/factory_bases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,23 @@ pub struct TypedExportTargetExecutors<F: StorageFactoryBase + ?Sized> {
pub query_target: Option<Arc<dyn QueryTarget>>,
}

pub struct TypedExportTargetBuildOutput<F: StorageFactoryBase + ?Sized> {
/// Typed build result for one export data collection of a storage factory `F`.
///
/// The blanket `ExportTargetFactory` implementation for `StorageFactoryBase`
/// types turns this into the untyped `interface::ExportDataCollectionBuildOutput`
/// by serializing `setup_key` and `desired_setup_state` with `serde_json::to_value`.
pub struct TypedExportDataCollectionBuildOutput<F: StorageFactoryBase + ?Sized> {
/// Boxed future that resolves to the typed executors for this data collection.
pub executors: BoxFuture<'static, Result<TypedExportTargetExecutors<F>>>,
/// Setup key for this data collection, typed as the factory's `Key`.
pub setup_key: F::Key,
/// Desired setup state associated with `setup_key`, typed as the factory's `SetupState`.
pub desired_setup_state: F::SetupState,
}
/// Typed description of one data collection handed to `StorageFactoryBase::build`.
///
/// Carries the same fields as the untyped `interface::ExportDataCollectionSpec`,
/// except that `spec` has already been deserialized into the factory's `Spec` type.
pub struct TypedExportDataCollectionSpec<F: StorageFactoryBase + ?Sized> {
/// Name of the data collection.
pub name: String,
/// Factory-specific spec, deserialized from the JSON spec value.
pub spec: F::Spec,
/// Schemas of the key fields.
pub key_fields_schema: Vec<FieldSchema>,
/// Schemas of the value fields.
pub value_fields_schema: Vec<FieldSchema>,
/// Index options for this data collection.
pub index_options: IndexOptions,
}

#[async_trait]
pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
type Spec: DeserializeOwned + Send + Sync;
type DeclarationSpec: DeserializeOwned + Send + Sync;
type Key: Debug + Clone + Serialize + DeserializeOwned + Eq + Hash + Send + Sync;
type SetupState: Debug + Clone + Serialize + DeserializeOwned + Send + Sync;
type ExportContext: Send + Sync + 'static;
Expand All @@ -286,13 +294,13 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {

fn build(
self: Arc<Self>,
name: String,
spec: Self::Spec,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
storage_options: IndexOptions,
data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
declarations: Vec<Self::DeclarationSpec>,
context: Arc<FlowInstanceContext>,
) -> Result<TypedExportTargetBuildOutput<Self>>;
) -> Result<(
Vec<TypedExportDataCollectionBuildOutput<Self>>,
Vec<(Self::Key, Self::SetupState)>,
)>;

/// Will not be called if it's set up by the user.
/// It returns an error if the target only supports being set up by the user.
Expand Down Expand Up @@ -332,35 +340,56 @@ pub trait StorageFactoryBase: ExportTargetFactory + Send + Sync + 'static {
impl<T: StorageFactoryBase> ExportTargetFactory for T {
fn build(
self: Arc<Self>,
name: String,
spec: serde_json::Value,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
storage_options: IndexOptions,
data_collections: Vec<interface::ExportDataCollectionSpec>,
declarations: Vec<serde_json::Value>,
context: Arc<FlowInstanceContext>,
) -> Result<interface::ExportTargetBuildOutput> {
let spec: T::Spec = serde_json::from_value(spec)?;
let build_output = StorageFactoryBase::build(
) -> Result<(
Vec<interface::ExportDataCollectionBuildOutput>,
Vec<(serde_json::Value, serde_json::Value)>,
)> {
let (data_coll_output, decl_output) = StorageFactoryBase::build(
self,
name,
spec,
key_fields_schema,
value_fields_schema,
storage_options,
data_collections
.into_iter()
.map(|d| {
anyhow::Ok(TypedExportDataCollectionSpec {
name: d.name,
spec: serde_json::from_value(d.spec)?,
key_fields_schema: d.key_fields_schema,
value_fields_schema: d.value_fields_schema,
index_options: d.index_options,
})
})
.collect::<Result<Vec<_>>>()?,
declarations
.into_iter()
.map(|d| anyhow::Ok(serde_json::from_value(d)?))
.collect::<Result<Vec<_>>>()?,
context,
)?;
let executors = async move {
let executors = build_output.executors.await?;
Ok(interface::ExportTargetExecutors {
export_context: executors.export_context,
query_target: executors.query_target,

let data_coll_output = data_coll_output
.into_iter()
.map(|d| {
Ok(interface::ExportDataCollectionBuildOutput {
executors: async move {
let executors = d.executors.await?;
Ok(interface::ExportTargetExecutors {
export_context: executors.export_context,
query_target: executors.query_target,
})
}
.boxed(),
setup_key: serde_json::to_value(d.setup_key)?,
desired_setup_state: serde_json::to_value(d.desired_setup_state)?,
})
})
};
Ok(interface::ExportTargetBuildOutput {
setup_key: serde_json::to_value(build_output.setup_key)?,
desired_setup_state: serde_json::to_value(build_output.desired_setup_state)?,
executors: executors.boxed(),
})
.collect::<Result<Vec<_>>>()?;
let decl_output = decl_output
.into_iter()
.map(|(key, state)| Ok((serde_json::to_value(key)?, serde_json::to_value(state)?)))
.collect::<Result<Vec<_>>>()?;
Ok((data_coll_output, decl_output))
}

fn check_setup_status(
Expand Down
22 changes: 15 additions & 7 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,31 @@ pub struct ExportTargetExecutors {
pub export_context: Arc<dyn Any + Send + Sync>,
pub query_target: Option<Arc<dyn QueryTarget>>,
}
pub struct ExportTargetBuildOutput {
/// Untyped build result for one export data collection.
///
/// `ExportTargetFactory::build` returns one of these per data collection;
/// the setup key and desired state are carried as JSON values.
pub struct ExportDataCollectionBuildOutput {
/// Boxed future that resolves to the executors for this data collection.
pub executors: BoxFuture<'static, Result<ExportTargetExecutors>>,
/// Setup key for this data collection, as a JSON value.
pub setup_key: serde_json::Value,
/// Desired setup state associated with `setup_key`, as a JSON value.
pub desired_setup_state: serde_json::Value,
}

/// Untyped description of one data collection handed to `ExportTargetFactory::build`.
pub struct ExportDataCollectionSpec {
/// Name of the data collection.
pub name: String,
/// Factory-specific spec as a raw JSON value; the factory is responsible for
/// deserializing it into its own spec type.
pub spec: serde_json::Value,
/// Schemas of the key fields.
pub key_fields_schema: Vec<FieldSchema>,
/// Schemas of the value fields.
pub value_fields_schema: Vec<FieldSchema>,
/// Index options for this data collection.
pub index_options: IndexOptions,
}

#[async_trait]
pub trait ExportTargetFactory: Send + Sync {
fn build(
self: Arc<Self>,
name: String,
spec: serde_json::Value,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
storage_options: IndexOptions,
data_collections: Vec<ExportDataCollectionSpec>,
declarations: Vec<serde_json::Value>,
context: Arc<FlowInstanceContext>,
) -> Result<ExportTargetBuildOutput>;
) -> Result<(
Vec<ExportDataCollectionBuildOutput>,
Vec<(serde_json::Value, serde_json::Value)>,
)>;

/// Will not be called if it's set up by the user.
/// It returns an error if the target only supports being set up by the user.
Expand Down
2 changes: 1 addition & 1 deletion src/ops/sdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use crate::base::spec::*;
pub use crate::base::value::*;

// Disambiguate the ExportDataCollectionBuildOutput type.
pub use super::factory_bases::TypedExportTargetBuildOutput;
pub use super::factory_bases::TypedExportDataCollectionBuildOutput;
/// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
pub trait TypeCore {
fn into_type(self) -> ValueType;
Expand Down
152 changes: 80 additions & 72 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ impl<'a> DependentNodeLabelAnalyzer<'a> {
#[async_trait]
impl StorageFactoryBase for Factory {
type Spec = Spec;
type DeclarationSpec = ();
type SetupState = RelationshipSetupState;
type Key = GraphElement;
type ExportContext = ExportContext;
Expand All @@ -1132,82 +1133,89 @@ impl StorageFactoryBase for Factory {

fn build(
self: Arc<Self>,
_name: String,
spec: Spec,
key_fields_schema: Vec<FieldSchema>,
value_fields_schema: Vec<FieldSchema>,
index_options: IndexOptions,
data_collections: Vec<TypedExportDataCollectionSpec<Self>>,
_declarations: Vec<()>,
context: Arc<FlowInstanceContext>,
) -> Result<TypedExportTargetBuildOutput<Self>> {
let setup_key = GraphElement::from_spec(&spec);

let (value_fields_info, rel_end_label_info) = match &spec.mapping {
GraphElementMapping::Node(_) => (
value_fields_schema
.into_iter()
.enumerate()
.map(|(field_idx, field_schema)| AnalyzedGraphFieldMapping {
field_idx,
field_name: field_schema.name.clone(),
value_type: field_schema.value_type.typ.clone(),
})
.collect(),
None,
),
GraphElementMapping::Relationship(rel_spec) => {
let mut src_label_analyzer =
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
let mut tgt_label_analyzer =
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.target)?;
let mut value_fields_info = vec![];
for (field_idx, field_schema) in value_fields_schema.iter().enumerate() {
if !src_label_analyzer.process_field(field_idx, field_schema)
&& !tgt_label_analyzer.process_field(field_idx, field_schema)
{
value_fields_info.push(AnalyzedGraphFieldMapping {
field_idx,
field_name: field_schema.name.clone(),
value_type: field_schema.value_type.typ.clone(),
});
) -> Result<(
Vec<TypedExportDataCollectionBuildOutput<Self>>,
Vec<(GraphElement, RelationshipSetupState)>,
)> {
let data_coll_output = data_collections
.into_iter()
.map(|d| {
let setup_key = GraphElement::from_spec(&d.spec);

let (value_fields_info, rel_end_label_info) = match &d.spec.mapping {
GraphElementMapping::Node(_) => (
d.value_fields_schema
.into_iter()
.enumerate()
.map(|(field_idx, field_schema)| AnalyzedGraphFieldMapping {
field_idx,
field_name: field_schema.name.clone(),
value_type: field_schema.value_type.typ.clone(),
})
.collect(),
None,
),
GraphElementMapping::Relationship(rel_spec) => {
let mut src_label_analyzer =
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
let mut tgt_label_analyzer =
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.target)?;
let mut value_fields_info = vec![];
for (field_idx, field_schema) in d.value_fields_schema.iter().enumerate() {
if !src_label_analyzer.process_field(field_idx, field_schema)
&& !tgt_label_analyzer.process_field(field_idx, field_schema)
{
value_fields_info.push(AnalyzedGraphFieldMapping {
field_idx,
field_name: field_schema.name.clone(),
value_type: field_schema.value_type.typ.clone(),
});
}
}
let src_label_info = src_label_analyzer.build()?;
let tgt_label_info = tgt_label_analyzer.build()?;
(value_fields_info, Some((src_label_info, tgt_label_info)))
}
}
let src_label_info = src_label_analyzer.build()?;
let tgt_label_info = tgt_label_analyzer.build()?;
(value_fields_info, Some((src_label_info, tgt_label_info)))
}
};

let desired_setup_state = RelationshipSetupState::new(
&spec,
key_fields_schema.iter().map(|f| f.name.clone()).collect(),
&index_options,
&value_fields_info,
rel_end_label_info.as_ref(),
)?;
};

let conn_spec = context
.auth_registry
.get::<ConnectionSpec>(&spec.connection)?;
let executors = async move {
let graph = self.graph_pool.get_graph(&conn_spec).await?;
let executor = Arc::new(ExportContext::new(
graph,
spec,
key_fields_schema,
value_fields_info,
rel_end_label_info,
)?);
Ok(TypedExportTargetExecutors {
export_context: executor,
query_target: None,
let desired_setup_state = RelationshipSetupState::new(
&d.spec,
d.key_fields_schema.iter().map(|f| f.name.clone()).collect(),
&d.index_options,
&value_fields_info,
rel_end_label_info.as_ref(),
)?;

let conn_spec = context
.auth_registry
.get::<ConnectionSpec>(&d.spec.connection)?;
let factory = self.clone();
let executors = async move {
let graph = factory.graph_pool.get_graph(&conn_spec).await?;
let executor = Arc::new(ExportContext::new(
graph,
d.spec,
d.key_fields_schema,
value_fields_info,
rel_end_label_info,
)?);
Ok(TypedExportTargetExecutors {
export_context: executor,
query_target: None,
})
}
.boxed();
Ok(TypedExportDataCollectionBuildOutput {
executors,
setup_key,
desired_setup_state,
})
})
}
.boxed();
Ok(TypedExportTargetBuildOutput {
executors,
setup_key,
desired_setup_state,
})
.collect::<Result<Vec<_>>>()?;
Ok((data_coll_output, vec![]))
}

fn check_setup_status(
Expand Down
Loading