Skip to content

Commit

Permalink
[GIE Runtime] Debug runtime op info on in a more clearly way (#2226)
Browse files Browse the repository at this point in the history
* [GIE Runtime] debug runtime op info on worker 0
  • Loading branch information
BingqingLyu committed Nov 28, 2022
1 parent f77627d commit 84f449c
Show file tree
Hide file tree
Showing 21 changed files with 74 additions and 30 deletions.
2 changes: 1 addition & 1 deletion interactive_engine/executor/ir/core/src/glogue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.

use std::cmp::Ordering;
use std::collections::HashMap;

use ir_common::generated::algebra as pb;
use ir_common::generated::common as common_pb;
use std::cmp::Ordering;

pub type PatternId = usize;
pub type PatternLabelId = ir_common::LabelId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

use std::collections::{BTreeSet, HashMap};

use ir_common::expr_parse::str_to_expr_pb;
use ir_common::generated::algebra as pb;
use ir_common::generated::common as common_pb;
use ir_common::KeyId;
Expand All @@ -27,7 +28,6 @@ use rand::seq::SliceRandom;
use rand::SeedableRng;

use crate::common::pattern_meta_cases::*;
use ir_common::expr_parse::str_to_expr_pb;

pub const TAG_A: KeyId = 0;
pub const TAG_B: KeyId = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ impl AccumFactoryGen for algebra_pb::GroupBy {
};
accum_ops.push((entry_accumulator, tag_key, alias));
}
debug!("Runtime accumulator operator: {:?}", accum_ops);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime accumulator operator: {:?}", accum_ops);
}
Ok(RecordAccumulator { accum_ops })
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl FilterFuncGen for algebra_pb::Select {
fn gen_filter(self) -> FnGenResult<Box<dyn FilterFunction<Record>>> {
if let Some(predicate) = self.predicate {
let select_operator = SelectOperator { filter: predicate.try_into()? };
debug!("Runtime select operator: {:?}", select_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime select operator: {:?}", select_operator);
}
Ok(Box::new(select_operator))
} else {
Err(ParsePbError::EmptyFieldError("empty select pb".to_string()).into())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ impl FlatMapFuncGen for algebra_pb::EdgeExpand {
let direction = Direction::from(direction_pb);
let query_params: QueryParams = self.params.try_into()?;
let expand_opt: ExpandOpt = unsafe { ::std::mem::transmute(self.expand_opt) };
debug!(
"Runtime expand operator of edge with start_v_tag {:?}, edge_tag {:?}, direction {:?}, query_params {:?}, expand_opt {:?}",
start_v_tag, edge_or_end_v_tag, direction, query_params, expand_opt
);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!(
"Runtime expand operator of edge with start_v_tag {:?}, end_tag {:?}, direction {:?}, query_params {:?}, expand_opt {:?}",
start_v_tag, edge_or_end_v_tag, direction, query_params, expand_opt
);
}

match expand_opt {
ExpandOpt::Vertex => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl FlatMapFuncGen for algebra_pb::GetV {
.map(|name_or_id| name_or_id.try_into())
.transpose()?;
let get_both_v_operator = GetBothVOperator { start_tag, alias };
debug!("Runtime get_both_v operator: {:?}", get_both_v_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime get_both_v operator: {:?}", get_both_v_operator);
}
Ok(Box::new(get_both_v_operator))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ impl FlatMapFuncGen for algebra_pb::Unfold {
.map(|alias| alias.try_into())
.transpose()?;
let unfold_operator = UnfoldOperator { tag, alias };
debug!("Runtime unfold operator {:?}", unfold_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime unfold operator {:?}", unfold_operator);
}
Ok(Box::new(unfold_operator))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ impl GroupGen<Record, RecordKey, Record> for algebra_pb::GroupBy {
key_aliases.push(alias);
}
let group_map = GroupMap { key_aliases };
debug!("Runtime group operator group_map: {:?}", group_map);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime group operator group_map: {:?}", group_map);
}
Ok(Box::new(group_map))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@ use crate::process::record::{Record, RecordKey};
impl JoinKeyGen<Record, RecordKey, Record> for algebra_pb::Join {
fn gen_left_kv_fn(&self) -> FnGenResult<Box<dyn KeyFunction<Record, RecordKey, Record>>> {
let left_kv_fn = KeySelector::with(self.left_keys.clone())?;
debug!("Runtime join operator left_kv_fn {:?}", left_kv_fn);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime join operator left_kv_fn {:?}", left_kv_fn);
}
Ok(Box::new(left_kv_fn))
}

fn gen_right_kv_fn(&self) -> FnGenResult<Box<dyn KeyFunction<Record, RecordKey, Record>>> {
let right_kv_fn = KeySelector::with(self.right_keys.clone())?;
debug!("Runtime join operator right_kv_fn {:?}", right_kv_fn);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime join operator right_kv_fn {:?}", right_kv_fn);
}
Ok(Box::new(right_kv_fn))
}

fn get_join_kind(&self) -> JoinKind {
let join_kind = unsafe { ::std::mem::transmute(self.kind) };
debug!("Runtime join operator join_kind {:?}", join_kind);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime join operator join_kind {:?}", join_kind);
}
join_kind
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,19 @@ impl KeyFunctionGen for algebra_pb::GroupBy {
.map(|mapping| mapping.key.clone().unwrap())
.collect::<Vec<_>>(),
)?;
debug!("Runtime group operator key_selector: {:?}", key_selector);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime group operator key_selector: {:?}", key_selector);
}
Ok(Box::new(key_selector))
}
}

impl KeyFunctionGen for algebra_pb::Dedup {
fn gen_key(self) -> FnGenResult<Box<dyn KeyFunction<Record, RecordKey, Record>>> {
let key_selector = KeySelector::with(self.keys)?;
debug!("Runtime dedup operator key_selector: {:?}", key_selector);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime dedup operator key_selector: {:?}", key_selector);
}
Ok(Box::new(key_selector))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ impl FilterMapFuncGen for algebra_pb::Auxilia {
.map(|alias| alias.try_into())
.collect::<Result<_, _>>()?;
let auxilia_operator = AuxiliaOperator { tag, query_params, alias, remove_tags };
debug!("Runtime AuxiliaOperator: {:?}", auxilia_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime AuxiliaOperator: {:?}", auxilia_operator);
}
Ok(Box::new(auxilia_operator))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,12 @@ impl FilterMapFuncGen for algebra_pb::EdgeExpand {
unsafe { ::std::mem::transmute(self.direction) };
let direction = Direction::from(direction_pb);
let query_params: QueryParams = self.params.try_into()?;
debug!(
"Runtime expand collection operator of edge with start_v_tag {:?}, edge_tag {:?}, direction {:?}, query_params {:?}",
start_v_tag, edge_or_end_v_tag, direction, query_params
);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!(
"Runtime expand collection operator of edge with start_v_tag {:?}, end_tag {:?}, direction {:?}, query_params {:?}",
start_v_tag, edge_or_end_v_tag, direction, query_params
);
}
if self.expand_opt != algebra_pb::edge_expand::ExpandOpt::Vertex as i32 {
Err(FnGenError::unsupported_error("expand edges in ExpandIntersection"))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ impl FilterMapFuncGen for algebra_pb::GetV {
.map(|name_or_id| name_or_id.try_into())
.transpose()?;
let get_vertex_operator = GetVertexOperator { start_tag, opt, alias };
debug!("Runtime get_vertex operator: {:?}", get_vertex_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime get_vertex operator: {:?}", get_vertex_operator);
}
Ok(Box::new(get_vertex_operator))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ impl MapFuncGen for algebra_pb::PathEnd {
.map(|alias| alias.try_into())
.transpose()?;
let path_end = PathEndOperator { alias };
debug!("Runtime path end operator: {:?}", path_end);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime path end operator: {:?}", path_end);
}
Ok(Box::new(path_end))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ impl FilterMapFuncGen for algebra_pb::PathStart {
path_opt: unsafe { std::mem::transmute(self.path_opt) },
result_opt: unsafe { std::mem::transmute(self.result_opt) },
};
debug!("Runtime path start operator: {:?}", path_start_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime path start operator: {:?}", path_start_operator);
}
Ok(Box::new(path_start_operator))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ impl FilterMapFuncGen for algebra_pb::Project {
projected_columns.push((projector, alias));
}
let project_operator = ProjectOperator { is_append: self.is_append, projected_columns };
debug!("Runtime project operator {:?}", project_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime project operator {:?}", project_operator);
}
Ok(Box::new(project_operator))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ impl RecordRouter {
.key
.map(|e| e.try_into())
.transpose()?;
debug!("Runtime shuffle number of worker {:?} and shuffle key {:?}", num_workers, shuffle_key);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime shuffle number of worker {:?} and shuffle key {:?}", num_workers, shuffle_key);
}
Ok(RecordRouter { p, num_workers, shuffle_key })
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ impl SinkGen for DefaultSinkOp {
sink_keys,
schema_map: if schema_map.is_empty() { None } else { Some(schema_map) },
};
debug!("Runtime sink operator: {:?}", record_sinker);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime sink operator: {:?}", record_sinker);
}
Ok(Sinker::DefaultSinker(record_sinker))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ impl SinkGen for SinkVineyardOp {
)?;
let graph_sink_encoder =
GraphSinkEncoder { graph_writer: Arc::new(Mutex::new(graph_writer)), sink_keys };
debug!("Runtime sink graph operator: {:?}", graph_sink_encoder,);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime sink graph operator: {:?}", graph_sink_encoder,);
}
Ok(Sinker::GraphSinker(graph_sink_encoder))
} else {
Err(ParsePbError::EmptyFieldError("graph_schema in SinkVineyardOp".to_string()))?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ impl CompareFunction<Record> for RecordCompare {
impl CompareFunctionGen for algebra_pb::OrderBy {
fn gen_cmp(self) -> FnGenResult<Box<dyn CompareFunction<Record>>> {
let record_compare = RecordCompare::try_from(self)?;
debug!("Runtime order operator cmp: {:?}", record_compare);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime order operator cmp: {:?}", record_compare);
}
Ok(Box::new(record_compare))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ impl ApplyGen<Record, Vec<Record>, Option<Record>> for algebra_pb::Apply {
}
}
let apply_operator = ApplyOperator { join_kind, alias };
debug!("Runtime apply operator {:?}", apply_operator);
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("Runtime apply operator {:?}", apply_operator);
}
Ok(Box::new(apply_operator))
}
}

0 comments on commit 84f449c

Please sign in to comment.