Skip to content

Commit

Permalink
[GIE-IR] Profiling for GIE-IR (#2056)
Browse files Browse the repository at this point in the history
* [GIE Profile] provide profiling tools for GIE-IR
  • Loading branch information
BingqingLyu committed Sep 27, 2022
1 parent 2d57a3d commit 28809dd
Show file tree
Hide file tree
Showing 10 changed files with 301 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::event::emitter::EventEmitter;
use crate::event::{Event, EventKind};
use crate::progress::{DynPeers, EndOfScope, EndSyncSignal};
use crate::tag::tools::map::TidyTagMap;
use crate::PROFILE_COMM_FLAG;
use crate::{Data, Tag};

#[allow(dead_code)]
Expand Down Expand Up @@ -159,6 +160,25 @@ impl<D: Data> Push<MicroBatch<D>> for EventEmitPush<D> {
batch.set_seq(*seq as u64);
*seq += 1;
}
if *PROFILE_COMM_FLAG {
if !self.inner.is_local() {
info_worker!(
"push batches: \t\t[remote_{:?}_{:?}]\t\t push batch of {:?} to channel[{}] to worker {}, len = {}",
self.ch_info.source_port,
self.ch_info.target_port,
batch.tag,
self.ch_info.id.index,
self.target_worker, len)
} else {
info_worker!(
"push batches: \t\t[local_{:?}_{:?}]\t\t push batch of {:?} to channel[{}] to worker {}, len = {}",
self.ch_info.source_port,
self.ch_info.target_port,
batch.tag,
self.ch_info.id.index,
self.target_worker, len)
}
}
self.inner.push(batch)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::errors::IOError;
use crate::graph::Port;
use crate::progress::EndOfScope;
use crate::tag::tools::map::TidyTagMap;
use crate::PROFILE_COMM_FLAG;
use crate::{Data, Tag};
pub mod aggregate;
pub mod broadcast;
Expand Down Expand Up @@ -119,6 +120,16 @@ impl<T: Data> Push<MicroBatch<T>> for LocalMicroBatchPush<T> {
c.1 += batch.len();
}
}
if *PROFILE_COMM_FLAG {
info_worker!(
"push batches: \t\t[local_{:?}_{:?}]\t\t push batch of {:?} to channel[{}] to self, len = {}",
self.ch_info.source_port,
self.ch_info.target_port,
batch.tag,
self.ch_info.id.index,
batch.len()
)
}
self.inner.push(batch)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::communication::input::input::InputBlockGuard;
use crate::communication::input::InputHandle;
use crate::data::MicroBatch;
use crate::errors::{ErrorKind, JobExecError};
use crate::PROFILE_COMM_FLAG;
use crate::{Data, Tag};

pub struct InputSession<'a, D: Data> {
Expand Down Expand Up @@ -54,19 +55,25 @@ impl<'a, D: Data> InputSession<'a, D> {
} else {
if let Some(mut batch) = self.input.next()? {
let is_last = batch.is_last();
if log_enabled!(log::Level::Trace) {
if *PROFILE_COMM_FLAG {
if !batch.is_empty() {
if is_last {
trace_worker!(
"handle last batch of {:?}, len = {}",
batch.tag,
batch.len()
);
} else {
trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len());
info_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len());
}
} else {
if log_enabled!(log::Level::Trace) {
if !batch.is_empty() {
if is_last {
trace_worker!(
"handle last batch of {:?}, len = {}",
batch.tag,
batch.len()
);
} else {
trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len());
}
} else if is_last {
trace_worker!("handle end of {:?}", batch.tag);
}
} else if is_last {
trace_worker!("handle end of {:?}", batch.tag);
}
}
match func(&mut batch) {
Expand Down Expand Up @@ -107,15 +114,21 @@ impl<'a, D: Data> InputSession<'a, D> {
{
while let Some(mut batch) = self.input.next_of(tag)? {
let is_last = batch.is_last();
if log_enabled!(log::Level::Trace) {
if *PROFILE_COMM_FLAG {
if !batch.is_empty() {
if is_last {
trace_worker!("handle last batch of {:?}, len = {}", batch.tag, batch.len());
} else {
trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len());
info_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len());
}
} else {
if log_enabled!(log::Level::Trace) {
if !batch.is_empty() {
if is_last {
trace_worker!("handle last batch of {:?}, len = {}", batch.tag, batch.len());
} else {
trace_worker!("handle batch of {:?}, len = {}", batch.tag, batch.len());
}
} else if is_last {
trace_worker!("handle end of {:?}", batch.tag);
}
} else if is_last {
trace_worker!("handle end of {:?}", batch.tag);
}
}
match (*func)(&mut batch) {
Expand Down
17 changes: 10 additions & 7 deletions interactive_engine/executor/engine/pegasus/pegasus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.

use std::hash::Hasher;
use std::path::Path;

use crate::errors::StartupError;
use crate::{get_servers, get_servers_len};
use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG};
use ahash::AHasher;
use pegasus_network::config::NetworkConfig;
use serde::Deserialize;

use crate::errors::StartupError;
use crate::{get_servers, get_servers_len};
use std::hash::Hasher;
use std::path::Path;

#[macro_export]
macro_rules! configure_with_default {
Expand Down Expand Up @@ -180,7 +179,11 @@ impl JobConf {

impl Default for JobConf {
fn default() -> Self {
let plan_print = log_enabled!(log::Level::Trace);
let plan_print = if *PROFILE_COMM_FLAG | *PROFILE_TIME_FLAG {
log_enabled!(log::Level::Info)
} else {
log_enabled!(log::Level::Trace)
};
JobConf {
job_id: 0,
job_name: "anonymity".to_owned(),
Expand Down
13 changes: 10 additions & 3 deletions interactive_engine/executor/engine/pegasus/pegasus/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::graph::{Dependency, DotGraph, Edge, Port};
use crate::operator::{GeneralOperator, NotifiableOperator, Operator, OperatorBuilder, OperatorCore};
use crate::schedule::Schedule;
use crate::{Data, JobConf, Tag, WorkerId};
use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG};

pub struct DataflowBuilder {
pub worker_id: WorkerId,
Expand Down Expand Up @@ -116,7 +117,7 @@ impl DataflowBuilder {
if report {
writeln!(plan_desc, "\n============ Build Dataflow ==============").ok();
writeln!(plan_desc, "Peers:\t{}", self.worker_id.total_peers()).ok();
writeln!(plan_desc, "{}", "Operators: ").ok();
writeln!(plan_desc, "{}", "Operators:\t").ok();
}

let mut builds = self.operators.replace(vec![]);
Expand Down Expand Up @@ -144,20 +145,26 @@ impl DataflowBuilder {
op_names.push(op.info.name.clone());
if report {
writeln!(plan_desc, "\t{}\t{}({})", op.info.index, op.info.name, op.info.index).ok();
if *PROFILE_TIME_FLAG | *PROFILE_COMM_FLAG {
info_worker!("job vertices: \t\t[{}_{}]\t\t", op.info.name, op.info.index);
}
}
operators.push(Some(op));
}
let edges = self.edges.replace(vec![]);
if report {
writeln!(plan_desc, "Channels ").ok();
writeln!(plan_desc, "Channels:\t").ok();
for e in edges.iter() {
if *PROFILE_TIME_FLAG | *PROFILE_COMM_FLAG {
info_worker!("job edges: \t\t[{:?}_{:?}]\t\t", e.source, e.target);
}
writeln!(plan_desc, "\t{:?}", e).ok();
}
}

writeln!(plan_desc, "==========================================").ok();
if report {
info!("crate job[{}] with configuration : {:?}", self.config.job_id, self.config);
info!("create job[{}] with configuration : {:?}", self.config.job_id, self.config);
info!("{}", plan_desc);
let dot_g = DotGraph::new(self.config.job_name.clone(), self.config.job_id, op_names, edges);
if let Ok(mut f) = File::create(format!("{}_{}.dot", self.config.job_name, self.config.job_id))
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ use crate::worker_id::WorkerIdIter;
lazy_static! {
static ref SERVER_ID: Mutex<Option<u64>> = Mutex::new(None);
static ref SERVERS: RwLock<Vec<u64>> = RwLock::new(vec![]);
pub static ref PROFILE_TIME_FLAG: bool = configure_with_default!(bool, "PROFILE_TIME_FLAG", false);
pub static ref PROFILE_COMM_FLAG: bool = configure_with_default!(bool, "PROFILE_COMM_FLAG", false);
}

thread_local! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn _get_name(base: &str, extra: &str) -> String {
if extra.is_empty() {
base.to_string()
} else {
format!("{:?} [{:?}]", base, extra)
format!("{}_{}", base, extra)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::schedule::state::inbound::InputEndNotify;
use crate::schedule::state::outbound::OutputCancelState;
use crate::tag::tools::map::TidyTagMap;
use crate::{Data, Tag};
use crate::{PROFILE_COMM_FLAG, PROFILE_TIME_FLAG};

pub trait Notifiable: Send + 'static {
fn on_end(&mut self, n: End, outputs: &[Box<dyn OutputProxy>]) -> Result<(), JobExecError>;
Expand Down Expand Up @@ -329,7 +330,7 @@ impl Operator {
} else {
result = Err(err);
}
};
}

for (port, input) in self.inputs.iter().enumerate() {
while let Some(end) = input.extract_end() {
Expand All @@ -341,7 +342,11 @@ impl Operator {
for output in self.outputs.iter() {
output.flush()?;
}
debug_worker!("after fire operator {:?}", self.info);
if *PROFILE_COMM_FLAG {
info_worker!("after fire operator \t\t{:?}\t\t", self.info);
} else {
debug_worker!("after fire operator {:?}", self.info);
}
result
}

Expand All @@ -366,13 +371,23 @@ impl Operator {
warn_worker!("close operator {:?}'s output failure, caused by {}", self.info, err);
}
}
debug_worker!(
"operator {:?}\tfinished, used {:.2}ms, fired {} times, avg fire use {}us",
self.info,
self.exec_st.get() as f64 / 1000.0,
self.fire_times,
self.exec_st.get() / self.fire_times
);
if *PROFILE_TIME_FLAG {
info_worker!(
"operator finished \t\t{:?}\t\t, fired {} times, avg fire use {}us, in ms time_cost = {:.2}",
self.info,
self.fire_times,
self.exec_st.get() / self.fire_times,
self.exec_st.get() as f64 / 1000.0,
);
} else {
debug_worker!(
"operator {:?}\tfinished, used {:.2}ms, fired {} times, avg fire use {}us",
self.info,
self.exec_st.get() as f64 / 1000.0,
self.fire_times,
self.exec_st.get() / self.fire_times
);
}
}

fn fire_inner(&mut self) -> Result<(), JobExecError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ impl<D: Data, T: Debug + Send + 'static> Task for Worker<D, T> {
Ok(state) => {
if TaskState::Finished == state {
info_worker!(
"job({}) '{}' finished, used {:?};",
"job({}) '{}' finished, used {:?} ms;",
self.id.job_id,
self.conf.job_name,
self.start.elapsed()
self.start.elapsed().as_millis()
)
}
state
Expand Down

0 comments on commit 28809dd

Please sign in to comment.