Skip to content

Commit

Permalink
groot support pk index (#1471)
Browse files Browse the repository at this point in the history
  • Loading branch information
tianliplus committed Apr 21, 2022
1 parent ca9c7ad commit 4de8e6c
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
/**
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.maxgraph.common.util;
Expand All @@ -20,13 +18,11 @@

/**
* murmur hash 2.0.
* <p>
* The murmur hash is a relatively fast hash function from
* http://murmurhash.googlepages.com/ for platforms with efficient
* multiplication.
* <p>
* This is a re-implementation of the original C code plus some additional
* features.
*
* <p>The murmur hash is a relatively fast hash function from http://murmurhash.googlepages.com/ for
* platforms with efficient multiplication.
*
* <p>This is a re-implementation of the original C code plus some additional features.
*/
public final class PkHashUtils {

Expand All @@ -48,17 +44,16 @@ public static long hash(int labelId, List<byte[]> pks) {
/**
* Generates 64 bit hash from byte array of the given length and seed.
*
* @param data byte array to hash
* @param data byte array to hash
* @param length length of the array to hash
* @param seed initial seed value
* @param seed initial seed value
* @return 64 bit hash of the given array
*/
private static long hash64(final byte[] data, int length, int seed) {
final long m = 0xc6a4a7935bd1e995L;
final int r = 47;

long h = (seed & 0xffffffffL) ^ (length * m);

int length8 = length / 8;

for (int i = 0; i < length8; i++) {
Expand Down Expand Up @@ -109,7 +104,7 @@ private static long hash64(final byte[] data, int length, int seed) {
/**
* Generates 64 bit hash from byte array with default seed value.
*
* @param data byte array to hash
* @param data byte array to hash
* @param length length of the array to hash
* @return 64 bit hash of the given string
*/
Expand Down
4 changes: 4 additions & 0 deletions interactive_engine/executor/runtime/src/store/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,10 @@ impl GraphPartitionManager for VineyardPartitionManager {
}
return Some((partition_id as u32, vertex_id));
}

fn get_vertex_id_by_primary_keys(&self, _label_id: LabelId, _pks: &[Property]) -> Option<VertexId> {
unimplemented!()
}
}

unsafe impl Send for VineyardPartitionManager {}
Expand Down
132 changes: 121 additions & 11 deletions interactive_engine/executor/runtime/src/store/groot/global_graph.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//
//! Copyright 2020 Alibaba Group Holding Limited.
//!
//!
//! Licensed under the Apache License, Version 2.0 (the "License");
//! you may not use this file except in compliance with the License.
//! You may obtain a copy of the License at
//!
//!
//! http://www.apache.org/licenses/LICENSE-2.0
//!
//!
//! Unless required by applicable law or agreed to in writing, software
//! distributed under the License is distributed on an "AS IS" BASIS,
//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -30,8 +30,15 @@ use maxgraph_store::db::graph::store::GraphStore;
use store::groot::global_graph_schema::GlobalGraphSchema;
use maxgraph_store::db::graph::entity::{RocksVertexImpl, RocksEdgeImpl};
use maxgraph_store::db::api::multi_version_graph::MultiVersionGraph;
use maxgraph_store::db::api::types::RocksEdge;
use maxgraph_store::db::api::types::{RocksEdge, PropertyValue};
use maxgraph_store::db::storage::RawBytes;
use byteorder::{WriteBytesExt, BigEndian};
use std::io::Write;
use utils::hash;
use maxgraph_store::db::graph::{hash64, get_vertex_id_by_primary_keys};
use std::cell::{RefCell, RefMut};
use std::borrow::BorrowMut;
use std::ops::Deref;

pub struct GlobalGraph {
graph_partitions: Arc<HashMap<PartitionId, Arc<GraphStore>>>,
Expand All @@ -40,6 +47,7 @@ pub struct GlobalGraph {
}

unsafe impl Send for GlobalGraph {}

unsafe impl Sync for GlobalGraph {}

impl GlobalGraph {
Expand Down Expand Up @@ -97,7 +105,7 @@ impl GlobalGraphQuery for GlobalGraph {

fn get_out_edges(&self, si: SnapshotId, src_ids: Vec<PartitionVertexIds>, edge_labels: &Vec<LabelId>, condition: Option<&Condition>,
dedup_prop_ids: Option<&Vec<PropId>>, output_prop_ids: Option<&Vec<PropId>>, limit: usize) -> Box<dyn Iterator<Item=(VertexId, Self::EI)>> {
let mut res: Vec<(VertexId, Self::EI)> = Vec::new();
let mut res: Vec<(VertexId, Self::EI)> = Vec::new();
for (partition_id, vertex_ids) in src_ids {
if let Some(store) = self.graph_partitions.get(&partition_id) {
for vertex_id in vertex_ids {
Expand Down Expand Up @@ -178,8 +186,8 @@ impl GlobalGraphQuery for GlobalGraph {
}))
}

fn get_edge_properties(&self, si: SnapshotId, ids: Vec<PartitionLabeledVertexIds>, output_prop_ids: Option<&Vec<PropId>>)
-> Self::EI {
fn get_edge_properties(&self, si: SnapshotId, ids: Vec<PartitionLabeledVertexIds>, output_prop_ids: Option<&Vec<PropId>>)
-> Self::EI {
unimplemented!()
}

Expand All @@ -197,9 +205,9 @@ impl GlobalGraphQuery for GlobalGraph {
if let Some(partition) = self.graph_partitions.get(&pid) {
if labels.is_empty() {
res = Box::new(res.chain(partition.scan_vertex(si,
None,
condition,
output_property_ids).unwrap()
None,
condition,
output_property_ids).unwrap()
.take(Self::get_limit(limit))
.map(|v| v.unwrap())))
} else {
Expand Down Expand Up @@ -272,6 +280,10 @@ impl GlobalGraphQuery for GlobalGraph {
}
}

thread_local! {
static FIELD_BUF: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(64 << 10));
}

impl GraphPartitionManager for GlobalGraph {
fn get_partition_id(&self, vid: i64) -> i32 {
let partition_count = self.total_partition;
Expand All @@ -283,13 +295,111 @@ impl GraphPartitionManager for GlobalGraph {
}

fn get_process_partition_list(&self) -> Vec<u32> {
self.graph_partitions.keys().into_iter().map(|x|*x).collect::<Vec<u32>>()
self.graph_partitions.keys().into_iter().map(|x| *x).collect::<Vec<u32>>()
}

fn get_vertex_id_by_primary_key(&self, label_id: u32, key: &String) -> Option<(u32, i64)> {
// TODO check
None
}

fn get_vertex_id_by_primary_keys(&self, label_id: LabelId, pks: &[Property]) -> Option<VertexId> {
Some(FIELD_BUF.with(|data| {
let pks_bytes = pks.iter().map(|pk| {
let mut data = data.borrow_mut();
data.clear();
match pk {
Property::Bool(v) => {
data.write_u8(*v as u8).unwrap();
}
Property::Char(v) => {
data.write_u8(*v as u8).unwrap();
}
Property::Short(v) => {
data.write_i16::<BigEndian>(*v).unwrap();
}
Property::Int(v) => {
data.write_i32::<BigEndian>(*v).unwrap();
}
Property::Long(v) => {
data.write_i64::<BigEndian>(*v).unwrap();
}
Property::Float(v) => {
data.write_f32::<BigEndian>(*v).unwrap();
}
Property::Double(v) => {
data.write_f64::<BigEndian>(*v).unwrap();
}
Property::Bytes(v) => {
data.extend_from_slice(v);
}
Property::String(v) => {
data.extend(v.as_bytes());
}
Property::Date(v) => {
data.extend(v.as_bytes());
}
Property::ListInt(v) => {
data.write_i32::<BigEndian>(v.len() as i32).unwrap();
for i in v {
data.write_i32::<BigEndian>(*i).unwrap();
}
}
Property::ListLong(v) => {
data.write_i32::<BigEndian>(v.len() as i32).unwrap();
for i in v {
data.write_i64::<BigEndian>(*i).unwrap();
}
}
Property::ListFloat(v) => {
data.write_i32::<BigEndian>(v.len() as i32).unwrap();
for i in v {
data.write_f32::<BigEndian>(*i).unwrap();
}
}
Property::ListDouble(v) => {
data.write_i32::<BigEndian>(v.len() as i32).unwrap();
for i in v {
data.write_f64::<BigEndian>(*i).unwrap();
}
}
Property::ListString(v) => {
data.write_i32::<BigEndian>(v.len() as i32).unwrap();
let mut offset = 0;
let mut bytes_vec = Vec::with_capacity(v.len());
for i in v {
let b = i.as_bytes();
bytes_vec.push(b);
offset += b.len();
data.write_i32::<BigEndian>(offset as i32).unwrap();
}
for b in bytes_vec {
data.write(b).unwrap();
}
}
Property::ListBytes(v) => {
data.write_i32::<BigEndian>(v.len() as i32).unwrap();
let mut offset = 0;
for i in v {
offset += i.len();
data.write_i32::<BigEndian>(offset as i32).unwrap();
}
for i in v {
data.write(i.as_slice()).unwrap();
}
}
Property::Null => {
unimplemented!()
}
Property::Unknown => {
unimplemented!()
}
}
data
});
get_vertex_id_by_primary_keys(label_id as i32, pks_bytes) as VertexId
}))
}
}

fn floor_div(x: i64, y: i64) -> i64 {
Expand Down
10 changes: 10 additions & 0 deletions interactive_engine/executor/store/src/api/graph_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

use crate::api::{VertexId, PartitionId, Vertex, Edge, MVGraph, LabelId};
use std::sync::Arc;
use crate::api::property::Property;

// Partition manager for graph query
pub trait GraphPartitionManager: Send + Sync {
fn get_partition_id(&self, vid: VertexId) -> i32;
fn get_server_id(&self, pid: PartitionId) -> Option<u32>;
fn get_process_partition_list(&self) -> Vec<PartitionId>;
fn get_vertex_id_by_primary_key(&self, label_id: LabelId, key: &String) -> Option<(PartitionId, VertexId)>;
fn get_vertex_id_by_primary_keys(&self, label_id: LabelId, pks: &[Property]) -> Option<VertexId>;
}

pub struct ConstantPartitionManager {
Expand Down Expand Up @@ -63,6 +65,10 @@ impl GraphPartitionManager for ConstantPartitionManager {
fn get_vertex_id_by_primary_key(&self, _label_id: u32, _key: &String) -> Option<(u32, i64)> {
None
}

fn get_vertex_id_by_primary_keys(&self, _label_id: LabelId, _pks: &[Property]) -> Option<VertexId> {
unimplemented!()
}
}

pub struct FixedStorePartitionManager<V, VI, E, EI>
Expand Down Expand Up @@ -110,4 +116,8 @@ impl<V, VI, E, EI> GraphPartitionManager for FixedStorePartitionManager<V, VI, E
fn get_vertex_id_by_primary_key(&self, _label_id: u32, _key: &String) -> Option<(u32, i64)> {
None
}

fn get_vertex_id_by_primary_keys(&self, _label_id: LabelId, _pks: &[Property]) -> Option<VertexId> {
unimplemented!()
}
}

0 comments on commit 4de8e6c

Please sign in to comment.