diff --git a/docs/docs/ops/storages.md b/docs/docs/ops/storages.md index 4b1a8520..5bb571b6 100644 --- a/docs/docs/ops/storages.md +++ b/docs/docs/ops/storages.md @@ -70,24 +70,24 @@ The `Neo4j` storage exports each row as a relationship to Neo4j Knowledge Graph. * `password` (type: `str`): Password for the Neo4j database. * `db` (type: `str`, optional): The name of the Neo4j database to use as the internal storage, e.g. `neo4j`. * `mapping`: The mapping from collected row to nodes or relationships of the graph. 2 variations are supported: - * `cocoindex.storages.GraphNode`: each collected row is mapped to a node in the graph. It has the following fields: + * `cocoindex.storages.NodeMapping`: Each collected row is mapped to a node in the graph. It has the following fields: * `label`: The label of the node. - * `cocoindex.storages.GraphRelationship`: each collected row is mapped to a relationship in the graph, + * `cocoindex.storages.RelationshipMapping`: Each collected row is mapped to a relationship in the graph, With the following fields: * `rel_type` (type: `str`): The type of the relationship. - * `source`/`target` (type: `cocoindex.storages.GraphRelationshipEnd`): The source/target node of the relationship, with the following fields: + * `source`/`target` (type: `cocoindex.storages.NodeReferenceMapping`): The source/target node of the relationship, with the following fields: * `label` (type: `str`): The label of the node. - * `fields` (type: `list[cocoindex.storages.GraphFieldMapping]`): Map fields from the collector to nodes in Neo4j, with the following fields: - * `field_name` (type: `str`): The name of the field in the collected row. - * `node_field_name` (type: `str`, optional): The name of the field to use as the node field. If unspecified, will use the same as `field_name`. 
+ * `fields` (type: `Sequence[cocoindex.storages.TargetFieldMapping]`): Map fields from the collector to nodes in Neo4j, with the following fields: + * `source` (type: `str`): The name of the field in the collected row. + * `target` (type: `str`, optional): The name of the field to use as the node field. If unspecified, will use the same as `source`. :::info - All fields specified in `fields` will be mapped to properties of source/target nodes. All remaining fields will be mapped to relationship properties by default. + All fields specified in `fields.source` will be mapped to properties of source/target nodes. All remaining fields will be mapped to relationship properties by default. ::: - * `nodes` (type: `dict[str, cocoindex.storages.GraphRelationshipNode]`): This configures indexes for different node labels. Key is the node label. The value type `GraphRelationshipNode` has the following fields to configure [storage indexes](../core/flow_def#storage-indexes) for the node. + * `nodes_storage_spec` (type: `dict[str, cocoindex.storages.NodeStorageSpec]`): This configures indexes for different node labels. Key is the node label. The value type `NodeStorageSpec` has the following fields to configure [storage indexes](../core/flow_def#storage-indexes) for the node. * `primary_key_fields` is required. * `vector_indexes` is also supported and optional. 
\ No newline at end of file diff --git a/docs/package.json b/docs/package.json index da90cb10..e55ec2e7 100644 --- a/docs/package.json +++ b/docs/package.json @@ -46,5 +46,6 @@ }, "engines": { "node": ">=18.0" - } + }, + "packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e" } diff --git a/examples/docs_to_kg/main.py b/examples/docs_to_kg/main.py index 9813768f..c20acbd0 100644 --- a/examples/docs_to_kg/main.py +++ b/examples/docs_to_kg/main.py @@ -96,35 +96,35 @@ def docs_to_kg_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.D "document_node", cocoindex.storages.Neo4j( connection=conn_spec, - mapping=cocoindex.storages.GraphNode(label="Document")), + mapping=cocoindex.storages.NodeMapping(label="Document")), primary_key_fields=["filename"], ) entity_relationship.export( "entity_relationship", cocoindex.storages.Neo4j( connection=conn_spec, - mapping=cocoindex.storages.GraphRelationship( + mapping=cocoindex.storages.RelationshipMapping( rel_type="RELATIONSHIP", - source=cocoindex.storages.GraphRelationshipEnd( + source=cocoindex.storages.NodeReferenceMapping( label="Entity", fields=[ - cocoindex.storages.GraphFieldMapping( - field_name="subject", node_field_name="value"), - cocoindex.storages.GraphFieldMapping( - field_name="subject_embedding", node_field_name="embedding"), + cocoindex.storages.TargetFieldMapping( + source="subject", target="value"), + cocoindex.storages.TargetFieldMapping( + source="subject_embedding", target="embedding"), ] ), - target=cocoindex.storages.GraphRelationshipEnd( + target=cocoindex.storages.NodeReferenceMapping( label="Entity", fields=[ - cocoindex.storages.GraphFieldMapping( - field_name="object", node_field_name="value"), - cocoindex.storages.GraphFieldMapping( - field_name="object_embedding", node_field_name="embedding"), + cocoindex.storages.TargetFieldMapping( + source="object", target="value"), + 
cocoindex.storages.TargetFieldMapping( + source="object_embedding", target="embedding"), ] ), - nodes={ - "Entity": cocoindex.storages.GraphRelationshipNode( + nodes_storage_spec={ + "Entity": cocoindex.storages.NodeStorageSpec( primary_key_fields=["value"], vector_indexes=[ cocoindex.VectorIndexDef( @@ -142,16 +142,16 @@ def docs_to_kg_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.D "entity_mention", cocoindex.storages.Neo4j( connection=conn_spec, - mapping=cocoindex.storages.GraphRelationship( + mapping=cocoindex.storages.RelationshipMapping( rel_type="MENTION", - source=cocoindex.storages.GraphRelationshipEnd( + source=cocoindex.storages.NodeReferenceMapping( label="Document", - fields=[cocoindex.storages.GraphFieldMapping("filename")], + fields=[cocoindex.storages.TargetFieldMapping("filename")], ), - target=cocoindex.storages.GraphRelationshipEnd( + target=cocoindex.storages.NodeReferenceMapping( label="Entity", - fields=[cocoindex.storages.GraphFieldMapping( - field_name="entity", node_field_name="value")], + fields=[cocoindex.storages.TargetFieldMapping( + source="entity", target="value")], ), ), ), diff --git a/python/cocoindex/storages.py b/python/cocoindex/storages.py index f615cfa3..b143f3a4 100644 --- a/python/cocoindex/storages.py +++ b/python/cocoindex/storages.py @@ -29,44 +29,44 @@ class Neo4jConnection: db: str | None = None @dataclass -class GraphFieldMapping: - """Mapping for a Neo4j field.""" - field_name: str +class TargetFieldMapping: + """Mapping for a graph element (node or relationship) field.""" + source: str # Field name for the node in the Knowledge Graph. # If unspecified, it's the same as `field_name`. 
- node_field_name: str | None = None + target: str | None = None @dataclass -class GraphRelationshipEnd: - """Spec for a Neo4j node type.""" +class NodeReferenceMapping: + """Spec for a referenced graph node, usually as part of a relationship.""" label: str - fields: list[GraphFieldMapping] + fields: list[TargetFieldMapping] @dataclass -class GraphRelationshipNode: - """Spec for a Neo4j node type.""" +class NodeStorageSpec: + """Storage spec for a graph node.""" primary_key_fields: Sequence[str] vector_indexes: Sequence[index.VectorIndexDef] = () @dataclass -class GraphNode: - """Spec for a Neo4j node type.""" +class NodeMapping: + """Spec to map a row to a graph node.""" kind = "Node" label: str @dataclass -class GraphRelationship: - """Spec for a Neo4j relationship.""" +class RelationshipMapping: + """Spec to map a row to a graph relationship.""" kind = "Relationship" rel_type: str - source: GraphRelationshipEnd - target: GraphRelationshipEnd - nodes: dict[str, GraphRelationshipNode] | None = None + source: NodeReferenceMapping + target: NodeReferenceMapping + nodes_storage_spec: dict[str, NodeStorageSpec] | None = None class Neo4j(op.StorageSpec): """Graph storage powered by Neo4j.""" connection: AuthEntryReference - mapping: GraphNode | GraphRelationship + mapping: NodeMapping | RelationshipMapping diff --git a/src/ops/storages/mod.rs b/src/ops/storages/mod.rs index 40d74d6b..b069af92 100644 --- a/src/ops/storages/mod.rs +++ b/src/ops/storages/mod.rs @@ -1,3 +1,4 @@ pub mod neo4j; pub mod postgres; pub mod qdrant; +pub mod spec; diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index d73bb907..b07887d1 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -1,4 +1,8 @@ use crate::prelude::*; + +use super::spec::{ + GraphElementMapping, NodeReferenceMapping, RelationshipMapping, TargetFieldMapping, +}; use crate::setup::components::{self, State}; use crate::setup::{ResourceSetupStatusCheck, SetupChangeType}; use 
crate::{ops::sdk::*, setup::CombinedState}; @@ -18,58 +22,10 @@ pub struct ConnectionSpec { db: Option, } -#[derive(Debug, Deserialize)] -pub struct GraphFieldMappingSpec { - field_name: FieldName, - - /// Field name for the node in the Knowledge Graph. - /// If unspecified, it's the same as `field_name`. - #[serde(default)] - node_field_name: Option, -} - -impl GraphFieldMappingSpec { - fn get_node_field_name(&self) -> &FieldName { - self.node_field_name.as_ref().unwrap_or(&self.field_name) - } -} - -#[derive(Debug, Deserialize)] -pub struct GraphRelationshipEndSpec { - label: String, - fields: Vec, -} - -#[derive(Debug, Deserialize)] -pub struct GraphRelationshipNodeSpec { - #[serde(flatten)] - index_options: spec::IndexOptions, -} - -#[derive(Debug, Deserialize)] -pub struct GraphNodeSpec { - label: String, -} - -#[derive(Debug, Deserialize)] -pub struct GraphRelationshipSpec { - rel_type: String, - source: GraphRelationshipEndSpec, - target: GraphRelationshipEndSpec, - nodes: Option>, -} - -#[derive(Debug, Deserialize)] -#[serde(tag = "kind")] -pub enum GraphMappingSpec { - Relationship(GraphRelationshipSpec), - Node(GraphNodeSpec), -} - #[derive(Debug, Deserialize)] pub struct Spec { - connection: AuthEntryReference, - mapping: GraphMappingSpec, + connection: spec::AuthEntryReference, + mapping: GraphElementMapping, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] @@ -101,12 +57,12 @@ impl ElementType { } } - fn from_mapping_spec(spec: &GraphMappingSpec) -> Self { + fn from_mapping_spec(spec: &GraphElementMapping) -> Self { match spec { - GraphMappingSpec::Relationship(spec) => { + GraphElementMapping::Relationship(spec) => { ElementType::Relationship(spec.rel_type.clone()) } - GraphMappingSpec::Node(spec) => ElementType::Node(spec.label.clone()), + GraphElementMapping::Node(spec) => ElementType::Node(spec.label.clone()), } } @@ -283,7 +239,9 @@ fn mapped_field_values_to_bolt<'a>( fn basic_value_to_bolt(value: &BasicValue, schema: 
&BasicValueType) -> Result { let bolt_value = match value { - BasicValue::Bytes(v) => BoltType::Bytes(neo4rs::BoltBytes::new(v.clone())), + BasicValue::Bytes(v) => { + BoltType::Bytes(neo4rs::BoltBytes::new(bytes::Bytes::from_owner(v.clone()))) + } BasicValue::Str(v) => BoltType::String(neo4rs::BoltString::new(v)), BasicValue::Bool(v) => BoltType::Boolean(neo4rs::BoltBoolean::new(*v)), BasicValue::Int64(v) => BoltType::Integer(neo4rs::BoltInteger::new(*v)), @@ -393,7 +351,7 @@ impl ExportContext { key_fields.iter().map(|f| &f.name), ); let result = match spec.mapping { - GraphMappingSpec::Node(node_spec) => { + GraphElementMapping::Node(node_spec) => { let delete_cypher = formatdoc! {" OPTIONAL MATCH (old_node:{label} {key_fields_literal}) WITH old_node @@ -433,7 +391,7 @@ impl ExportContext { tgt_fields: None, } } - GraphMappingSpec::Relationship(rel_spec) => { + GraphElementMapping::Relationship(rel_spec) => { let delete_cypher = formatdoc! {" OPTIONAL MATCH (old_src)-[old_rel:{rel_type} {key_fields_literal}]->(old_tgt) @@ -515,7 +473,7 @@ impl ExportContext { create_order: 1, delete_cypher, insert_cypher, - delete_before_upsert: true, + delete_before_upsert: false, // true key_field_params, key_fields, value_fields, @@ -687,15 +645,15 @@ impl RelationshipSetupState { } let mut dependent_node_labels = vec![]; match &spec.mapping { - GraphMappingSpec::Node(_) => {} - GraphMappingSpec::Relationship(rel_spec) => { + GraphElementMapping::Node(_) => {} + GraphElementMapping::Relationship(rel_spec) => { let (src_label_info, tgt_label_info) = end_nodes_label_info.ok_or_else(|| { anyhow!( "Expect `end_nodes_label_info` existing for relationship `{}`", rel_spec.rel_type ) })?; - for (label, node) in rel_spec.nodes.iter().flatten() { + for (label, node) in rel_spec.nodes_storage_spec.iter().flatten() { if let Some(primary_key_fields) = &node.index_options.primary_key_fields { sub_components.push(ComponentState { object_label: ElementType::Node(label.clone()), @@ -726,7 
+684,7 @@ impl RelationshipSetupState { } dependent_node_labels.extend( rel_spec - .nodes + .nodes_storage_spec .iter() .flat_map(|nodes| nodes.keys()) .cloned(), @@ -1079,14 +1037,14 @@ impl Factory { struct DependentNodeLabelAnalyzer<'a> { label_name: &'a str, fields: IndexMap<&'a str, AnalyzedGraphFieldMapping>, - remaining_fields: HashMap<&'a str, &'a GraphFieldMappingSpec>, + remaining_fields: HashMap<&'a str, &'a TargetFieldMapping>, index_options: Option<&'a IndexOptions>, } impl<'a> DependentNodeLabelAnalyzer<'a> { fn new( - rel_spec: &'a GraphRelationshipSpec, - rel_end_spec: &'a GraphRelationshipEndSpec, + rel_spec: &'a RelationshipMapping, + rel_end_spec: &'a NodeReferenceMapping, ) -> Result { Ok(Self { label_name: rel_end_spec.label.as_str(), @@ -1094,10 +1052,10 @@ impl<'a> DependentNodeLabelAnalyzer<'a> { remaining_fields: rel_end_spec .fields .iter() - .map(|f| (f.field_name.as_str(), f)) + .map(|f| (f.source.as_str(), f)) .collect(), index_options: rel_spec - .nodes + .nodes_storage_spec .as_ref() .and_then(|nodes| nodes.get(&rel_end_spec.label)) .and_then(|node_spec| Some(&node_spec.index_options)), @@ -1110,10 +1068,10 @@ impl<'a> DependentNodeLabelAnalyzer<'a> { None => return false, }; self.fields.insert( - field_info.get_node_field_name().as_str(), + field_info.get_target().as_str(), AnalyzedGraphFieldMapping { field_idx, - field_name: field_info.get_node_field_name().clone(), + field_name: field_info.get_target().clone(), value_type: field_schema.value_type.typ.clone(), }, ); @@ -1184,7 +1142,7 @@ impl StorageFactoryBase for Factory { let setup_key = GraphElement::from_spec(&spec); let (value_fields_info, rel_end_label_info) = match &spec.mapping { - GraphMappingSpec::Node(_) => ( + GraphElementMapping::Node(_) => ( value_fields_schema .into_iter() .enumerate() @@ -1196,7 +1154,7 @@ impl StorageFactoryBase for Factory { .collect(), None, ), - GraphMappingSpec::Relationship(rel_spec) => { + GraphElementMapping::Relationship(rel_spec) => { let 
mut src_label_analyzer = DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?; let mut tgt_label_analyzer = diff --git a/src/ops/storages/spec.rs b/src/ops/storages/spec.rs new file mode 100644 index 00000000..c1d8772f --- /dev/null +++ b/src/ops/storages/spec.rs @@ -0,0 +1,49 @@ +use crate::prelude::*; + +#[derive(Debug, Deserialize)] +pub struct TargetFieldMapping { + pub source: spec::FieldName, + + /// Field name for the node in the Knowledge Graph. + /// If unspecified, it's the same as `source`. + #[serde(default)] + pub target: Option, +} + +impl TargetFieldMapping { + pub fn get_target(&self) -> &spec::FieldName { + self.target.as_ref().unwrap_or(&self.source) + } +} + +#[derive(Debug, Deserialize)] +pub struct NodeReferenceMapping { + pub label: String, + pub fields: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct NodeStorageSpec { + #[serde(flatten)] + pub index_options: spec::IndexOptions, +} + +#[derive(Debug, Deserialize)] +pub struct NodeMapping { + pub label: String, +} + +#[derive(Debug, Deserialize)] +pub struct RelationshipMapping { + pub rel_type: String, + pub source: NodeReferenceMapping, + pub target: NodeReferenceMapping, + pub nodes_storage_spec: Option>, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "kind")] +pub enum GraphElementMapping { + Relationship(RelationshipMapping), + Node(NodeMapping), +}