Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GIE/Store] Reduce memory footprint in exp_store #2245

Merged
merged 3 commits into from
Nov 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
//!

use ahash::{HashMap, HashMapExt};
use indexmap::map::IndexMap;
// use indexmap::map::IndexMap;
use itertools::Itertools;
use petgraph::graph::IndexType;
use petgraph::prelude::{DiGraph, Direction, EdgeIndex, NodeIndex};

use crate::common::{Label, LabelId, INVALID_LABEL_ID};
use crate::graph_db::labeled_topo::{LabeledTopology, MutLabeledTopology};
use crate::sorted_map::SortedMap;
use crate::utils::{Iter, IterList};

struct MutEdgeVec<I: IndexType> {
Expand Down Expand Up @@ -63,51 +64,70 @@ impl<I: IndexType> From<MutEdgeVec<I>> for EdgeVec<I> {
offsets_no_label[node] += offsets_no_label[node - 1];
}
for (node, offset) in offsets_no_label.drain(..).enumerate() {
if offsets[node].inner.is_empty() {
offsets[node]
.inner
.insert(INVALID_LABEL_ID, (I::new(offset), I::default()));
} else {
for (_, range) in offsets[node].inner.iter_mut() {
range.0 = I::new(range.0.index() + offset);
}
}
offsets[node].update_by_offset(offset);
}

let mut edge_vec = EdgeVec { offsets, edges };
let mut edge_vec = EdgeVec {
offsets: offsets
.into_iter()
.map(|x| x.into_immutable())
.collect(),
edges,
};
edge_vec.shrink_to_fit();

edge_vec
}
}

#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
struct RangeByLabel<K: IndexType, V: IndexType> {
#[derive(Default, Clone, Serialize, Deserialize)]
struct RangeByLabel<K: IndexType, V: IndexType, C: AsRef<[(K, (V, V))]> = Vec<(K, (V, V))>> {
/// K -> the key refers to label
/// (V, V) -> the first element refers to the starting index in the sparse row,
/// -> the second element refers to the size of the elements regarding the given `K`
inner: IndexMap<K, (V, V)>,
inner: SortedMap<K, (V, V), C>,
}

impl<K: IndexType, V: IndexType> RangeByLabel<K, V> {
impl<K: IndexType, V: IndexType, C: AsRef<[(K, (V, V))]>> RangeByLabel<K, V, C> {
#[inline]
pub fn get_index(&self) -> Option<usize> {
self.inner.first().map(|(_, r)| r.0.index())
self.inner
.get_entry_at(0)
.map(|(_, r)| r.0.index())
}

#[inline]
pub fn get_range(&self, label: K) -> Option<(usize, usize)> {
self.inner
.get(&label)
.map(|range| (range.0.index(), range.0.index() + range.1.index()))
}
}

impl<K: IndexType, V: IndexType> RangeByLabel<K, V> {
#[inline]
pub fn update_by_offset(&mut self, offset: usize) {
if self.inner.is_empty() {
self.inner
.insert(K::new(INVALID_LABEL_ID as usize), (V::new(offset), V::default()));
} else {
for (_, range) in self.inner.iter_mut() {
range.0 = V::new(range.0.index() + offset);
}
}
}

pub fn into_immutable(self) -> RangeByLabel<K, V, Box<[(K, (V, V))]>> {
RangeByLabel { inner: self.inner.into_immutable() }
}
}
/// This is an extension of a typical CSR structure to support label indexing.
#[derive(Debug, Default, PartialEq, Eq, Clone, Serialize, Deserialize)]
#[derive(Default, Clone, Serialize, Deserialize)]
struct EdgeVec<I: IndexType> {
/// * `offsets[i]`: maintain the adjacent edges of the node of id i,
/// * `offsets[i][j]` maintains the start and end indices of the adjacent edges of
/// the label j for node i, if node i has connection to the edge of label j
offsets: Vec<RangeByLabel<LabelId, I>>,
offsets: Vec<RangeByLabel<LabelId, I, Box<[(LabelId, (I, I))]>>>,
/// A vector to maintain edges' id
edges: Vec<EdgeIndex<I>>,
}
Expand Down Expand Up @@ -159,13 +179,14 @@ impl<I: IndexType + Send + Sync> EdgeVec<I> {
&[]
}
} else {
let start = self.offsets[node.index()]
.get_index()
.unwrap_or(0);
let end = self.offsets[node.index() + 1]
.get_index()
.unwrap_or(0);
&self.edges[start..end]
if let Some(start) = self.offsets[node.index()].get_index() {
let end = self.offsets[node.index() + 1]
.get_index()
.unwrap_or(start);
&self.edges[start..end]
} else {
&[]
}
}
} else {
&[]
Expand Down Expand Up @@ -195,14 +216,14 @@ impl<I: IndexType + Send + Sync> EdgeVec<I> {
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Default, Clone, Serialize, Deserialize)]
/// To maintain edges of both directions in a directed graph
struct BiDirEdges<I: IndexType> {
incoming: EdgeVec<I>,
outgoing: EdgeVec<I>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
struct EdgeTuple<I: IndexType> {
start_node: NodeIndex<I>,
end_node: NodeIndex<I>,
Expand Down Expand Up @@ -301,7 +322,7 @@ impl<I: IndexType> From<MutTopo<I>> for CsrTopo<I> {
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
#[derive(Default, Clone, Serialize, Deserialize)]
/// Record the topology of a graph in a CSR format
pub struct CsrTopo<I: IndexType> {
/// To maintain the label of nodes
Expand Down Expand Up @@ -441,6 +462,12 @@ impl<I: IndexType + Sync + Send> LabeledTopology for CsrTopo<I> {
}),
)
}

#[inline]
fn shrink_to_fit(&mut self) {
self.nodes.shrink_to_fit();
self.edges.shrink_to_fit();
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,21 @@ where
.contains_key(&global_id)
}

pub fn shrink_to_fit(&mut self) {
self.topology.shrink_to_fit();
self.vertex_prop_table.shrink_to_fit();
self.edge_prop_table.shrink_to_fit();
self.index_data.shrink_to_fit();
}

pub fn get_topology(&self) -> &T {
&self.topology
}

pub fn into_topology(self) -> T {
self.topology
}

/// Print the statistics for debugging
pub fn print_statistics(&self) {
println!("Statics of the graph in partition: {}", self.partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub trait LabeledTopology {
fn get_adjacent_nodes_of_labels_iter(
&self, node: NodeIndex<Self::I>, labels: Vec<LabelId>, dir: Direction,
) -> Iter<NodeIndex<Self::I>>;

fn shrink_to_fit(&mut self);
}

pub trait MutLabeledTopology {
Expand Down Expand Up @@ -242,6 +244,12 @@ impl<I: IndexType + Send + Sync> LabeledTopology for PGWrapper<I> {
}),
)
}

#[inline]
fn shrink_to_fit(&mut self) {
self.inner.shrink_to_fit();
self.adjacent_label_indices.shrink_to_fit();
}
}

impl<I: IndexType + Send + Sync> MutLabeledTopology for PGWrapper<I> {
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/executor/store/exp_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod ldbc;
pub mod parser;
pub mod prelude;
pub mod schema;
pub mod sorted_map;
pub mod table;
pub mod utils;

Expand Down
Loading