From 28809ddefce542a2891a924d25245d2f13d31064 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 27 Sep 2022 13:25:43 +0800 Subject: [PATCH] [GIE-IR] Profiling for GIE-IR (#2056) * [GIE Profile] provide profiling tools for GIE-IR --- .../src/communication/decorator/evented.rs | 20 ++ .../src/communication/decorator/mod.rs | 11 + .../src/communication/input/session.rs | 49 +++-- .../engine/pegasus/pegasus/src/config.rs | 17 +- .../engine/pegasus/pegasus/src/dataflow.rs | 13 +- .../engine/pegasus/pegasus/src/lib.rs | 2 + .../pegasus/src/operator/concise/map.rs | 2 +- .../pegasus/pegasus/src/operator/mod.rs | 33 ++- .../engine/pegasus/pegasus/src/worker.rs | 4 +- .../engine/pegasus/scripts/log_stat.py | 190 ++++++++++++++++++ 10 files changed, 301 insertions(+), 40 deletions(-) create mode 100644 interactive_engine/executor/engine/pegasus/scripts/log_stat.py diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs index f4819eecba58..32211d774241 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/evented.rs @@ -20,6 +20,7 @@ use crate::event::emitter::EventEmitter; use crate::event::{Event, EventKind}; use crate::progress::{DynPeers, EndOfScope, EndSyncSignal}; use crate::tag::tools::map::TidyTagMap; +use crate::PROFILE_COMM_FLAG; use crate::{Data, Tag}; #[allow(dead_code)] @@ -159,6 +160,25 @@ impl Push> for EventEmitPush { batch.set_seq(*seq as u64); *seq += 1; } + if *PROFILE_COMM_FLAG { + if !self.inner.is_local() { + info_worker!( + "push batches: \t\t[remote_{:?}_{:?}]\t\t push batch of {:?} to channel[{}] to worker {}, len = {}", + self.ch_info.source_port, + self.ch_info.target_port, + batch.tag, + self.ch_info.id.index, + self.target_worker, len) + } else { + info_worker!( + "push batches: \t\t[local_{:?}_{:?}]\t\t push batch of {:?} to channel[{}] to worker {}, len = {}", + self.ch_info.source_port, + self.ch_info.target_port, + batch.tag, + self.ch_info.id.index, + self.target_worker, len) + } + } self.inner.push(batch) } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs index fd83d40eea41..695baf6da79a 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/decorator/mod.rs @@ -25,6 +25,7 @@ use crate::errors::IOError; use crate::graph::Port; use crate::progress::EndOfScope; use crate::tag::tools::map::TidyTagMap; +use crate::PROFILE_COMM_FLAG; use crate::{Data, Tag}; pub mod aggregate; pub mod broadcast; @@ -119,6 +120,16 @@ impl Push> for LocalMicroBatchPush { c.1 += batch.len(); } } + if *PROFILE_COMM_FLAG { + info_worker!( + "push batches: \t\t[local_{:?}_{:?}]\t\t push batch of {:?} to channel[{}] to self, len = {}", + self.ch_info.source_port, + self.ch_info.target_port, + batch.tag, + self.ch_info.id.index, + batch.len() + ) + } self.inner.push(batch) } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/session.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/session.rs index 9b7d790da639..021cfee99071 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/session.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/communication/input/session.rs @@ -19,6 +19,7 @@ use crate::communication::input::input::InputBlockGuard; use crate::communication::input::InputHandle; use crate::data::MicroBatch; use crate::errors::{ErrorKind, JobExecError}; +use crate::PROFILE_COMM_FLAG; use crate::{Data, Tag}; pub struct InputSession<'a, D: Data> { @@ -54,19 +55,25 @@ impl<'a, D: Data> InputSession<'a, D> { } else { if let Some(mut batch) = self.input.next()? { let is_last = batch.is_last(); - if log_enabled!(log::Level::Trace) { + if *PROFILE_COMM_FLAG { if !batch.is_empty() { - if is_last { - trace_worker!( - "handle last batch of {:?}, len = {}", - batch.tag, - batch.len() - ); - } else { - trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len()); + info_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len()); + } + } else { + if log_enabled!(log::Level::Trace) { + if !batch.is_empty() { + if is_last { + trace_worker!( + "handle last batch of {:?}, len = {}", + batch.tag, + batch.len() + ); + } else { + trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len()); + } + } else if is_last { + trace_worker!("handle end of {:?}", batch.tag); } - } else if is_last { - trace_worker!("handle end of {:?}", batch.tag); } } match func(&mut batch) { @@ -107,15 +114,21 @@ impl<'a, D: Data> InputSession<'a, D> { { while let Some(mut batch) = self.input.next_of(tag)? { let is_last = batch.is_last(); - if log_enabled!(log::Level::Trace) { + if *PROFILE_COMM_FLAG { if !batch.is_empty() { - if is_last { - trace_worker!("handle last batch of {:?}, len = {}", batch.tag, batch.len()); - } else { - trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len()); + info_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len()); + } + } else { + if log_enabled!(log::Level::Trace) { + if !batch.is_empty() { + if is_last { + trace_worker!("handle last batch of {:?}, len = {}", batch.tag, batch.len()); + } else { + trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len()); + } + } else if is_last { + trace_worker!("handle end of {:?}", batch.tag); } - } else if is_last { - trace_worker!("handle end of {:?}", batch.tag); } } match (*func)(&mut batch) { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs index 71ef5e352a5f..a2739a61a987 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/config.rs @@ -13,15 +13,14 @@ //! See the License for the specific language governing permissions and //! limitations under the License. -use std::hash::Hasher; -use std::path::Path; - +use crate::errors::StartupError; +use crate::{get_servers, get_servers_len}; +use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG}; use ahash::AHasher; use pegasus_network::config::NetworkConfig; use serde::Deserialize; - -use crate::errors::StartupError; -use crate::{get_servers, get_servers_len}; +use std::hash::Hasher; +use std::path::Path; #[macro_export] macro_rules! configure_with_default { @@ -180,7 +179,11 @@ impl JobConf { impl Default for JobConf { fn default() -> Self { - let plan_print = log_enabled!(log::Level::Trace); + let plan_print = if *PROFILE_COMM_FLAG | *PROFILE_TIME_FLAG { + log_enabled!(log::Level::Info) + } else { + log_enabled!(log::Level::Trace) + }; JobConf { job_id: 0, job_name: "anonymity".to_owned(), diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs index f34b973307b5..5d45220b74aa 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs @@ -31,6 +31,7 @@ use crate::graph::{Dependency, DotGraph, Edge, Port}; use crate::operator::{GeneralOperator, NotifiableOperator, Operator, OperatorBuilder, OperatorCore}; use crate::schedule::Schedule; use crate::{Data, JobConf, Tag, WorkerId}; +use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG}; pub struct DataflowBuilder { pub worker_id: WorkerId, @@ -116,7 +117,7 @@ impl DataflowBuilder { if report { writeln!(plan_desc, "\n============ Build Dataflow ==============").ok(); writeln!(plan_desc, "Peers:\t{}", self.worker_id.total_peers()).ok(); - writeln!(plan_desc, "{}", "Operators: ").ok(); + writeln!(plan_desc, "{}", "Operators:\t").ok(); } let mut builds = self.operators.replace(vec![]); @@ -144,20 +145,26 @@ impl DataflowBuilder { op_names.push(op.info.name.clone()); if report { writeln!(plan_desc, "\t{}\t{}({})", op.info.index, op.info.name, op.info.index).ok(); + if *PROFILE_TIME_FLAG | *PROFILE_COMM_FLAG { + info_worker!("job vertices: \t\t[{}_{}]\t\t", op.info.name, op.info.index); + } } operators.push(Some(op)); } let edges = self.edges.replace(vec![]); if report { - writeln!(plan_desc, "Channels ").ok(); + writeln!(plan_desc, "Channels:\t").ok(); for e in edges.iter() { + if *PROFILE_TIME_FLAG | *PROFILE_COMM_FLAG { + info_worker!("job edges: \t\t[{:?}_{:?}]\t\t", e.source, e.target); + } writeln!(plan_desc, "\t{:?}", e).ok(); } } writeln!(plan_desc, "==========================================").ok(); if report { - info!("crate job[{}] with configuration : {:?}", self.config.job_id, self.config); + info!("create job[{}] with configuration : {:?}", self.config.job_id, self.config); info!("{}", plan_desc); let dot_g = DotGraph::new(self.config.job_name.clone(), self.config.job_id, op_names, edges); if let Ok(mut f) = File::create(format!("{}_{}.dot", self.config.job_name, self.config.job_id)) diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs index 0f34ec0c685d..a9cd16101fcf 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs @@ -73,6 +73,8 @@ use crate::worker_id::WorkerIdIter; lazy_static! { static ref SERVER_ID: Mutex> = Mutex::new(None); static ref SERVERS: RwLock> = RwLock::new(vec![]); + pub static ref PROFILE_TIME_FLAG: bool = configure_with_default!(bool, "PROFILE_TIME_FLAG", false); + pub static ref PROFILE_COMM_FLAG: bool = configure_with_default!(bool, "PROFILE_COMM_FLAG", false); } thread_local! { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/map.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/map.rs index 87939327ea53..0d1932ca4352 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/map.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/concise/map.rs @@ -25,7 +25,7 @@ fn _get_name(base: &str, extra: &str) -> String { if extra.is_empty() { base.to_string() } else { - format!("{:?} [{:?}]", base, extra) + format!("{}_{}", base, extra) } } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs index 34c193fc5723..a5348155229e 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/mod.rs @@ -34,6 +34,7 @@ use crate::schedule::state::inbound::InputEndNotify; use crate::schedule::state::outbound::OutputCancelState; use crate::tag::tools::map::TidyTagMap; use crate::{Data, Tag}; +use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG}; pub trait Notifiable: Send + 'static { fn on_end(&mut self, n: End, outputs: &[Box]) -> Result<(), JobExecError>; @@ -329,7 +330,7 @@ impl Operator { } else { result = Err(err); } - }; + } for (port, input) in self.inputs.iter().enumerate() { while let Some(end) = input.extract_end() { @@ -341,7 +342,11 @@ impl Operator { for output in self.outputs.iter() { output.flush()?; } - debug_worker!("after fire operator {:?}", self.info); + if *PROFILE_COMM_FLAG { + info_worker!("after fire operator \t\t{:?}\t\t", self.info); + } else { + debug_worker!("after fire operator {:?}", self.info); + } result } @@ -366,13 +371,23 @@ impl Operator { warn_worker!("close operator {:?}'s output failure, caused by {}", self.info, err); } } - debug_worker!( - "operator {:?}\tfinished, used {:.2}ms, fired {} times, avg fire use {}us", - self.info, - self.exec_st.get() as f64 / 1000.0, - self.fire_times, - self.exec_st.get() / self.fire_times - ); + if *PROFILE_TIME_FLAG { + info_worker!( + "operator finished \t\t{:?}\t\t, fired {} times, avg fire use {}us, in ms time_cost = {:.2}", + self.info, + self.fire_times, + self.exec_st.get() / self.fire_times, + self.exec_st.get() as f64 / 1000.0, + ); + } else { + debug_worker!( + "operator {:?}\tfinished, used {:.2}ms, fired {} times, avg fire use {}us", + self.info, + self.exec_st.get() as f64 / 1000.0, + self.fire_times, + self.exec_st.get() / self.fire_times + ); + } } fn fire_inner(&mut self) -> Result<(), JobExecError> { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs index 9734f3edefc5..03a6a47d48cb 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/worker.rs @@ -220,10 +220,10 @@ impl Task for Worker { Ok(state) => { if TaskState::Finished == state { info_worker!( - "job({}) '{}' finished, used {:?};", + "job({}) '{}' finished, used {:?} ms;", self.id.job_id, self.conf.job_name, - self.start.elapsed() + self.start.elapsed().as_millis() ) } state diff --git a/interactive_engine/executor/engine/pegasus/scripts/log_stat.py b/interactive_engine/executor/engine/pegasus/scripts/log_stat.py new file mode 100644 index 000000000000..83c81671be45 --- /dev/null +++ b/interactive_engine/executor/engine/pegasus/scripts/log_stat.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Profile Tool for Querying on GAIA + +This tool provide a way of collecting some information from the log of GAIA, including + * the batch number of intermediate results between operators, + * the communication cost (batches that need to be shuffled intra/inter-processes) between operators, + * the time cost of each operator, and the ratio of operator_time_cost, to total_computation_cost + (specifically, total_computation_cost doesn't include communication time.) + * the total_computation_cost, and the ratio of total_computation_cost to total_time_cost + (Specifically, total_time_cost includes communication time) +help to profile the query phases in GAIA. + +Before using this tool, please start GAIA with +`PROFILE_TIME_FLAG=true PROFILE_COMM_FLAG=true`, +with which we could get necessary logs for profiling time cost and communication cost respectively. +""" + +import argparse +import re +from collections import defaultdict +from graphviz import Digraph + + +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--input", default='executor.log') + parser.add_argument("--job_id", default='1') + parser.add_argument("--worker_num", default='2') + parser.add_argument("--show_details", default=False) + args = parser.parse_args() + return args + +def accum_job_info(worker_matrix, job_matrix): + for item in worker_matrix: + if item not in job_matrix: + job_matrix[item] = 0 + job_matrix[item] += worker_matrix[item] + +def draw_time_cost_graph(graph_name, worker_operator_time_cost, job_edge_desc, total_timecost): + # draw the time cost graph (of each worker) + worker_cost_dot = Digraph(graph_name) + # draw nodes of time cost graph + for op in worker_operator_time_cost: + op_index = extract_operator_idx(op) + percentage = float(worker_operator_time_cost[op])/float(total_timecost) + worker_cost_dot.node(op_index, label=f'{op}: {worker_operator_time_cost[op]} ms, {format(percentage,".2%")}') + # draw edges of time cost graph + for edge in job_edge_desc: + edge_src_index,edge_dst_index =extract_edge_idx(edge) + worker_cost_dot.edge(edge_src_index,edge_dst_index) + worker_cost_dot.render(view=False) + + +def draw_communication_cost_graph(graph_name, job_operators, job_edges): + # draw the communication cost graph (of each job) + job_comm_dot = Digraph(graph_name) + # draw nodes of communication cost graph + for op in job_operators: + op_index = extract_operator_idx(op) + job_comm_dot.node(op_index, label=f'{op}: {job_operators[op]}') + # draw edges of communication cost graph + job_edge_list = defaultdict(list) + for edge in job_edges: + edge_src_index,edge_dst_index =extract_edge_idx(edge) + job_edge_list[edge_src_index,edge_dst_index].append((edge,job_edges[edge])) + for edge in job_edge_list: + job_comm_dot.edge(edge[0], edge[1], label=f'{job_edge_list[edge]}') + job_comm_dot.render(view=False) + +def extract_operator(line): + # \t\t[operator_name_idx]\t\t + op = line[line.index('\t\t[') + 2: line.index("]\t\t") +1] + return op + +def extract_operator_idx(op): + # [operator_name_idx] + op_index = op.split("_")[-1].split("]")[0] + return op_index + +def extract_edge_idx(edge): + # [local_(0.0)_(1.0)] or [remote(0.0)_(1.0)] + edge_src_index = edge.split(".")[0].split("(")[-1] + edge_dst_index = edge.split(".")[1].split("(")[-1] + return edge_src_index,edge_dst_index + +def extract_val(suffix): + # len = xx or time_cost = xx + len = suffix.split("=")[-1] + return len + +def main(args): + job_ids = [i for i in args.job_id.split(",")] + for job_id in job_ids: + print("job: ", job_id) + worker_num = int(args.worker_num) + job_operators = defaultdict(int) + job_edges = defaultdict(int) + job_edge_desc = defaultdict(int) + + for worker_id in range(worker_num): + print("worker:", worker_id) + worker_operators = defaultdict(int) + worker_edges = defaultdict(int) + worker_operator_time_cost = defaultdict(str) + operator_len = 0 + computation_cost = 0 + total_timecost = 0 + + common_prefix = f"[worker_{worker_id}({job_id})]: " + worker_edge_desc_flag = common_prefix + "job edges: " + worker_vertex_desc_flag = common_prefix + "job vertices: " + worker_push_flag = common_prefix + "push batches: " + worker_handle_flag = common_prefix + "handle batch " + worker_after_fire_flag = common_prefix + "after fire operator " + worker_operator_finished_flag = common_prefix + "operator finished " + + with open(args.input, 'r') as f: + for line in f.readlines(): + # initialize + if worker_vertex_desc_flag in line: + operator = extract_operator(line) + worker_operators[operator] = 0 + computation_cost = 0 + if worker_edge_desc_flag in line: + operator = extract_operator(line) + job_edge_desc[operator] = 0 + + # count local/remote push batches between operators + if worker_push_flag in line: + push_edge = extract_operator(line) + worker_edges[push_edge] += int(extract_val(line)) + + # count process batches for each operator + if worker_handle_flag in line: + operator_len += int(extract_val(line)) + if worker_after_fire_flag in line: + operator = extract_operator(line) + worker_operators[operator] += operator_len + operator_len = 0 + + # count process time for each operator + if worker_operator_finished_flag in line: + operator = extract_operator(line) + time_cost = float(extract_val(line)) + worker_operator_time_cost[operator] = time_cost + computation_cost += time_cost + + job_finished = re.search(f'\[worker_{worker_id}\({job_id}\)\]: job\({job_id}\) .* finished, used (.*) ms;', line) + if job_finished: + total_timecost = job_finished.group(1) + print(job_finished.group()) + percentage = computation_cost/float(total_timecost) + print("computation cost is {:.2f} ms, and the ratio of computation cost to total cost is {:.2f}".format(computation_cost, percentage)) + + if args.show_details: + print("=========== operator intermediate batches ===========\n", dict(worker_operators)) + print("=============== operator push batches ===============\n", dict(worker_edges)) + print("================= operator time cost ================\n", dict(worker_operator_time_cost)) + draw_time_cost_graph(f'job_{job_id}_{worker_id}_cost', worker_operator_time_cost, job_edge_desc, computation_cost) + + # accum process batches of each operator for job + accum_job_info(worker_operators, job_operators) + # accum push batches between operators for job; push batch including local push and remote push; + accum_job_info(worker_edges, job_edges) + + # draw the communication cost table (of each job) + draw_communication_cost_graph(f'job_{job_id}_comm',job_operators,job_edges) + +if __name__ == '__main__': + args = parse_args() + main(args) +