
[GIE] Support parallel scan on ExpStore (#2253)
* [GIE/Runtime] enable parallel scan on ExpStore

* minor refine

* [GIE/Runtime] refine get_worker_partitions() for ExpStore

* fix ci tests

* [GIE/Runtime] parallel edge_scan for ExpStore
BingqingLyu committed Nov 30, 2022
1 parent 432e65c commit 5ea5b87
Showing 4 changed files with 108 additions and 33 deletions.
@@ -62,7 +62,7 @@ pub use pegasus_memory::alloc::check_current_task_memory;
pub use pegasus_network::ServerDetect;
pub use tag::Tag;
pub use worker::Worker;
pub use worker_id::{get_current_worker, set_current_worker, WorkerId};
pub use worker_id::{get_current_worker, get_current_worker_checked, set_current_worker, WorkerId};

use crate::api::Source;
pub use crate::errors::{BuildJobError, JobSubmitError, SpawnJobError, StartupError};
@@ -35,15 +35,11 @@ impl Partitioner for SimplePartition {
}

fn get_worker_partitions(
&self, job_workers: usize, worker_id: u32,
&self, _job_workers: usize, _worker_id: u32,
) -> GraphProxyResult<Option<Vec<u64>>> {
// In graph that one server contains a single graph partition,
// we assign the first worker on current server to process (scan) the partition,
// and we assume the partition id is identity to the server id
if worker_id as usize % job_workers == 0 {
Ok(Some(vec![worker_id as u64 / job_workers as u64]))
} else {
Ok(None)
}
// There's no need to assign a specific partition id for the query, as every worker scans part of the current partition.
// In the source scan, workers scan the vertices in parallel.
Ok(None)
}
}
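
For readers unfamiliar with the Partitioner contract, the gist of this change is that returning Ok(Some(ids)) pins a worker to specific partition ids, while Ok(None) leaves the assignment open so every worker scans a slice of the local partition. The sketch below models that contract only; the trait, plan_scan, and ScanPlan names are illustrative stand-ins, not the actual GIE runtime types:

// Illustrative sketch: how a caller might interpret the two return shapes.
type GraphProxyResult<T> = Result<T, String>;

trait Partitioner {
    fn get_worker_partitions(
        &self, job_workers: usize, worker_id: u32,
    ) -> GraphProxyResult<Option<Vec<u64>>>;
}

enum ScanPlan {
    // Scan only these partition ids on this worker.
    Partitions(Vec<u64>),
    // No pinned partitions: every worker scans a slice of the local partition.
    ParallelSlice,
}

fn plan_scan<P: Partitioner>(
    p: &P, job_workers: usize, worker_id: u32,
) -> GraphProxyResult<ScanPlan> {
    Ok(match p.get_worker_partitions(job_workers, worker_id)? {
        Some(ids) => ScanPlan::Partitions(ids),
        None => ScanPlan::ParallelSlice,
    })
}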
@@ -231,20 +231,35 @@ impl ReadGraph for ExpStore {
) -> GraphProxyResult<Box<dyn Iterator<Item = Vertex> + Send>> {
// DemoGraph contains a single graph partition on each server,
// therefore, there's no need to use the specific partition id for query.
// Besides, we guarantee only one worker (on each server) is going to scan (with params.partitions.is_some())
if params.partitions.is_some() {
let label_ids = encode_storage_label(&params.labels);
let props = params.columns.clone();
let result = self
.store
.get_all_vertices(label_ids.as_ref())
.map(move |v| to_runtime_vertex(v, props.clone()));

// it is defined as filter > sample > limit; Same as follows.
Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
} else {
Ok(Box::new(std::iter::empty()))
}
// Besides, workers scan the vertices in parallel
let label_ids = encode_storage_label(&params.labels);
let props = params.columns.clone();

// get_current_worker_checked() in case pegasus is not started, e.g., in ci tests.
let worker_id = pegasus::get_current_worker_checked()
.map(|worker| worker.index)
.unwrap_or(0);
let workers_num = pegasus::get_current_worker_checked()
.map(|worker| worker.local_peers)
.unwrap_or(1);
let count = self
.store
.count_all_vertices(label_ids.as_ref());
let partial_count = count / workers_num as usize;
let take_count = if (worker_id + 1) % workers_num == 0 {
count - partial_count * (workers_num as usize - 1)
} else {
partial_count
};

let result = self
.store
.get_all_vertices(label_ids.as_ref())
.skip((worker_id % workers_num) as usize * partial_count)
.take(take_count)
.map(move |v| to_runtime_vertex(v, props.clone()));

Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
}
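
The skip/take arithmetic above is the core of the parallel scan: each worker skips (worker_id % workers_num) * partial_count elements and takes partial_count, while the last worker in the group absorbs the remainder. A minimal, self-contained sketch of the same arithmetic (the split_range helper is illustrative and not part of the store API), with a check that the per-worker windows cover every element exactly once:

/// Illustrative helper mirroring the skip/take arithmetic in scan_vertex/scan_edge:
/// the last worker in the group takes the remainder so nothing is dropped or duplicated.
fn split_range(count: usize, worker_id: u32, workers_num: u32) -> (usize, usize) {
    let partial = count / workers_num as usize;
    let take = if (worker_id + 1) % workers_num == 0 {
        count - partial * (workers_num as usize - 1)
    } else {
        partial
    };
    let skip = (worker_id % workers_num) as usize * partial;
    (skip, take)
}

fn main() {
    let items: Vec<u64> = (0..10).collect();
    let workers_num: u32 = 3;
    let mut seen: Vec<u64> = Vec::new();
    for worker_id in 0..workers_num {
        let (skip, take) = split_range(items.len(), worker_id, workers_num);
        seen.extend(items.iter().copied().skip(skip).take(take));
    }
    // partial_count = 3, so workers 0 and 1 take 3 items each and worker 2 takes 4.
    assert_eq!(seen, items);
}

Handing the remainder to the last worker keeps every skip offset a simple multiple of partial_count, at the cost of one slightly larger final slice.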

fn index_scan_vertex(
@@ -256,18 +271,35 @@
}

fn scan_edge(&self, params: &QueryParams) -> GraphProxyResult<Box<dyn Iterator<Item = Edge> + Send>> {
if params.partitions.is_some() {
let label_ids = encode_storage_label(&params.labels);
let props = params.columns.clone();
let result = self
.store
.get_all_edges(label_ids.as_ref())
.map(move |e| to_runtime_edge(e, props.clone()));

Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
} else {
Ok(Box::new(std::iter::empty()))
}
// DemoGraph contains a single graph partition on each server,
// therefore, there's no need to use a specific partition id for the query.
// Besides, workers scan the edges in parallel
let label_ids = encode_storage_label(&params.labels);
let props = params.columns.clone();

// get_current_worker_checked() in case pegasus is not started, e.g., in ci tests.
let worker_id = pegasus::get_current_worker_checked()
.map(|worker| worker.index)
.unwrap_or(0);
let workers_num = pegasus::get_current_worker_checked()
.map(|worker| worker.local_peers)
.unwrap_or(1);
let count = self.store.count_all_edges(label_ids.as_ref());
let partial_count = count / workers_num as usize;
let take_count = if (worker_id + 1) % workers_num == 0 {
count - partial_count * (workers_num as usize - 1)
} else {
partial_count
};

let result = self
.store
.get_all_edges(label_ids.as_ref())
.skip((worker_id % workers_num) as usize * partial_count)
.take(take_count)
.map(move |v| to_runtime_edge(v, props.clone()));

Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
}
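
Both scans finish with filter_sample_limit!, which an earlier comment in this file defines as applying the filter first, then the sample, then the limit. Below is a plain-iterator sketch of that ordering only; it is an illustration of the intended semantics rather than the actual macro, and the inline pseudo-random generator merely stands in for whatever sampling strategy the runtime uses:

// Illustrative only: demonstrate the filter -> sample -> limit ordering over any iterator.
fn filter_sample_limit<I, T, F>(
    iter: I, filter: F, sample_ratio: f64, limit: Option<usize>,
) -> impl Iterator<Item = T>
where
    I: Iterator<Item = T>,
    F: FnMut(&T) -> bool,
{
    // A small deterministic generator stands in for a real RNG.
    let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
    iter.filter(filter)
        .filter(move |_| {
            state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            ((state >> 11) as f64 / (1u64 << 53) as f64) < sample_ratio
        })
        .take(limit.unwrap_or(usize::MAX))
}

fn main() {
    // Keep even numbers, sample roughly half of them, and return at most 3.
    let out: Vec<u64> = filter_sample_limit(0..100u64, |x| x % 2 == 0, 0.5, Some(3)).collect();
    assert!(out.len() <= 3);
    assert!(out.iter().all(|x| x % 2 == 0));
    println!("{:?}", out);
}

The order matters: filtering first avoids sampling elements that would be discarded anyway, and the limit caps how much of the sampled stream is materialized.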

fn get_vertex(
47 changes: 47 additions & 0 deletions interactive_engine/executor/ir/integrated/tests/scan_test.rs
@@ -179,6 +179,53 @@ mod test {
assert!(result_count < 6);
}

// g.E()
#[test]
fn scan_edge_test() {
let source_iter =
scan_gen(pb::Scan { scan_opt: 1, alias: None, params: None, idx_predicate: None });
let mut result_ids = vec![];
let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0);
let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1);
let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1);
let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
let mut expected_ids = vec![(v1, v2), (v1, v3), (v1, v4), (v4, v3), (v4, v5), (v6, v3)];
for record in source_iter {
if let Some(element) = record.get(None).unwrap().as_graph_edge() {
result_ids.push((element.src_id as usize, element.dst_id as usize))
}
}
result_ids.sort();
expected_ids.sort();
assert_eq!(result_ids, expected_ids)
}

// g.E().hasLabel('knows')
#[test]
fn scan_edge_with_label_test() {
let source_iter = scan_gen(pb::Scan {
scan_opt: 1,
alias: None,
params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)),
idx_predicate: None,
});
let mut result_ids = vec![];
let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0);
let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
let mut expected_ids = vec![(v1, v2), (v1, v4)];
for record in source_iter {
if let Some(element) = record.get(None).unwrap().as_graph_edge() {
result_ids.push((element.src_id as usize, element.dst_id as usize))
}
}
result_ids.sort();
expected_ids.sort();
assert_eq!(result_ids, expected_ids)
}

// g.E().coin(0.1)
#[test]
fn scan_edge_sample_test() {
