Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): fix test error of groot and unsafe static reference #3469

Merged
merged 8 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/gaia.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ jobs:
- name: Store Unit Test
run: |
cd ${GITHUB_WORKSPACE}/interactive_engine/executor/store/exp_store && cargo test
# TODO: fix ut in groot
# cd ${GITHUB_WORKSPACE}/interactive_engine/executor/store/groot && cargo test
cd ${GITHUB_WORKSPACE}/interactive_engine/executor/store/groot && cargo test
# TODO: add ut in global_query
# cd ${GITHUB_WORKSPACE}/interactive_engine/executor/store/global_query && cargo test

Expand Down
10 changes: 6 additions & 4 deletions interactive_engine/executor/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ members = [

[profile.release]
opt-level = 3
debug = true
debug = false
rpath = false
lto = true
debug-assertions = false
Expand All @@ -22,7 +22,9 @@ opt-level = 0
debug = true
rpath = false
lto = false
debug-assertions = true
# TODO(siyuan): re-enable debug assertions by addressing the reports for misaligned pointer
# dereferences https://github.com/rust-lang/rust/pull/98112/
# TODO(longbin): Recommend re-implementing encoder/decoder of groot using bincode
debug-assertions = false
codegen-units=1
# Don't change to "abort", since runtime rely on this to catch unexpected errors in worker threads.
panic = "unwind"
panic = "abort"
3 changes: 0 additions & 3 deletions interactive_engine/executor/store/exp_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,3 @@ timely = "0.10"
vec_map = { version = "0.8.2", features = ["serde"] }
walkdir = "2"

[profile.release]
lto = true
panic = "abort"
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,29 @@ pub trait MultiVersionGraph {
type V: RocksVertex;
type E: RocksEdge;

/// Get vertex of given `vertex_id` at `si`, with given properties.
/// In the following interfaces, for properties,
/// * `None`: no properties
/// * `Some(vec![])`: all properties
/// * `Some(property_ids)`: given properties
fn get_vertex(
&self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option<LabelId>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::V>>;

/// Get edge of given `edge_id` at `si`, with given properties.
fn get_edge(
&self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: Option<&EdgeKind>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Option<Self::E>>;

/// Scan vertices of given `label_id` at `si`, with given properties.
fn scan_vertex(
&self, snapshot_id: SnapshotId, label_id: Option<LabelId>, condition: Option<&Condition>,
property_ids: Option<&Vec<PropertyId>>,
) -> GraphResult<Records<Self::V>>;

/// Scan edges of given `label_id` at `si`, with given properties.
fn scan_edge(
&self, snapshot_id: SnapshotId, label_id: Option<LabelId>, condition: Option<&Condition>,
property_ids: Option<&Vec<PropertyId>>,
Expand Down
79 changes: 47 additions & 32 deletions interactive_engine/executor/store/groot/src/db/api/property.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::error::*;
use super::GraphResult;
use crate::db::api::PropertyId;
use crate::db::common::bytes::transform;
use crate::db::common::bytes::util::{UnsafeBytesReader, UnsafeBytesWriter};
use crate::db::common::bytes::util::{UnsafeBytesReader, UnsafeBytesWriter, LEN32_SIZE, LEN_SIZE};
use crate::db::common::numeric::*;
use crate::db::proto::schema_common::{DataTypePb, PropertyValuePb};

Expand Down Expand Up @@ -293,12 +293,12 @@ impl<'a> ValueRef<'a> {

fn check_numeric_array(&self, value_type: ValueType) -> GraphResult<UnsafeBytesReader> {
let reader = UnsafeBytesReader::new(self.data);
let len = match value_type {
ValueType::IntList | ValueType::FloatList => 4,
ValueType::LongList | ValueType::DoubleList => 8,
let unit_size = match value_type {
ValueType::IntList | ValueType::FloatList => ::std::mem::size_of::<u32>(),
ValueType::LongList | ValueType::DoubleList => ::std::mem::size_of::<u64>(),
_ => unreachable!(),
};
if reader.read_i32(0).to_be() * len + 4 == self.data.len() as i32 {
if reader.read_u64(0).to_be() as usize * unit_size + LEN_SIZE == self.data.len() {
return Ok(reader);
}
let msg = format!("invalid {:?} bytes, data len is {}", value_type, self.data.len());
Expand All @@ -310,9 +310,11 @@ impl<'a> ValueRef<'a> {
/// in valid utf8 and when user extract the str, the process will be panic
fn weak_check_str_list(&self) -> GraphResult<UnsafeBytesReader> {
let reader = UnsafeBytesReader::new(self.data);
let len = reader.read_i32(0).to_be() as usize;
let total_len = reader.read_i32(4 * len).to_be() as usize;
if total_len == self.data.len() - (4 + 4 * len) {
let len = reader.read_u64(0).to_be() as usize;
let total_len = reader
.read_u32(LEN_SIZE + LEN32_SIZE * (len - 1))
.to_be() as usize;
if total_len == self.data.len() - (LEN_SIZE + LEN32_SIZE * len) {
return Ok(reader);
}
let msg = format!("invalid str array bytes");
Expand Down Expand Up @@ -522,31 +524,31 @@ fn get_double(data: &[u8]) -> f64 {
/// +-----+----+----+-----+----+
/// | len | x1 | x2 | ... | xn |
/// +-----+----+----+-----+----+
/// | 4B | 4B | 4B | ... | 4B |
/// | 8B | 4B | 4B | ... | 4B |
/// +-----+----+----+-----+----+ len and every xi is in int format above
/// long array:
/// +-----+----+----+-----+----+
/// | len | x1 | x2 | ... | xn |
/// +-----+----+----+-----+----+
/// | 4B | 8B | 8B | ... | 8B |
/// | 8B | 8B | 8B | ... | 8B |
/// +-----+----+----+-----+----+ len is in int format above and every xi is in long format above
/// float array:
/// +-----+----+----+-----+----+
/// | len | x1 | x2 | ... | xn |
/// +-----+----+----+-----+----+
/// | 4B | 4B | 4B | ... | 4B |
/// | 8B | 4B | 4B | ... | 4B |
/// +-----+----+----+-----+----+ len is in int format above and every xi is in float format above
/// double array:
/// +-----+----+----+-----+----+
/// | len | x1 | x2 | ... | xn |
/// +-----+----+----+-----+----+
/// | 4B | 8B | 8B | ... | 8B |
/// | 8B | 8B | 8B | ... | 8B |
/// +-----+----+----+-----+----+ len is in int format above and every xi is in double format above
/// string array:
/// +-----+------+------+-----+------+------+------+-----+------+
/// | len | off1 | off2 | ... | offn | str1 | str2 | ... | strn |
/// +-----+------+------+-----+------+------+------+-----+------+
/// | 4B | 4B | 4B | ... | 4B | x1 B | x2 B | ... | xn B |
/// | 8B | 4B | 4B | ... | 4B | x1 B | x2 B | ... | xn B |
/// +-----+------+------+-----+------+------+------+-----+------+
/// len and offi is in int format above, stri is in string format above
/// off1 == x1 means it's str1's end offset
Expand All @@ -562,15 +564,15 @@ pub struct Value {
macro_rules! gen_array {
($arr:ident, $ty:ty, $func:tt) => {{
let size = ::std::mem::size_of::<$ty>();
let total_len = $arr.len() * size + 4;
let total_len = $arr.len() * size + LEN_SIZE;
let mut data = Vec::with_capacity(total_len);
unsafe {
data.set_len(total_len);
}
let mut writer = UnsafeBytesWriter::new(&mut data);
writer.write_i32(0, ($arr.len() as i32).to_be());
writer.write_u64(0, ($arr.len() as u64).to_be());
for i in 0..$arr.len() {
writer.$func(4 + size * i, $arr[i].to_big_endian());
writer.$func(LEN_SIZE + size * i, $arr[i].to_big_endian());
}
data
}};
Expand Down Expand Up @@ -661,7 +663,7 @@ impl Value {
}

pub fn string_list(v: &[String]) -> Self {
let mut size = 4 + 4 * v.len();
let mut size = LEN_SIZE + LEN32_SIZE * v.len();
for s in v {
size += s.len();
}
Expand All @@ -670,13 +672,13 @@ impl Value {
data.set_len(size);
}
let mut writer = UnsafeBytesWriter::new(&mut data);
writer.write_i32(0, (v.len() as i32).to_be());
writer.write_u64(0, (v.len() as u64).to_be());
let mut off = 0;
let mut pos = 4;
let mut pos = LEN_SIZE;
for s in v {
off += s.len() as i32;
writer.write_i32(pos, off.to_be());
pos += 4;
off += s.len() as u32;
writer.write_u32(pos, off.to_be());
pos += LEN32_SIZE;
}
for s in v {
writer.write_bytes(pos, s.as_bytes());
Expand Down Expand Up @@ -874,13 +876,13 @@ impl<'a, T> NumericArray<'a, T> {

impl<'a, T: ToBigEndian> NumericArray<'a, T> {
fn new(reader: UnsafeBytesReader<'a>) -> Self {
let len = reader.read_i32(0).to_be() as usize;
let len = reader.read_u64(0).to_be() as usize;
NumericArray { reader, len, _phantom: Default::default() }
}

pub fn get(&self, idx: usize) -> Option<T> {
if idx < self.len {
let offset = 4 + ::std::mem::size_of::<T>() * idx;
let offset = LEN_SIZE + ::std::mem::size_of::<T>() * idx;
let tmp = *self.reader.read_ref::<T>(offset);
return Some(tmp.to_big_endian());
}
Expand Down Expand Up @@ -978,23 +980,36 @@ pub struct StrArray<'a> {

impl<'a> StrArray<'a> {
fn new(reader: UnsafeBytesReader<'a>) -> Self {
let len = reader.read_i32(0).to_be() as usize;
let len = reader.read_u64(0).to_be() as usize;
StrArray { reader, len }
}

pub fn get(&self, idx: usize) -> Option<&str> {
if idx < self.len {
let str_start_off = 4 + 4 * self.len;
let start_off =
if idx == 0 { 0 } else { self.reader.read_i32(4 + (idx - 1) * 4).to_be() as usize };
let end_off = self.reader.read_i32(4 + idx * 4).to_be() as usize;
let str_start_off = LEN_SIZE + LEN32_SIZE * self.len;
let start_off = if idx == 0 {
0
} else {
self.reader
.read_u32(LEN_SIZE + LEN32_SIZE * (idx - 1))
.to_be() as usize
};
let end_off = self
.reader
.read_u32(LEN_SIZE + LEN32_SIZE * idx)
.to_be() as usize;
let len = end_off - start_off;
let offset = str_start_off + start_off;
let data = self.reader.read_bytes(offset, len);
let ret = ::std::str::from_utf8(data).expect("data in str array is in valid utf8");
return Some(ret);
if let Ok(ret) = ::std::str::from_utf8(data) {
Some(ret)
} else {
error!("data in a str array is invalid utf8");
None
}
} else {
None
}
None
}

pub fn len(&self) -> usize {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ use protobuf::Message;
use crate::db::api::GraphErrorCode::InvalidData;
use crate::db::api::{GraphError, GraphResult};

/// define the size that a length field takes in encoding an array
pub const LEN_SIZE: usize = ::std::mem::size_of::<u64>();
/// half of LEN_SIZE
pub const LEN32_SIZE: usize = ::std::mem::size_of::<u32>();

/// This reader won't check whether the offset is overflow when read bytes.
/// It's for performance purpose. Be careful to use it.
#[derive(Clone)]
Expand Down Expand Up @@ -122,9 +127,9 @@ mod tests {
($r_func:ident, $w_func:ident, $ty:ty) => {
let mut buf = Vec::with_capacity(100);
let mut writer = UnsafeBytesWriter::new(&mut buf);
writer.$w_func(20, 1.0 as $ty);
writer.$w_func(16, 1.0 as $ty);
let reader = UnsafeBytesReader::new(&buf);
assert_eq!(reader.$r_func(20), 1.0 as $ty);
assert_eq!(reader.$r_func(16), 1.0 as $ty);
};
}

Expand Down
Loading
Loading