From 8a8bf9d99fbe375cf642b172527ba53e92f9df6d Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 12 Apr 2025 23:27:27 -0700 Subject: [PATCH] Support creating / managing vector index in Neo4j. --- src/base/spec.rs | 2 +- src/ops/storages/neo4j.rs | 275 ++++++++++++++++++++++++++++++++++---- 2 files changed, 249 insertions(+), 28 deletions(-) diff --git a/src/base/spec.rs b/src/base/spec.rs index 0989cacf8..66c5027d7 100644 --- a/src/base/spec.rs +++ b/src/base/spec.rs @@ -204,7 +204,7 @@ pub struct CollectOpSpec { pub auto_uuid_field: Option, } -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum VectorSimilarityMetric { CosineSimilarity, L2Distance, diff --git a/src/ops/storages/neo4j.rs b/src/ops/storages/neo4j.rs index ea291da7d..25896acd2 100644 --- a/src/ops/storages/neo4j.rs +++ b/src/ops/storages/neo4j.rs @@ -531,17 +531,53 @@ impl ExportTargetExecutor for RelationshipStorageExecutor { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +struct VectorIndexState { + label: String, + field_name: String, + vector_size: usize, + metric: spec::VectorSimilarityMetric, +} + +impl VectorIndexState { + fn new( + label: &str, + index_def: &spec::VectorIndexDef, + field_typ: &schema::ValueType, + ) -> Result { + Ok(Self { + label: label.to_string(), + field_name: index_def.field_name.clone(), + vector_size: (match field_typ { + schema::ValueType::Basic(schema::BasicValueType::Vector(schema)) => { + schema.dimension + } + _ => None, + }) + .ok_or_else(|| { + api_error!("Vector index field must be a vector with fixed dimension") + })?, + metric: index_def.metric, + }) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeLabelSetupState { key_field_names: Vec, key_constraint_name: String, - vector_indexes: HashMap, + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + vector_indexes: HashMap, } impl NodeLabelSetupState { - fn from_spec(label: &str, spec: &RelationshipNodeSpec) -> Self { + fn new( + label: &str, + spec: &RelationshipNodeSpec, + node_label_infos: &[&AnalyzedNodeLabelInfo], + ) -> Result { let key_constraint_name = format!("n__{}__unique", label); - Self { + Ok(Self { key_field_names: spec .index_options .primary_key_fields @@ -552,14 +588,28 @@ impl NodeLabelSetupState { .index_options .vector_index_defs .iter() - .map(|v| { - ( + .map(|v| -> Result<_> { + Ok(( format!("n__{}__{}__{}", label, v.field_name.clone(), v.metric), - v.clone(), - ) + VectorIndexState::new( + label, + v, + node_label_infos + .iter() + .flat_map(|v| v.key_fields.iter().chain(v.value_fields.iter())) + .find(|f| f.field_name == v.field_name) + .map(|f| &f.value_type) + .ok_or_else(|| { + api_error!( + "Unknown field name for vector index: {}", + v.field_name + ) + })?, + )?, + )) }) - .collect(), - } + .collect::>()?, + }) } fn is_compatible(&self, other: &Self) -> bool { @@ -570,21 +620,58 @@ impl NodeLabelSetupState { pub struct RelationshipSetupState { key_field_names: Vec, key_constraint_name: String, - #[serde(default)] + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + vector_indexes: HashMap, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] nodes: BTreeMap, } impl RelationshipSetupState { - fn from_spec(spec: &RelationshipSpec, key_field_names: Vec) -> Self { - Self { + fn new( + spec: &RelationshipSpec, + key_field_names: Vec, + index_options: &IndexOptions, + rel_value_fields_info: &Vec, + src_label_info: &AnalyzedNodeLabelInfo, + tgt_label_info: &AnalyzedNodeLabelInfo, + ) -> Result { + Ok(Self { key_field_names, key_constraint_name: format!("r__{}__key", spec.rel_type), + vector_indexes: index_options + .vector_index_defs + .iter() + .map(|v| -> Result<_> { + Ok(( + format!("r__{}__{}__{}", spec.rel_type, v.field_name, v.metric), + VectorIndexState::new( + &spec.rel_type, + v, + &rel_value_fields_info + .iter() + .find(|f| f.field_name == v.field_name) + .ok_or_else(|| { + api_error!( + "Unknown field name for vector index: {}", + v.field_name + ) + })? + .value_type, + )?, + )) + }) + .collect::>()?, nodes: spec .nodes .iter() - .map(|(label, node)| (label.clone(), NodeLabelSetupState::from_spec(label, node))) - .collect(), - } + .map(|(label, node)| -> Result<_> { + Ok(( + label.clone(), + NodeLabelSetupState::new(label, node, &[src_label_info, tgt_label_info])?, + )) + }) + .collect::>()?, + }) } fn check_compatible(&self, existing: &Self) -> SetupStateCompatibility { @@ -619,7 +706,7 @@ struct KeyConstraint { impl KeyConstraint { fn new(label: String, state: &NodeLabelSetupState) -> Self { Self { - label: label, + label, field_names: state.key_field_names.clone(), } } @@ -633,11 +720,17 @@ struct SetupStatusCheck { conn_spec: ConnectionSpec, data_clear: Option, + rel_constraint_to_delete: IndexSet, rel_constraint_to_create: IndexMap, node_constraint_to_delete: IndexSet, node_constraint_to_create: IndexMap, + rel_index_to_delete: IndexSet, + rel_index_to_create: IndexMap, + node_index_to_delete: IndexSet, + node_index_to_create: IndexMap, + change_type: SetupChangeType, } @@ -665,15 +758,23 @@ impl SetupStatusCheck { let mut old_rel_constraints = IndexSet::new(); let mut old_node_constraints = IndexSet::new(); + let mut old_rel_indexes = IndexSet::new(); + let mut old_node_indexes = IndexSet::new(); + for existing_version in existing.possible_versions() { old_rel_constraints.insert(existing_version.key_constraint_name.clone()); + old_rel_indexes.extend(existing_version.vector_indexes.keys().cloned()); for (_, node) in existing_version.nodes.iter() { old_node_constraints.insert(node.key_constraint_name.clone()); + old_node_indexes.extend(node.vector_indexes.keys().cloned()); } } let mut rel_constraint_to_create = IndexMap::new(); let mut node_constraint_to_create = IndexMap::new(); + let mut rel_index_to_create = IndexMap::new(); + let mut node_index_to_create = IndexMap::new(); + if let Some(desired_state) = desired_state { let rel_constraint = KeyConstraint { label: key.relationship.clone(), @@ -689,42 +790,74 @@ impl SetupStatusCheck { rel_constraint_to_create.insert(desired_state.key_constraint_name, rel_constraint); } - for (label, node) in desired_state.nodes.iter() { + for (index_name, vector_index) in desired_state.vector_indexes.into_iter() { + old_rel_indexes.shift_remove(&index_name); + if !existing.current.as_ref().map_or(false, |c| { + Some(&vector_index) == c.vector_indexes.get(&index_name) + }) { + rel_index_to_create.insert(index_name, vector_index); + } + } + + for (label, node) in desired_state.nodes.into_iter() { old_node_constraints.shift_remove(&node.key_constraint_name); if !existing .current .as_ref() .map(|c| { c.nodes - .get(label) + .get(&label) .map_or(false, |existing_node| node.is_compatible(existing_node)) }) .unwrap_or(false) { node_constraint_to_create.insert( node.key_constraint_name.clone(), - KeyConstraint::new(label.clone(), node), + KeyConstraint::new(label.clone(), &node), ); } + + for (index_name, vector_index) in node.vector_indexes.into_iter() { + old_node_indexes.shift_remove(&index_name); + if !existing.current.as_ref().map_or(false, |c| { + c.nodes.get(&label).map_or(false, |n| { + Some(&vector_index) == n.vector_indexes.get(&index_name) + }) + }) { + node_index_to_create.insert(index_name, vector_index); + } + } } } let rel_constraint_to_delete = old_rel_constraints; let node_constraint_to_delete = old_node_constraints; + let rel_index_to_delete = old_rel_indexes; + let node_index_to_delete = old_node_indexes; let change_type = if data_clear.is_none() && rel_constraint_to_delete.is_empty() && rel_constraint_to_create.is_empty() && node_constraint_to_delete.is_empty() && node_constraint_to_create.is_empty() + && rel_index_to_delete.is_empty() + && rel_index_to_create.is_empty() + && node_index_to_delete.is_empty() + && node_index_to_create.is_empty() { SetupChangeType::NoChange } else if data_clear.is_none() && rel_constraint_to_delete.is_empty() && node_constraint_to_delete.is_empty() + && rel_index_to_delete.is_empty() + && node_index_to_delete.is_empty() { SetupChangeType::Create - } else if rel_constraint_to_create.is_empty() && node_constraint_to_create.is_empty() { + } else if rel_constraint_to_create.is_empty() + && node_constraint_to_create.is_empty() + && rel_index_to_create.is_empty() + && node_index_to_create.is_empty() + { SetupChangeType::Delete } else { SetupChangeType::Update @@ -738,6 +871,10 @@ impl SetupStatusCheck { rel_constraint_to_create, node_constraint_to_delete, node_constraint_to_create, + rel_index_to_delete, + rel_index_to_create, + node_index_to_delete, + node_index_to_create, change_type, } } @@ -776,6 +913,25 @@ impl ResourceSetupStatusCheck for SetupStatusCheck { node_constraint.field_names.join(", "), )); } + for name in &self.rel_index_to_delete { + result.push(format!("Delete relationship index {}", name)); + } + for (name, vector_index) in self.rel_index_to_create.iter() { + result.push(format!( + "Create VECTOR INDEX {} (vector_size: {}, metric: {}) ON RELATIONSHIP {}", + name, vector_index.vector_size, vector_index.metric, vector_index.label + )); + } + for name in &self.node_index_to_delete { + result.push(format!("Delete node index {}", name)); + } + for (name, vector_index) in self.node_index_to_create.iter() { + result.push(format!( + "Create VECTOR INDEX {} (vector_size: {}, metric: {}) ON NODE {}", + name, vector_index.vector_size, vector_index.metric, vector_index.label + )); + } + result } @@ -830,11 +986,19 @@ impl ResourceSetupStatusCheck for SetupStatusCheck { (self.rel_constraint_to_delete.iter()).chain(self.node_constraint_to_delete.iter()) { graph - .run(neo4rs::query(&format!("DROP CONSTRAINT {name}"))) + .run(neo4rs::query(&format!("DROP CONSTRAINT {name} IF EXISTS"))) + .await?; + } + for name in (self.rel_index_to_delete.iter()).chain(self.node_index_to_delete.iter()) { + graph + .run(neo4rs::query(&format!("DROP INDEX {name} IF EXISTS"))) .await?; } for (name, constraint) in self.node_constraint_to_create.iter() { + graph + .run(neo4rs::query(&format!("DROP CONSTRAINT {name} IF EXISTS"))) + .await?; graph .run(neo4rs::query(&format!( "CREATE CONSTRAINT {name} IF NOT EXISTS FOR (n:{label}) REQUIRE {field_names} IS UNIQUE", @@ -845,6 +1009,9 @@ impl ResourceSetupStatusCheck for SetupStatusCheck { } for (name, constraint) in self.rel_constraint_to_create.iter() { + graph + .run(neo4rs::query(&format!("DROP CONSTRAINT {name} IF EXISTS"))) + .await?; graph .run(neo4rs::query(&format!( "CREATE CONSTRAINT {name} IF NOT EXISTS FOR ()-[e:{label}]-() REQUIRE {field_names} IS UNIQUE", @@ -853,6 +1020,55 @@ impl ResourceSetupStatusCheck for SetupStatusCheck { ))) .await?; } + + let build_create_vector_index_query = |name: &str, + index_state: &VectorIndexState, + matcher: &str, + arg_name: &str| + -> Result { + let metric = match index_state.metric { + spec::VectorSimilarityMetric::CosineSimilarity => "cosine", + spec::VectorSimilarityMetric::L2Distance => "euclidean", + _ => api_bail!( + "Unsupported vector similarity metric in Neo4j: {}", + index_state.metric + ), + }; + let query = format!( + r#"CREATE VECTOR INDEX {name} IF NOT EXISTS FOR {matcher} ON {arg_name}.{field_name} OPTIONS + {{ indexConfig: {{`vector.dimensions`: {vector_size}, `vector.similarity_function`: '{metric}'}}}}"#, + field_name = index_state.field_name, + vector_size = index_state.vector_size, + ); + Ok(query) + }; + for (name, vector_index) in self.rel_index_to_create.iter() { + graph + .run(neo4rs::query(&format!("DROP INDEX {name} IF EXISTS"))) + .await?; + graph + .run(neo4rs::query(&build_create_vector_index_query( + name, + vector_index, + &format!("()-[r:{}]-()", vector_index.label), + "r", + )?)) + .await?; + } + for (name, vector_index) in self.node_index_to_create.iter() { + graph + .run(neo4rs::query(&format!("DROP INDEX {name} IF EXISTS"))) + .await?; + graph + .run(neo4rs::query(&build_create_vector_index_query( + name, + vector_index, + &format!("(n:{})", vector_index.label), + "n", + )?)) + .await?; + } + Ok(()) } } @@ -963,19 +1179,15 @@ impl StorageFactoryBase for RelationshipFactory { spec: RelationshipSpec, key_fields_schema: Vec, value_fields_schema: Vec, - _storage_options: IndexOptions, + index_options: IndexOptions, context: Arc, ) -> Result> { let setup_key = GraphRelationship::from_spec(&spec); - let desired_setup_state = RelationshipSetupState::from_spec( - &spec, - key_fields_schema.iter().map(|f| f.name.clone()).collect(), - ); let mut src_label_analyzer = NodeLabelAnalyzer::new(&spec, &spec.source)?; let mut tgt_label_analyzer = NodeLabelAnalyzer::new(&spec, &spec.target)?; let mut rel_value_fields_info = vec![]; - for (field_idx, field_schema) in value_fields_schema.into_iter().enumerate() { + for (field_idx, field_schema) in value_fields_schema.iter().enumerate() { if !src_label_analyzer.process_field(field_idx, &field_schema) && !tgt_label_analyzer.process_field(field_idx, &field_schema) { @@ -989,6 +1201,15 @@ impl StorageFactoryBase for RelationshipFactory { let src_label_info = src_label_analyzer.build()?; let tgt_label_info = tgt_label_analyzer.build()?; + let desired_setup_state = RelationshipSetupState::new( + &spec, + key_fields_schema.iter().map(|f| f.name.clone()).collect(), + &index_options, + &rel_value_fields_info, + &src_label_info, + &tgt_label_info, + )?; + let conn_spec = context .auth_registry .get::(&spec.connection)?;