diff --git a/interactive_engine/compiler/src/test/resources/case_when.json b/interactive_engine/compiler/src/test/resources/case_when.json index cc9d70a4557d..045ed2e68ff1 100644 --- a/interactive_engine/compiler/src/test/resources/case_when.json +++ b/interactive_engine/compiler/src/test/resources/case_when.json @@ -26,6 +26,7 @@ "extra": {} }, "idx_predicate": null, + "is_count_only": false, "meta_data": { "type": { "type": { diff --git a/interactive_engine/compiler/src/test/resources/ffi_logical_plan_1.json b/interactive_engine/compiler/src/test/resources/ffi_logical_plan_1.json index 0c8bab63ce63..3153d1fa5f04 100644 --- a/interactive_engine/compiler/src/test/resources/ffi_logical_plan_1.json +++ b/interactive_engine/compiler/src/test/resources/ffi_logical_plan_1.json @@ -26,6 +26,7 @@ "extra": {} }, "idx_predicate": null, + "is_count_only": false, "meta_data": { "type": { "type": { diff --git a/interactive_engine/compiler/src/test/resources/ffi_logical_plan_2.json b/interactive_engine/compiler/src/test/resources/ffi_logical_plan_2.json index 371981b607c2..22ebbe6d70cc 100644 --- a/interactive_engine/compiler/src/test/resources/ffi_logical_plan_2.json +++ b/interactive_engine/compiler/src/test/resources/ffi_logical_plan_2.json @@ -83,6 +83,7 @@ "extra": {} }, "idx_predicate": null, + "is_count_only": false, "meta_data": { "type": { "type": { diff --git a/interactive_engine/compiler/src/test/resources/ffi_logical_plan_3.json b/interactive_engine/compiler/src/test/resources/ffi_logical_plan_3.json index b461958aef4f..d61487a842a5 100644 --- a/interactive_engine/compiler/src/test/resources/ffi_logical_plan_3.json +++ b/interactive_engine/compiler/src/test/resources/ffi_logical_plan_3.json @@ -26,6 +26,7 @@ "extra": {} }, "idx_predicate": null, + "is_count_only": false, "meta_data": { "type": { "type": { diff --git a/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs b/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs index 345808c2ed65..eef9b2e7dab7 100644 --- a/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs +++ b/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs @@ -564,6 +564,7 @@ mod test { alias: None, params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; let sink_pb = algebra_pb::Sink { tags: vec![], sink_target: None }; @@ -587,6 +588,7 @@ mod test { alias: None, params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; let scan2_pb = scan1_pb.clone(); diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index c01a59a43dc2..b6e807331a7f 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -792,6 +792,7 @@ impl From for physical_pb::Scan { alias: scan.alias.map(|tag| tag.try_into().unwrap()), params: scan.params, idx_predicate: scan.idx_predicate, + is_count_only: scan.is_count_only, } } } diff --git a/interactive_engine/executor/ir/core/src/glogue/pattern.rs b/interactive_engine/executor/ir/core/src/glogue/pattern.rs index 6aac9019c93b..d8935af4c11c 100644 --- a/interactive_engine/executor/ir/core/src/glogue/pattern.rs +++ b/interactive_engine/executor/ir/core/src/glogue/pattern.rs @@ -678,6 +678,7 @@ fn generate_source_operator( alias: Some((source_vertex_id as KeyId).into()), params: Some(source_vertex_param), idx_predicate: None, + is_count_only: false, meta_data: None, }; Ok(source_scan.into()) diff --git a/interactive_engine/executor/ir/core/src/plan/ffi.rs b/interactive_engine/executor/ir/core/src/plan/ffi.rs index 76874596bf64..1b1de7938d62 100644 --- a/interactive_engine/executor/ir/core/src/plan/ffi.rs +++ b/interactive_engine/executor/ir/core/src/plan/ffi.rs @@ -1740,6 +1740,7 @@ mod scan { extra: HashMap::new(), }), idx_predicate: None, + is_count_only: false, meta_data: None, }); Box::into_raw(scan) as *const c_void diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 433eaabdade6..9a72fc98f5a9 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -2072,6 +2072,7 @@ mod test { extra: HashMap::new(), }), idx_predicate: Some(vec!["software".to_string()].into()), + is_count_only: false, meta_data: None, }; scan.preprocess(&meta, &mut plan_meta).unwrap(); @@ -2151,6 +2152,7 @@ mod test { extra: HashMap::new(), }), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2197,6 +2199,7 @@ mod test { extra: HashMap::new(), }), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2229,6 +2232,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2284,6 +2288,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2361,6 +2366,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2482,6 +2488,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; let mut opr_id = plan @@ -2620,6 +2627,7 @@ mod test { extra: Default::default(), }), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2648,6 +2656,7 @@ mod test { extra: Default::default(), }), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2685,6 +2694,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2762,6 +2772,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2825,6 +2836,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2883,6 +2895,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2930,6 +2943,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -2989,6 +3003,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -3080,6 +3095,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -3126,6 +3142,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; plan.append_operator_as_node(scan.into(), vec![0]) @@ -3222,6 +3239,7 @@ mod test { alias: Some("v".into()), params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -3301,6 +3319,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/core/src/plan/physical.rs b/interactive_engine/executor/ir/core/src/plan/physical.rs index f8fdef18ed03..0ee8e42f844d 100644 --- a/interactive_engine/executor/ir/core/src/plan/physical.rs +++ b/interactive_engine/executor/ir/core/src/plan/physical.rs @@ -1129,6 +1129,7 @@ mod test { alias: None, params: Some(query_params(vec![], columns)), idx_predicate: None, + is_count_only: false, meta_data: None, } } @@ -1636,6 +1637,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; let select_opr = pb::Select { predicate: str_to_expr_pb("@.id == 10".to_string()).ok() }; @@ -1692,6 +1694,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -1730,6 +1733,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -1793,6 +1797,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -1882,6 +1887,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -1994,6 +2000,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2026,6 +2033,7 @@ mod test { alias: Some(0.into()), params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2115,6 +2123,7 @@ mod test { alias: Some(0.into()), params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2180,6 +2189,7 @@ mod test { alias: Some(0.into()), params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2250,6 +2260,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; let expand_opr = pb::EdgeExpand { @@ -2309,6 +2320,7 @@ mod test { alias: Some(0.into()), params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -2426,6 +2438,7 @@ mod test { alias: Some(0.into()), params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -3058,6 +3071,7 @@ mod test { alias: None, params: Some(query_params(vec!["person".into()], vec![])), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/core/tests/test_multi_source.rs b/interactive_engine/executor/ir/core/tests/test_multi_source.rs index fee1e8f9c5d2..877609378fae 100644 --- a/interactive_engine/executor/ir/core/tests/test_multi_source.rs +++ b/interactive_engine/executor/ir/core/tests/test_multi_source.rs @@ -39,8 +39,14 @@ mod tests { // join(scan.match1, scan.match2) fn single_source_multi_match_join_logical_plan() -> LogicalPlan { - let scan_opr = - pb::Scan { scan_opt: 0, alias: None, params: None, idx_predicate: None, meta_data: None }; + let scan_opr = pb::Scan { + scan_opt: 0, + alias: None, + params: None, + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; let expand_opr1 = pb::EdgeExpand { v_tag: None, @@ -112,8 +118,14 @@ mod tests { // join(scan1.match1, scan2.match2) fn multi_source_multi_match_join_logical_plan() -> LogicalPlan { - let scan_opr = - pb::Scan { scan_opt: 0, alias: None, params: None, idx_predicate: None, meta_data: None }; + let scan_opr = pb::Scan { + scan_opt: 0, + alias: None, + params: None, + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; let expand_opr1 = pb::EdgeExpand { v_tag: None, @@ -189,8 +201,14 @@ mod tests { // join(join(scan1.match1, scan2.match2), scan3.match3) fn multi_source_multi_match_multi_join_logical_plan() -> LogicalPlan { - let scan_opr = - pb::Scan { scan_opt: 0, alias: None, params: None, idx_predicate: None, meta_data: None }; + let scan_opr = pb::Scan { + scan_opt: 0, + alias: None, + params: None, + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; let expand_opr = pb::EdgeExpand { v_tag: None, @@ -344,8 +362,14 @@ mod tests { // join(join(scan1.match1, scan2.match2), scan3.match3) fn multi_join_logical_plan() -> LogicalPlan { - let scan_opr = - pb::Scan { scan_opt: 0, alias: None, params: None, idx_predicate: None, meta_data: None }; + let scan_opr = pb::Scan { + scan_opt: 0, + alias: None, + params: None, + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; let join_opr = pb::Join { kind: 0, left_keys: vec![common_pb::Variable { tag: None, property: None, node_type: None }], @@ -403,6 +427,7 @@ mod tests { alias: Some(0.into()), params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; let scan2_opr = pb::Scan { @@ -410,6 +435,7 @@ mod tests { alias: Some(1.into()), params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; let scan3_opr = pb::Scan { @@ -417,6 +443,7 @@ mod tests { alias: Some(2.into()), params: None, idx_predicate: None, + is_count_only: false, meta_data: None, }; let dummy_opr = pb::Root {}; diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs index f8cbc8874d00..c638bf93d964 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs @@ -172,6 +172,32 @@ impl ReadGraph for CSRStore { let pk_val = Object::from(outer_id); Ok(Some((CSR_STORE_PK.into(), pk_val).into())) } + + fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult { + let worker_index = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_index % workers_num == 0 { + let label_ids = encode_storage_label(¶ms.labels); + let count = self + .store + .count_all_vertices(label_ids.as_ref()); + Ok(count as u64) + } else { + Ok(0) + } + } + + fn count_edge(&self, params: &QueryParams) -> GraphProxyResult { + let worker_index = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_index % workers_num == 0 { + let label_ids = encode_storage_label(¶ms.labels); + let count = self.store.count_all_edges(label_ids.as_ref()); + Ok(count as u64) + } else { + Ok(0) + } + } } #[inline] diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs index 2408dfc0f58d..ec6a4d70f2f4 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs @@ -365,6 +365,30 @@ impl ReadGraph for ExpStore { let pk_val = Object::from(outer_id); Ok(Some((EXP_STORE_PK.into(), pk_val).into())) } + + fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult { + let worker_idx = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_idx % workers_num == 0 { + let label_ids = encode_storage_label(¶ms.labels); + Ok(self + .store + .count_all_vertices(label_ids.as_ref()) as u64) + } else { + Ok(0) + } + } + + fn count_edge(&self, params: &QueryParams) -> GraphProxyResult { + let worker_idx = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_idx % workers_num == 0 { + let label_ids = encode_storage_label(¶ms.labels); + Ok(self.store.count_all_edges(label_ids.as_ref()) as u64) + } else { + Ok(0) + } + } } #[inline] diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs index 0cda00d00398..ed805c7c2d12 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs @@ -476,6 +476,44 @@ where trace!("get_primary_key: id: {}, outer_id {:?}, pk_val: {:?}", id, outer_id, pk_val); Ok(Some((GS_STORE_PK.into(), pk_val).into())) } + + fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult { + let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; + if !worker_partitions.is_empty() { + let store = self.store.clone(); + let si = params + .get_extra_param(SNAPSHOT_ID) + .map(|s| { + s.parse::() + .unwrap_or(DEFAULT_SNAPSHOT_ID) + }) + .unwrap_or(DEFAULT_SNAPSHOT_ID); + let label_ids = encode_storage_labels(params.labels.as_ref())?; + let count = store.count_all_vertices(si, label_ids.as_ref(), None, worker_partitions.as_ref()); + Ok(count) + } else { + Ok(0) + } + } + + fn count_edge(&self, params: &QueryParams) -> GraphProxyResult { + let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; + if !worker_partitions.is_empty() { + let store = self.store.clone(); + let si = params + .get_extra_param(SNAPSHOT_ID) + .map(|s| { + s.parse::() + .unwrap_or(DEFAULT_SNAPSHOT_ID) + }) + .unwrap_or(DEFAULT_SNAPSHOT_ID); + let label_ids = encode_storage_labels(params.labels.as_ref())?; + let count = store.count_all_edges(si, label_ids.as_ref(), None, worker_partitions.as_ref()); + Ok(count) + } else { + Ok(0) + } + } } #[inline] diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/read_graph.rs index 16a64135e7fd..73e7e490c7f8 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/read_graph.rs @@ -81,6 +81,12 @@ pub trait ReadGraph: Send + Sync { &self, direction: Direction, params: &QueryParams, ) -> GraphProxyResult>>; + /// Count vertices with query parameters, and return the number of vertices. + fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult; + + /// Count edges with query parameters, and return the number of edges. + fn count_edge(&self, params: &QueryParams) -> GraphProxyResult; + /// Get primary key value(s) with the given global_id, /// and return the primary key value(s) if exists fn get_primary_key(&self, id: &ID) -> GraphProxyResult>; diff --git a/interactive_engine/executor/ir/integrated/tests/apply_test.rs b/interactive_engine/executor/ir/integrated/tests/apply_test.rs index e3b0be36c267..679e6a7d4bff 100644 --- a/interactive_engine/executor/ir/integrated/tests/apply_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/apply_test.rs @@ -38,6 +38,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -148,6 +149,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs b/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs index 6b9261101259..78a02d071ae5 100644 --- a/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs @@ -43,7 +43,8 @@ mod test { fn source_gen(alias: Option) -> Box + Send> { let graph = create_exp_store(Arc::new(TestCluster {})); register_graph(graph); - let scan_opr_pb = pb::Scan { scan_opt: 0, alias, params: None, idx_predicate: None }; + let scan_opr_pb = + pb::Scan { scan_opt: 0, alias, params: None, idx_predicate: None, is_count_only: false }; let source = SourceOperator::new(scan_opr_pb.into(), Arc::new(TestRouter::default())).unwrap(); source.gen_source(0).unwrap() } diff --git a/interactive_engine/executor/ir/integrated/tests/expand_test.rs b/interactive_engine/executor/ir/integrated/tests/expand_test.rs index 7e24618cbb47..7c71e9b03f1f 100644 --- a/interactive_engine/executor/ir/integrated/tests/expand_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/expand_test.rs @@ -43,7 +43,13 @@ mod test { // g.V() fn source_gen(alias: Option) -> Box + Send> { - source_gen_with_scan_opr(pb::Scan { scan_opt: 0, alias, params: None, idx_predicate: None }) + source_gen_with_scan_opr(pb::Scan { + scan_opt: 0, + alias, + params: None, + idx_predicate: None, + is_count_only: false, + }) } fn source_gen_with_scan_opr(scan_opr_pb: pb::Scan) -> Box + Send> { @@ -688,6 +694,7 @@ mod test { alias: Some(TAG_A.into()), params: None, idx_predicate: Some(vec![1].into()), + is_count_only: false, }); let mut stream = input.input_from(source_iter)?; let flatmap_func1 = expand1.gen_flat_map().unwrap(); @@ -767,6 +774,7 @@ mod test { alias: Some(TAG_A.into()), params: None, idx_predicate: Some(vec![1].into()), + is_count_only: false, }); let mut stream = input.input_from(source_iter)?; let flatmap_func1 = expand1.gen_flat_map().unwrap(); @@ -848,6 +856,7 @@ mod test { alias: Some(TAG_A.into()), params: None, idx_predicate: Some(vec![1].into()), + is_count_only: false, }); let mut stream = input.input_from(source_iter)?; let flatmap_func1 = expand1.gen_flat_map().unwrap(); @@ -994,6 +1003,7 @@ mod test { alias: Some(TAG_A.into()), params: None, idx_predicate: Some(vec![1].into()), + is_count_only: false, }); let mut stream = input.input_from(source_iter)?; let flatmap_func1 = expand1.gen_flat_map().unwrap(); diff --git a/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs b/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs index 2637d8640523..44291ad5eb22 100644 --- a/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs @@ -37,6 +37,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec!["id".into()], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; let select_opr = pb::Select { predicate: Some(str_to_expr_pb("@.id == 1".to_string()).unwrap()) }; @@ -100,6 +101,7 @@ mod test { alias: None, params: Some(query_params_all_columns(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -126,6 +128,7 @@ mod test { alias: Some(TAG_A.into()), params: Some(query_params_all_columns(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/integrated/tests/join_test.rs b/interactive_engine/executor/ir/integrated/tests/join_test.rs index 38eb3d3288e0..5a96d3807114 100644 --- a/interactive_engine/executor/ir/integrated/tests/join_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/join_test.rs @@ -40,6 +40,7 @@ mod test { alias: Some(TAG_A.into()), params: Some(query_params(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -49,6 +50,7 @@ mod test { alias: Some(TAG_A.into()), params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -321,6 +323,7 @@ mod test { alias: Some(TAG_A.into()), params: Some(query_params(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -330,6 +333,7 @@ mod test { alias: Some(TAG_B.into()), params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -394,6 +398,7 @@ mod test { alias: Some(TAG_A.into()), params: Some(query_params(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -403,6 +408,7 @@ mod test { alias: Some(TAG_B.into()), params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/integrated/tests/match_test.rs b/interactive_engine/executor/ir/integrated/tests/match_test.rs index 7c893126f659..dbe19c4e124c 100644 --- a/interactive_engine/executor/ir/integrated/tests/match_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/match_test.rs @@ -58,6 +58,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, } } diff --git a/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs b/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs index 221c537eea98..a6f02ef44a92 100644 --- a/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs @@ -51,6 +51,7 @@ mod test { alias, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, } } @@ -61,6 +62,7 @@ mod test { alias, params: Some(query_params(vec![SOFTWARE_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, } } diff --git a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs index 532485542738..9cf650772a45 100644 --- a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs @@ -37,6 +37,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -86,6 +87,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -125,6 +127,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -882,6 +885,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -963,6 +967,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/integrated/tests/sample_test.rs b/interactive_engine/executor/ir/integrated/tests/sample_test.rs index d5d3b3c3413d..b47f2876cd8a 100644 --- a/interactive_engine/executor/ir/integrated/tests/sample_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/sample_test.rs @@ -56,6 +56,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; @@ -74,6 +75,7 @@ mod test { alias: None, params: Some(query_params(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/integrated/tests/scan_test.rs b/interactive_engine/executor/ir/integrated/tests/scan_test.rs index b7b729ad0459..30f3e4463355 100644 --- a/interactive_engine/executor/ir/integrated/tests/scan_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/scan_test.rs @@ -43,8 +43,13 @@ mod test { // g.V() #[test] fn scan_test() { - let source_iter = - scan_gen(pb::Scan { scan_opt: 0, alias: None, params: None, idx_predicate: None }); + let source_iter = scan_gen(pb::Scan { + scan_opt: 0, + alias: None, + params: None, + idx_predicate: None, + is_count_only: false, + }); let mut result_ids = vec![]; let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); @@ -71,6 +76,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, }); let mut result_ids = vec![]; let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); @@ -96,6 +102,7 @@ mod test { alias: None, params: Some(query_params(vec![PERSON_LABEL.into(), SOFTWARE_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, }); let mut result_ids = vec![]; let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); @@ -123,6 +130,7 @@ mod test { alias: None, params: None, idx_predicate: Some(vec![1].into()), + is_count_only: false, }); let mut result_ids = vec![]; @@ -146,6 +154,7 @@ mod test { alias: None, params: None, idx_predicate: Some(vec![1, 2].into()), + is_count_only: false, }); let mut result_ids = vec![]; @@ -167,8 +176,13 @@ mod test { fn scan_sample_test() { let mut params = query_params(vec![], vec![], None); params.sample_ratio = 0.1; - let source_iter = - scan_gen(pb::Scan { scan_opt: 0, alias: None, params: Some(params), idx_predicate: None }); + let source_iter = scan_gen(pb::Scan { + scan_opt: 0, + alias: None, + params: Some(params), + idx_predicate: None, + is_count_only: false, + }); let mut result_count = 0; for record in source_iter { if let Some(_element) = record.get(None).unwrap().as_vertex() { @@ -182,8 +196,13 @@ mod test { // g.E() #[test] fn scan_edge_test() { - let source_iter = - scan_gen(pb::Scan { scan_opt: 1, alias: None, params: None, idx_predicate: None }); + let source_iter = scan_gen(pb::Scan { + scan_opt: 1, + alias: None, + params: None, + idx_predicate: None, + is_count_only: false, + }); let mut result_ids = vec![]; let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); @@ -210,6 +229,7 @@ mod test { alias: None, params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), idx_predicate: None, + is_count_only: false, }); let mut result_ids = vec![]; let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); @@ -231,8 +251,13 @@ mod test { fn scan_edge_sample_test() { let mut params = query_params(vec![], vec![], None); params.sample_ratio = 0.1; - let source_iter = - scan_gen(pb::Scan { scan_opt: 1, alias: None, params: Some(params), idx_predicate: None }); + let source_iter = scan_gen(pb::Scan { + scan_opt: 1, + alias: None, + params: Some(params), + idx_predicate: None, + is_count_only: false, + }); let mut result_count = 0; for record in source_iter { if let Some(_element) = record.get(None).unwrap().as_edge() { @@ -242,4 +267,88 @@ mod test { // It is almost impossible to sample 6 edges assert!(result_count < 6); } + + // g.V().count() + #[test] + fn scan_vertex_count_test() { + let params = query_params(vec![], vec![], None); + let source_iter = scan_gen(pb::Scan { + scan_opt: 0, // vertex + alias: None, + params: Some(params), + idx_predicate: None, + is_count_only: true, + }); + let expected = 6; + let mut result = 0; + for record in source_iter { + if let Some(object) = record.get(None).unwrap().as_object() { + result = object.as_i32().unwrap(); + } + } + assert_eq!(result, expected) + } + + // g.V().hasLabel("person").count() + #[test] + fn scan_person_vertex_count_test() { + let params = query_params(vec![PERSON_LABEL.into()], vec![], None); + let source_iter = scan_gen(pb::Scan { + scan_opt: 0, // vertex + alias: None, + params: Some(params), + idx_predicate: None, + is_count_only: true, + }); + let expected = 4; + let mut result = 0; + for record in source_iter { + if let Some(object) = record.get(None).unwrap().as_object() { + result = object.as_i32().unwrap(); + } + } + assert_eq!(result, expected) + } + + // g.E().count() + #[test] + fn scan_edge_count_test() { + let params = query_params(vec![], vec![], None); + let source_iter = scan_gen(pb::Scan { + scan_opt: 1, // edge + alias: None, + params: Some(params), + idx_predicate: None, + is_count_only: true, + }); + let expected = 6; + let mut result = 0; + for record in source_iter { + if let Some(object) = record.get(None).unwrap().as_object() { + result = object.as_i32().unwrap(); + } + } + assert_eq!(result, expected) + } + + // g.E().hasLabel("knows").count() + #[test] + fn scan_knows_edge_count_test() { + let params = query_params(vec![KNOWS_LABEL.into()], vec![], None); + let source_iter = scan_gen(pb::Scan { + scan_opt: 1, // edge + alias: None, + params: Some(params), + idx_predicate: None, + is_count_only: true, + }); + let expected = 2; + let mut result = 0; + for record in source_iter { + if let Some(object) = record.get(None).unwrap().as_object() { + result = object.as_i32().unwrap(); + } + } + assert_eq!(result, expected) + } } diff --git a/interactive_engine/executor/ir/integrated/tests/sink_test.rs b/interactive_engine/executor/ir/integrated/tests/sink_test.rs index 2a4523079454..18c1367e8b0c 100644 --- a/interactive_engine/executor/ir/integrated/tests/sink_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/sink_test.rs @@ -41,6 +41,7 @@ mod test { alias: source_alias.map(|tag| tag.into()), params: Some(query_params(vec![], vec![], None)), idx_predicate: None, + is_count_only: false, meta_data: None, }; diff --git a/interactive_engine/executor/ir/proto/algebra.proto b/interactive_engine/executor/ir/proto/algebra.proto index 04869867962d..533d9dff4e56 100644 --- a/interactive_engine/executor/ir/proto/algebra.proto +++ b/interactive_engine/executor/ir/proto/algebra.proto @@ -243,7 +243,9 @@ message Scan { QueryParams params = 3; // The optional filtering predicate for the field that have been indexed IndexPredicate idx_predicate = 4; - MetaData meta_data = 5; + // The flag that indicates SCAN + COUNT + bool is_count_only = 5; + MetaData meta_data = 6; } // It is typical to use the operator together with EdgeExpand or PathExpand, with the functionality of diff --git a/interactive_engine/executor/ir/proto/physical.proto b/interactive_engine/executor/ir/proto/physical.proto index b5faf0f5ba3f..3297bca6fb61 100644 --- a/interactive_engine/executor/ir/proto/physical.proto +++ b/interactive_engine/executor/ir/proto/physical.proto @@ -151,6 +151,8 @@ message Scan { algebra.QueryParams params = 3; // The optional filtering predicate for the field that have been indexed algebra.IndexPredicate idx_predicate = 4; + // The flag that indicates to SCAN + COUNT + bool is_count_only = 5; } // It is typical to use the operator when: diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 67c8730556a9..76f9179ca349 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::sync::Arc; -use dyn_type::Object; +use dyn_type::{object, Object}; use graph_proxy::apis::graph::PKV; use graph_proxy::apis::partitioner::{PartitionInfo, PartitionedData}; use graph_proxy::apis::{get_graph, ClusterInfo, Edge, QueryParams, Vertex, ID}; @@ -46,6 +46,7 @@ pub struct SourceOperator { primary_key_values: Option, alias: Option, source_type: SourceType, + is_count_only: bool, } impl Default for SourceOperator { @@ -56,6 +57,7 @@ impl Default for SourceOperator { primary_key_values: None, alias: None, source_type: SourceType::Dummy, + is_count_only: false, } } } @@ -127,48 +129,61 @@ impl SourceOperator { match self.source_type { SourceType::Vertex => { - let mut v_source = Box::new(std::iter::empty()) as Box + Send>; - if let Some(ref seeds) = self.src { - if let Some(src) = seeds.get(&(worker_index as u64)) { - if !src.is_empty() { - v_source = graph.get_vertex(src, &self.query_params)?; + if self.is_count_only { + let count = graph.count_vertex(&self.query_params)?; + Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) + } else { + let mut v_source = + Box::new(std::iter::empty()) as Box + Send>; + if let Some(ref seeds) = self.src { + if let Some(src) = seeds.get(&(worker_index as u64)) { + if !src.is_empty() { + v_source = graph.get_vertex(src, &self.query_params)?; + } } - } - } else if let Some(ref indexed_values) = self.primary_key_values { - if self.query_params.labels.is_empty() { - Err(FnGenError::unsupported_error( - "Empty label in `IndexScan` self.query_params.labels", - ))? - } - let mut source_vertices = vec![]; - for label in &self.query_params.labels { - if let Some(v) = - graph.index_scan_vertex(*label, indexed_values, &self.query_params)? - { - source_vertices.push(v); + } else if let Some(ref indexed_values) = self.primary_key_values { + if self.query_params.labels.is_empty() { + Err(FnGenError::unsupported_error( + "Empty label in `IndexScan` self.query_params.labels", + ))? } - } - v_source = Box::new(source_vertices.into_iter()); - } else { - // parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions - v_source = graph.scan_vertex(&self.query_params)?; - }; - Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone())))) + let mut source_vertices = vec![]; + for label in &self.query_params.labels { + if let Some(v) = + graph.index_scan_vertex(*label, indexed_values, &self.query_params)? + { + source_vertices.push(v); + } + } + v_source = Box::new(source_vertices.into_iter()); + } else { + // parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions + v_source = graph.scan_vertex(&self.query_params)?; + }; + Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone())))) + } } SourceType::Edge => { - let mut e_source = Box::new(std::iter::empty()) as Box + Send>; - if let Some(ref seeds) = self.src { - if let Some(src) = seeds.get(&(worker_index as u64)) { - if !src.is_empty() { - e_source = graph.get_edge(src, &self.query_params)?; + if self.is_count_only { + let count = graph.count_edge(&self.query_params)?; + Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) + } else { + let mut e_source = + Box::new(std::iter::empty()) as Box + Send>; + if let Some(ref seeds) = self.src { + if let Some(src) = seeds.get(&(worker_index as u64)) { + if !src.is_empty() { + e_source = graph.get_edge(src, &self.query_params)?; + } } + } else { + // parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions + e_source = graph.scan_edge(&self.query_params)?; } - } else { - // parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions - e_source = graph.scan_edge(&self.query_params)?; + Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone())))) } - Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone())))) } + SourceType::Table => Err(FnGenError::unsupported_error( "neither `Edge` nor `Vertex` but `Table` type `Source` opr", ))?, @@ -198,6 +213,7 @@ impl TryFrom for SourceOperator { primary_key_values: None, alias: scan_pb.alias, source_type, + is_count_only: scan_pb.is_count_only, }) } }