Skip to content

Commit

Permalink
[GIE/Store] Reduce memory footprint in exp_store (#2245)
Browse files Browse the repository at this point in the history
* [GIE/Store] Reduce memory usage in exp_store

* [GIE/Store] Reduce memory usage in exp_store

* [GIE/Store] Compatible with previous binary data.
  • Loading branch information
longbinlai committed Nov 26, 2022
1 parent e375f8d commit 1c4c1cd
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 33 deletions.
Expand Up @@ -15,13 +15,14 @@
//!

use ahash::{HashMap, HashMapExt};
use indexmap::map::IndexMap;
// use indexmap::map::IndexMap;
use itertools::Itertools;
use petgraph::graph::IndexType;
use petgraph::prelude::{DiGraph, Direction, EdgeIndex, NodeIndex};

use crate::common::{Label, LabelId, INVALID_LABEL_ID};
use crate::graph_db::labeled_topo::{LabeledTopology, MutLabeledTopology};
use crate::sorted_map::SortedMap;
use crate::utils::{Iter, IterList};

struct MutEdgeVec<I: IndexType> {
Expand Down Expand Up @@ -63,51 +64,70 @@ impl<I: IndexType> From<MutEdgeVec<I>> for EdgeVec<I> {
offsets_no_label[node] += offsets_no_label[node - 1];
}
for (node, offset) in offsets_no_label.drain(..).enumerate() {
if offsets[node].inner.is_empty() {
offsets[node]
.inner
.insert(INVALID_LABEL_ID, (I::new(offset), I::default()));
} else {
for (_, range) in offsets[node].inner.iter_mut() {
range.0 = I::new(range.0.index() + offset);
}
}
offsets[node].update_by_offset(offset);
}

let mut edge_vec = EdgeVec { offsets, edges };
let mut edge_vec = EdgeVec {
offsets: offsets
.into_iter()
.map(|x| x.into_immutable())
.collect(),
edges,
};
edge_vec.shrink_to_fit();

edge_vec
}
}

#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct RangeByLabel<K: IndexType, V: IndexType> {
#[derive(Default, Clone, Serialize, Deserialize)]
struct RangeByLabel<K: IndexType, V: IndexType, C: AsRef<[(K, (V, V))]> = Vec<(K, (V, V))>> {
/// K -> the key refers to label
/// (V, V) -> the first element refers to the starting index in the sparse row,
/// -> the second element refers to the size of the elements regarding the given `K`
inner: IndexMap<K, (V, V)>,
inner: SortedMap<K, (V, V), C>,
}

impl<K: IndexType, V: IndexType> RangeByLabel<K, V> {
impl<K: IndexType, V: IndexType, C: AsRef<[(K, (V, V))]>> RangeByLabel<K, V, C> {
#[inline]
pub fn get_index(&self) -> Option<usize> {
self.inner.first().map(|(_, r)| r.0.index())
self.inner
.get_entry_at(0)
.map(|(_, r)| r.0.index())
}

#[inline]
pub fn get_range(&self, label: K) -> Option<(usize, usize)> {
self.inner
.get(&label)
.map(|range| (range.0.index(), range.0.index() + range.1.index()))
}
}

impl<K: IndexType, V: IndexType> RangeByLabel<K, V> {
    /// Adds `offset` to the starting index of every range stored in this map.
    ///
    /// When the map holds no range at all, a single entry keyed by
    /// `INVALID_LABEL_ID` is inserted instead, with `offset` as its starting
    /// index and `V::default()` as its size.
    // NOTE(review): presumably the placeholder lets `get_index` still report a
    // position for a node without any edge -- confirm against the callers.
    #[inline]
    pub fn update_by_offset(&mut self, offset: usize) {
        if !self.inner.is_empty() {
            // Only the starting index moves; the size of each range is untouched.
            for (_, range) in self.inner.iter_mut() {
                let shifted_start = range.0.index() + offset;
                range.0 = V::new(shifted_start);
            }
            return;
        }

        let placeholder = (V::new(offset), V::default());
        self.inner
            .insert(K::new(INVALID_LABEL_ID as usize), placeholder);
    }

    /// Consumes this `Vec`-backed map and returns the same `RangeByLabel`
    /// backed by a boxed slice, as produced by the inner map's `into_immutable`.
    pub fn into_immutable(self) -> RangeByLabel<K, V, Box<[(K, (V, V))]>> {
        let frozen = self.inner.into_immutable();
        RangeByLabel { inner: frozen }
    }
}
/// This is an extension of a typical CSR structure to support label indexing.
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Default, Clone, Serialize, Deserialize)]
struct EdgeVec<I: IndexType> {
/// * `offsets[i]`: maintains the ranges of the adjacent edges of the node with id `i`,
/// * `offsets[i][j]`: maintains the starting index and the number of the adjacent edges of
/// label `j` for node `i`, if node `i` has any adjacent edge of label `j`
offsets: Vec<RangeByLabel<LabelId, I>>,
offsets: Vec<RangeByLabel<LabelId, I, Box<[(LabelId, (I, I))]>>>,
/// A vector to maintain edges' id
edges: Vec<EdgeIndex<I>>,
}
Expand Down Expand Up @@ -159,13 +179,14 @@ impl<I: IndexType + Send + Sync> EdgeVec<I> {
&[]
}
} else {
let start = self.offsets[node.index()]
.get_index()
.unwrap_or(0);
let end = self.offsets[node.index() + 1]
.get_index()
.unwrap_or(0);
&self.edges[start..end]
if let Some(start) = self.offsets[node.index()].get_index() {
let end = self.offsets[node.index() + 1]
.get_index()
.unwrap_or(start);
&self.edges[start..end]
} else {
&[]
}
}
} else {
&[]
Expand Down Expand Up @@ -195,14 +216,14 @@ impl<I: IndexType + Send + Sync> EdgeVec<I> {
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Default, Clone, Serialize, Deserialize)]
/// To maintain edges of both directions in a directed graph
struct BiDirEdges<I: IndexType> {
/// The `EdgeVec` holding the edges of the incoming direction.
incoming: EdgeVec<I>,
/// The `EdgeVec` holding the edges of the outgoing direction.
outgoing: EdgeVec<I>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
struct EdgeTuple<I: IndexType> {
start_node: NodeIndex<I>,
end_node: NodeIndex<I>,
Expand Down Expand Up @@ -301,7 +322,7 @@ impl<I: IndexType> From<MutTopo<I>> for CsrTopo<I> {
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Default, Clone, Serialize, Deserialize)]
/// Record the topology of a graph in a CSR format
pub struct CsrTopo<I: IndexType> {
/// To maintain the label of nodes
Expand Down Expand Up @@ -441,6 +462,12 @@ impl<I: IndexType + Sync + Send> LabeledTopology for CsrTopo<I> {
}),
)
}

#[inline]
fn shrink_to_fit(&mut self) {
self.nodes.shrink_to_fit();
self.edges.shrink_to_fit();
}
}

#[cfg(test)]
Expand Down
Expand Up @@ -400,6 +400,21 @@ where
.contains_key(&global_id)
}

pub fn shrink_to_fit(&mut self) {
self.topology.shrink_to_fit();
self.vertex_prop_table.shrink_to_fit();
self.edge_prop_table.shrink_to_fit();
self.index_data.shrink_to_fit();
}

/// Returns a shared reference to the `topology` field of this graph.
pub fn get_topology(&self) -> &T {
&self.topology
}

/// Consumes the graph and returns its `topology` field by value.
pub fn into_topology(self) -> T {
self.topology
}

/// Print the statistics for debugging
pub fn print_statistics(&self) {
println!("Statics of the graph in partition: {}", self.partition);
Expand Down
Expand Up @@ -70,6 +70,8 @@ pub trait LabeledTopology {
fn get_adjacent_nodes_of_labels_iter(
&self, node: NodeIndex<Self::I>, labels: Vec<LabelId>, dir: Direction,
) -> Iter<NodeIndex<Self::I>>;

/// Asks the topology to shrink its internal storage.
/// NOTE(review): presumably this releases unused capacity, since the visible
/// implementations forward to `shrink_to_fit` on their fields -- confirm per implementor.
fn shrink_to_fit(&mut self);
}

pub trait MutLabeledTopology {
Expand Down Expand Up @@ -242,6 +244,12 @@ impl<I: IndexType + Send + Sync> LabeledTopology for PGWrapper<I> {
}),
)
}

#[inline]
fn shrink_to_fit(&mut self) {
self.inner.shrink_to_fit();
self.adjacent_label_indices.shrink_to_fit();
}
}

impl<I: IndexType + Send + Sync> MutLabeledTopology for PGWrapper<I> {
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/executor/store/exp_store/src/lib.rs
Expand Up @@ -23,6 +23,7 @@ pub mod ldbc;
pub mod parser;
pub mod prelude;
pub mod schema;
pub mod sorted_map;
pub mod table;
pub mod utils;

Expand Down

0 comments on commit 1c4c1cd

Please sign in to comment.