diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
index a9cd16101fcf..631cd9e2a0ae 100644
--- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
+++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
@@ -62,7 +62,7 @@ pub use pegasus_memory::alloc::check_current_task_memory;
 pub use pegasus_network::ServerDetect;
 pub use tag::Tag;
 pub use worker::Worker;
-pub use worker_id::{get_current_worker, set_current_worker, WorkerId};
+pub use worker_id::{get_current_worker, get_current_worker_checked, set_current_worker, WorkerId};
 
 use crate::api::Source;
 pub use crate::errors::{BuildJobError, JobSubmitError, SpawnJobError, StartupError};
diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/partitioner.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/partitioner.rs
index a0c88a3f7142..93b6ceba80f8 100644
--- a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/partitioner.rs
+++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/partitioner.rs
@@ -35,15 +35,11 @@ impl Partitioner for SimplePartition {
     }
 
     fn get_worker_partitions(
-        &self, job_workers: usize, worker_id: u32,
+        &self, _job_workers: usize, _worker_id: u32,
     ) -> GraphProxyResult<Option<Vec<u64>>> {
         // In graph that one server contains a single graph partition,
-        // we assign the first worker on current server to process (scan) the partition,
-        // and we assume the partition id is identity to the server id
-        if worker_id as usize % job_workers == 0 {
-            Ok(Some(vec![worker_id as u64 / job_workers as u64]))
-        } else {
-            Ok(None)
-        }
+        // there's no need to assign a specific partition id for the query (all workers scan part of the current partition).
+        // In a source scan, workers scan the vertices in parallel.
+        Ok(None)
     }
 }
diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs
index 5d8e343fa95a..a10d5eb088a2 100644
--- a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs
+++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs
@@ -231,20 +231,35 @@ impl ReadGraph for ExpStore {
     ) -> GraphProxyResult<Box<dyn Iterator<Item = Vertex> + Send>> {
         // DemoGraph contains a single graph partition on each server,
         // therefore, there's no need to use the specific partition id for query.
-        // Besides, we guarantee only one worker (on each server) is going to scan (with params.partitions.is_some())
-        if params.partitions.is_some() {
-            let label_ids = encode_storage_label(&params.labels);
-            let props = params.columns.clone();
-            let result = self
-                .store
-                .get_all_vertices(label_ids.as_ref())
-                .map(move |v| to_runtime_vertex(v, props.clone()));
-
-            // it is defined as filter > sample > limit; Same as follows.
-            Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
+        // Besides, workers scan the vertices in parallel.
+        let label_ids = encode_storage_label(&params.labels);
+        let props = params.columns.clone();
+
+        // Use get_current_worker_checked() in case pegasus is not started, e.g., in CI tests.
+        let worker_id = pegasus::get_current_worker_checked()
+            .map(|worker| worker.index)
+            .unwrap_or(0);
+        let workers_num = pegasus::get_current_worker_checked()
+            .map(|worker| worker.local_peers)
+            .unwrap_or(1);
+        let count = self
+            .store
+            .count_all_vertices(label_ids.as_ref());
+        let partial_count = count / workers_num as usize;
+        let take_count = if (worker_id + 1) % workers_num == 0 {
+            count - partial_count * (workers_num as usize - 1)
         } else {
-            Ok(Box::new(std::iter::empty()))
-        }
+            partial_count
+        };
+
+        let result = self
+            .store
+            .get_all_vertices(label_ids.as_ref())
+            .skip((worker_id % workers_num) as usize * partial_count)
+            .take(take_count)
+            .map(move |v| to_runtime_vertex(v, props.clone()));
+
+        Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
     }
 
     fn index_scan_vertex(
@@ -256,18 +271,35 @@ impl ReadGraph for ExpStore {
     }
 
     fn scan_edge(&self, params: &QueryParams) -> GraphProxyResult<Box<dyn Iterator<Item = Edge> + Send>> {
-        if params.partitions.is_some() {
-            let label_ids = encode_storage_label(&params.labels);
-            let props = params.columns.clone();
-            let result = self
-                .store
-                .get_all_edges(label_ids.as_ref())
-                .map(move |e| to_runtime_edge(e, props.clone()));
-
-            Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
+        // DemoGraph contains a single graph partition on each server,
+        // therefore, there's no need to use the specific partition id for query.
+        // Besides, workers scan the edges in parallel.
+        let label_ids = encode_storage_label(&params.labels);
+        let props = params.columns.clone();
+
+        // Use get_current_worker_checked() in case pegasus is not started, e.g., in CI tests.
+        let worker_id = pegasus::get_current_worker_checked()
+            .map(|worker| worker.index)
+            .unwrap_or(0);
+        let workers_num = pegasus::get_current_worker_checked()
+            .map(|worker| worker.local_peers)
+            .unwrap_or(1);
+        let count = self.store.count_all_edges(label_ids.as_ref());
+        let partial_count = count / workers_num as usize;
+        let take_count = if (worker_id + 1) % workers_num == 0 {
+            count - partial_count * (workers_num as usize - 1)
         } else {
-            Ok(Box::new(std::iter::empty()))
-        }
+            partial_count
+        };
+
+        let result = self
+            .store
+            .get_all_edges(label_ids.as_ref())
+            .skip((worker_id % workers_num) as usize * partial_count)
+            .take(take_count)
+            .map(move |v| to_runtime_edge(v, props.clone()));
+
+        Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
     }
 
     fn get_vertex(
diff --git a/interactive_engine/executor/ir/integrated/tests/scan_test.rs b/interactive_engine/executor/ir/integrated/tests/scan_test.rs
index da1b8110ed90..bd90d4a651d6 100644
--- a/interactive_engine/executor/ir/integrated/tests/scan_test.rs
+++ b/interactive_engine/executor/ir/integrated/tests/scan_test.rs
@@ -179,6 +179,53 @@ mod test {
         assert!(result_count < 6);
     }
 
+    // g.E()
+    #[test]
+    fn scan_edge_test() {
+        let source_iter =
+            scan_gen(pb::Scan { scan_opt: 1, alias: None, params: None, idx_predicate: None });
+        let mut result_ids = vec![];
+        let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
+        let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0);
+        let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1);
+        let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
+        let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1);
+        let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
+        let mut expected_ids = vec![(v1, v2), (v1, v3), (v1, v4), (v4, v3), (v4, v5), (v6, v3)];
+        for record in source_iter {
+            if let Some(element) = record.get(None).unwrap().as_graph_edge() {
+                result_ids.push((element.src_id as usize, element.dst_id as usize))
+            }
+        }
+        result_ids.sort();
+        expected_ids.sort();
+        assert_eq!(result_ids, expected_ids)
+    }
+
+    // g.E().hasLabel('knows')
+    #[test]
+    fn scan_edge_with_label_test() {
+        let source_iter = scan_gen(pb::Scan {
+            scan_opt: 1,
+            alias: None,
+            params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)),
+            idx_predicate: None,
+        });
+        let mut result_ids = vec![];
+        let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
+        let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0);
+        let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
+        let mut expected_ids = vec![(v1, v2), (v1, v4)];
+        for record in source_iter {
+            if let Some(element) = record.get(None).unwrap().as_graph_edge() {
+                result_ids.push((element.src_id as usize, element.dst_id as usize))
+            }
+        }
+        result_ids.sort();
+        expected_ids.sort();
+        assert_eq!(result_ids, expected_ids)
+    }
+
     // g.E().coin(0.1)
     #[test]
     fn scan_edge_sample_test() {
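Note on the splitting scheme above: both `scan_vertex` and `scan_edge` divide one shared store iterator among workers with `skip`/`take`, giving each worker `count / workers_num` items and letting the last worker pick up the remainder. The sketch below is illustrative and not part of the patch; `worker_slice` is a hypothetical helper, and in the patch the worker index and peer count come from `pegasus::get_current_worker_checked()`. It assumes every worker observes the iterator in the same order, so the slices are disjoint and together cover each element exactly once.

```rust
/// Hypothetical stand-alone version of the skip/take splitting used in
/// scan_vertex/scan_edge: worker `worker_id` of `workers_num` takes its
/// contiguous slice of an iterator yielding `count` items in total.
fn worker_slice<I: Iterator>(
    items: I, worker_id: u32, workers_num: u32, count: usize,
) -> impl Iterator<Item = I::Item> {
    let partial_count = count / workers_num as usize;
    // The last worker takes the remainder when `count` is not divisible
    // by `workers_num`, so no item is dropped.
    let take_count = if (worker_id + 1) % workers_num == 0 {
        count - partial_count * (workers_num as usize - 1)
    } else {
        partial_count
    };
    items
        .skip((worker_id % workers_num) as usize * partial_count)
        .take(take_count)
}

fn main() {
    let data: Vec<u32> = (0..10).collect();
    let workers_num = 3;
    // worker 0 -> [0, 1, 2], worker 1 -> [3, 4, 5], worker 2 -> [6, 7, 8, 9]
    let mut seen = vec![];
    for worker_id in 0..workers_num {
        seen.extend(worker_slice(data.iter().copied(), worker_id, workers_num, data.len()));
    }
    // The slices are disjoint and cover the input exactly once, in order.
    assert_eq!(seen, data);
}
```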