Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions docs/docs/ops/storages.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,24 +70,24 @@ The `Neo4j` storage exports each row as a relationship to Neo4j Knowledge Graph.
* `password` (type: `str`): Password for the Neo4j database.
* `db` (type: `str`, optional): The name of the Neo4j database to use as the internal storage, e.g. `neo4j`.
* `mapping`: The mapping from collected row to nodes or relationships of the graph. 2 variations are supported:
* `cocoindex.storages.GraphNode`: each collected row is mapped to a node in the graph. It has the following fields:
* `cocoindex.storages.NodeMapping`: Each collected row is mapped to a node in the graph. It has the following fields:
* `label`: The label of the node.
* `cocoindex.storages.GraphRelationship`: each collected row is mapped to a relationship in the graph,
* `cocoindex.storages.RelationshipMapping`: Each collected row is mapped to a relationship in the graph,
With the following fields:

* `rel_type` (type: `str`): The type of the relationship.
* `source`/`target` (type: `cocoindex.storages.GraphRelationshipEnd`): The source/target node of the relationship, with the following fields:
* `source`/`target` (type: `cocoindex.storages.NodeReferenceMapping`): The source/target node of the relationship, with the following fields:
* `label` (type: `str`): The label of the node.
* `fields` (type: `list[cocoindex.storages.GraphFieldMapping]`): Map fields from the collector to nodes in Neo4j, with the following fields:
* `field_name` (type: `str`): The name of the field in the collected row.
* `node_field_name` (type: `str`, optional): The name of the field to use as the node field. If unspecified, will use the same as `field_name`.
* `fields` (type: `Sequence[cocoindex.storages.TargetFieldMapping]`): Map fields from the collector to nodes in Neo4j, with the following fields:
* `source` (type: `str`): The name of the field in the collected row.
* `target` (type: `str`, optional): The name of the field to use as the node field. If unspecified, will use the same as `source`.

:::info

All fields specified in `fields` will be mapped to properties of source/target nodes. All remaining fields will be mapped to relationship properties by default.
All fields specified in `fields.source` will be mapped to properties of source/target nodes. All remaining fields will be mapped to relationship properties by default.

:::

* `nodes` (type: `dict[str, cocoindex.storages.GraphRelationshipNode]`): This configures indexes for different node labels. Key is the node label. The value type `GraphRelationshipNode` has the following fields to configure [storage indexes](../core/flow_def#storage-indexes) for the node.
* `nodes_storage_spec` (type: `dict[str, cocoindex.storages.NodeStorageSpec]`): This configures indexes for different node labels. Key is the node label. The value type `NodeStorageSpec` has the following fields to configure [storage indexes](../core/flow_def#storage-indexes) for the node.
* `primary_key_fields` is required.
* `vector_indexes` is also supported and optional.
3 changes: 2 additions & 1 deletion docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@
},
"engines": {
"node": ">=18.0"
}
},
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}
40 changes: 20 additions & 20 deletions examples/docs_to_kg/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,35 @@ def docs_to_kg_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.D
"document_node",
cocoindex.storages.Neo4j(
connection=conn_spec,
mapping=cocoindex.storages.GraphNode(label="Document")),
mapping=cocoindex.storages.NodeMapping(label="Document")),
primary_key_fields=["filename"],
)
entity_relationship.export(
"entity_relationship",
cocoindex.storages.Neo4j(
connection=conn_spec,
mapping=cocoindex.storages.GraphRelationship(
mapping=cocoindex.storages.RelationshipMapping(
rel_type="RELATIONSHIP",
source=cocoindex.storages.GraphRelationshipEnd(
source=cocoindex.storages.NodeReferenceMapping(
label="Entity",
fields=[
cocoindex.storages.GraphFieldMapping(
field_name="subject", node_field_name="value"),
cocoindex.storages.GraphFieldMapping(
field_name="subject_embedding", node_field_name="embedding"),
cocoindex.storages.TargetFieldMapping(
source="subject", target="value"),
cocoindex.storages.TargetFieldMapping(
source="subject_embedding", target="embedding"),
]
),
target=cocoindex.storages.GraphRelationshipEnd(
target=cocoindex.storages.NodeReferenceMapping(
label="Entity",
fields=[
cocoindex.storages.GraphFieldMapping(
field_name="object", node_field_name="value"),
cocoindex.storages.GraphFieldMapping(
field_name="object_embedding", node_field_name="embedding"),
cocoindex.storages.TargetFieldMapping(
source="object", target="value"),
cocoindex.storages.TargetFieldMapping(
source="object_embedding", target="embedding"),
]
),
nodes={
"Entity": cocoindex.storages.GraphRelationshipNode(
nodes_storage_spec={
"Entity": cocoindex.storages.NodeStorageSpec(
primary_key_fields=["value"],
vector_indexes=[
cocoindex.VectorIndexDef(
Expand All @@ -142,16 +142,16 @@ def docs_to_kg_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.D
"entity_mention",
cocoindex.storages.Neo4j(
connection=conn_spec,
mapping=cocoindex.storages.GraphRelationship(
mapping=cocoindex.storages.RelationshipMapping(
rel_type="MENTION",
source=cocoindex.storages.GraphRelationshipEnd(
source=cocoindex.storages.NodeReferenceMapping(
label="Document",
fields=[cocoindex.storages.GraphFieldMapping("filename")],
fields=[cocoindex.storages.TargetFieldMapping("filename")],
),
target=cocoindex.storages.GraphRelationshipEnd(
target=cocoindex.storages.NodeReferenceMapping(
label="Entity",
fields=[cocoindex.storages.GraphFieldMapping(
field_name="entity", node_field_name="value")],
fields=[cocoindex.storages.TargetFieldMapping(
source="entity", target="value")],
),
),
),
Expand Down
34 changes: 17 additions & 17 deletions python/cocoindex/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,44 @@ class Neo4jConnection:
db: str | None = None

@dataclass
class GraphFieldMapping:
"""Mapping for a Neo4j field."""
field_name: str
class TargetFieldMapping:
"""Mapping for a graph element (node or relationship) field."""
source: str
# Field name for the node in the Knowledge Graph.
# If unspecified, it's the same as `field_name`.
node_field_name: str | None = None
target: str | None = None

@dataclass
class GraphRelationshipEnd:
"""Spec for a Neo4j node type."""
class NodeReferenceMapping:
"""Spec for a referenced graph node, usually as part of a relationship."""
label: str
fields: list[GraphFieldMapping]
fields: list[TargetFieldMapping]

@dataclass
class GraphRelationshipNode:
"""Spec for a Neo4j node type."""
class NodeStorageSpec:
"""Storage spec for a graph node."""
primary_key_fields: Sequence[str]
vector_indexes: Sequence[index.VectorIndexDef] = ()

@dataclass
class GraphNode:
"""Spec for a Neo4j node type."""
class NodeMapping:
"""Spec to map a row to a graph node."""
kind = "Node"

label: str

@dataclass
class GraphRelationship:
"""Spec for a Neo4j relationship."""
class RelationshipMapping:
"""Spec to map a row to a graph relationship."""
kind = "Relationship"

rel_type: str
source: GraphRelationshipEnd
target: GraphRelationshipEnd
nodes: dict[str, GraphRelationshipNode] | None = None
source: NodeReferenceMapping
target: NodeReferenceMapping
nodes_storage_spec: dict[str, NodeStorageSpec] | None = None

class Neo4j(op.StorageSpec):
"""Graph storage powered by Neo4j."""

connection: AuthEntryReference
mapping: GraphNode | GraphRelationship
mapping: NodeMapping | RelationshipMapping
1 change: 1 addition & 0 deletions src/ops/storages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod neo4j;
pub mod postgres;
pub mod qdrant;
pub mod spec;
98 changes: 28 additions & 70 deletions src/ops/storages/neo4j.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::prelude::*;

use super::spec::{
GraphElementMapping, NodeReferenceMapping, RelationshipMapping, TargetFieldMapping,
};
use crate::setup::components::{self, State};
use crate::setup::{ResourceSetupStatusCheck, SetupChangeType};
use crate::{ops::sdk::*, setup::CombinedState};
Expand All @@ -18,58 +22,10 @@ pub struct ConnectionSpec {
db: Option<String>,
}

#[derive(Debug, Deserialize)]
pub struct GraphFieldMappingSpec {
field_name: FieldName,

/// Field name for the node in the Knowledge Graph.
/// If unspecified, it's the same as `field_name`.
#[serde(default)]
node_field_name: Option<FieldName>,
}

impl GraphFieldMappingSpec {
fn get_node_field_name(&self) -> &FieldName {
self.node_field_name.as_ref().unwrap_or(&self.field_name)
}
}

#[derive(Debug, Deserialize)]
pub struct GraphRelationshipEndSpec {
label: String,
fields: Vec<GraphFieldMappingSpec>,
}

#[derive(Debug, Deserialize)]
pub struct GraphRelationshipNodeSpec {
#[serde(flatten)]
index_options: spec::IndexOptions,
}

#[derive(Debug, Deserialize)]
pub struct GraphNodeSpec {
label: String,
}

#[derive(Debug, Deserialize)]
pub struct GraphRelationshipSpec {
rel_type: String,
source: GraphRelationshipEndSpec,
target: GraphRelationshipEndSpec,
nodes: Option<BTreeMap<String, GraphRelationshipNodeSpec>>,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "kind")]
pub enum GraphMappingSpec {
Relationship(GraphRelationshipSpec),
Node(GraphNodeSpec),
}

#[derive(Debug, Deserialize)]
pub struct Spec {
connection: AuthEntryReference,
mapping: GraphMappingSpec,
connection: spec::AuthEntryReference,
mapping: GraphElementMapping,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -101,12 +57,12 @@ impl ElementType {
}
}

fn from_mapping_spec(spec: &GraphMappingSpec) -> Self {
fn from_mapping_spec(spec: &GraphElementMapping) -> Self {
match spec {
GraphMappingSpec::Relationship(spec) => {
GraphElementMapping::Relationship(spec) => {
ElementType::Relationship(spec.rel_type.clone())
}
GraphMappingSpec::Node(spec) => ElementType::Node(spec.label.clone()),
GraphElementMapping::Node(spec) => ElementType::Node(spec.label.clone()),
}
}

Expand Down Expand Up @@ -283,7 +239,9 @@ fn mapped_field_values_to_bolt<'a>(

fn basic_value_to_bolt(value: &BasicValue, schema: &BasicValueType) -> Result<BoltType> {
let bolt_value = match value {
BasicValue::Bytes(v) => BoltType::Bytes(neo4rs::BoltBytes::new(v.clone())),
BasicValue::Bytes(v) => {
BoltType::Bytes(neo4rs::BoltBytes::new(bytes::Bytes::from_owner(v.clone())))
}
BasicValue::Str(v) => BoltType::String(neo4rs::BoltString::new(v)),
BasicValue::Bool(v) => BoltType::Boolean(neo4rs::BoltBoolean::new(*v)),
BasicValue::Int64(v) => BoltType::Integer(neo4rs::BoltInteger::new(*v)),
Expand Down Expand Up @@ -393,7 +351,7 @@ impl ExportContext {
key_fields.iter().map(|f| &f.name),
);
let result = match spec.mapping {
GraphMappingSpec::Node(node_spec) => {
GraphElementMapping::Node(node_spec) => {
let delete_cypher = formatdoc! {"
OPTIONAL MATCH (old_node:{label} {key_fields_literal})
WITH old_node
Expand Down Expand Up @@ -433,7 +391,7 @@ impl ExportContext {
tgt_fields: None,
}
}
GraphMappingSpec::Relationship(rel_spec) => {
GraphElementMapping::Relationship(rel_spec) => {
let delete_cypher = formatdoc! {"
OPTIONAL MATCH (old_src)-[old_rel:{rel_type} {key_fields_literal}]->(old_tgt)

Expand Down Expand Up @@ -515,7 +473,7 @@ impl ExportContext {
create_order: 1,
delete_cypher,
insert_cypher,
delete_before_upsert: true,
delete_before_upsert: false, // true
key_field_params,
key_fields,
value_fields,
Expand Down Expand Up @@ -687,15 +645,15 @@ impl RelationshipSetupState {
}
let mut dependent_node_labels = vec![];
match &spec.mapping {
GraphMappingSpec::Node(_) => {}
GraphMappingSpec::Relationship(rel_spec) => {
GraphElementMapping::Node(_) => {}
GraphElementMapping::Relationship(rel_spec) => {
let (src_label_info, tgt_label_info) = end_nodes_label_info.ok_or_else(|| {
anyhow!(
"Expect `end_nodes_label_info` existing for relationship `{}`",
rel_spec.rel_type
)
})?;
for (label, node) in rel_spec.nodes.iter().flatten() {
for (label, node) in rel_spec.nodes_storage_spec.iter().flatten() {
if let Some(primary_key_fields) = &node.index_options.primary_key_fields {
sub_components.push(ComponentState {
object_label: ElementType::Node(label.clone()),
Expand Down Expand Up @@ -726,7 +684,7 @@ impl RelationshipSetupState {
}
dependent_node_labels.extend(
rel_spec
.nodes
.nodes_storage_spec
.iter()
.flat_map(|nodes| nodes.keys())
.cloned(),
Expand Down Expand Up @@ -1079,25 +1037,25 @@ impl Factory {
struct DependentNodeLabelAnalyzer<'a> {
label_name: &'a str,
fields: IndexMap<&'a str, AnalyzedGraphFieldMapping>,
remaining_fields: HashMap<&'a str, &'a GraphFieldMappingSpec>,
remaining_fields: HashMap<&'a str, &'a TargetFieldMapping>,
index_options: Option<&'a IndexOptions>,
}

impl<'a> DependentNodeLabelAnalyzer<'a> {
fn new(
rel_spec: &'a GraphRelationshipSpec,
rel_end_spec: &'a GraphRelationshipEndSpec,
rel_spec: &'a RelationshipMapping,
rel_end_spec: &'a NodeReferenceMapping,
) -> Result<Self> {
Ok(Self {
label_name: rel_end_spec.label.as_str(),
fields: IndexMap::new(),
remaining_fields: rel_end_spec
.fields
.iter()
.map(|f| (f.field_name.as_str(), f))
.map(|f| (f.source.as_str(), f))
.collect(),
index_options: rel_spec
.nodes
.nodes_storage_spec
.as_ref()
.and_then(|nodes| nodes.get(&rel_end_spec.label))
.and_then(|node_spec| Some(&node_spec.index_options)),
Expand All @@ -1110,10 +1068,10 @@ impl<'a> DependentNodeLabelAnalyzer<'a> {
None => return false,
};
self.fields.insert(
field_info.get_node_field_name().as_str(),
field_info.get_target().as_str(),
AnalyzedGraphFieldMapping {
field_idx,
field_name: field_info.get_node_field_name().clone(),
field_name: field_info.get_target().clone(),
value_type: field_schema.value_type.typ.clone(),
},
);
Expand Down Expand Up @@ -1184,7 +1142,7 @@ impl StorageFactoryBase for Factory {
let setup_key = GraphElement::from_spec(&spec);

let (value_fields_info, rel_end_label_info) = match &spec.mapping {
GraphMappingSpec::Node(_) => (
GraphElementMapping::Node(_) => (
value_fields_schema
.into_iter()
.enumerate()
Expand All @@ -1196,7 +1154,7 @@ impl StorageFactoryBase for Factory {
.collect(),
None,
),
GraphMappingSpec::Relationship(rel_spec) => {
GraphElementMapping::Relationship(rel_spec) => {
let mut src_label_analyzer =
DependentNodeLabelAnalyzer::new(&rel_spec, &rel_spec.source)?;
let mut tgt_label_analyzer =
Expand Down
Loading