From 6db0fbf22d7a3ff525af17fd7818ff6917493606 Mon Sep 17 00:00:00 2001 From: decovas Date: Fri, 22 Nov 2019 13:38:56 +0100 Subject: [PATCH 1/4] Use timely's logging infrastructure to log Tracker state Add TrackerEvent which records additons or removals of capabilities, as well as propagation events when chanages in implications are propagated along the internal connections and edges of the graph. Add DebugEvent which records the state of pointstamps, implications, and worklist of Tracker. Enabling loggers for these events is done the same way as for logging::TimelyEvent's. --- logging/src/lib.rs | 5 + timely/src/logging.rs | 2 + timely/src/progress/frontier.rs | 7 + timely/src/progress/reachability.rs | 226 ++++++++++++++++++++++++++-- timely/src/progress/subgraph.rs | 23 +-- timely/src/worker.rs | 5 +- 6 files changed, 248 insertions(+), 20 deletions(-) diff --git a/logging/src/lib.rs b/logging/src/lib.rs index f2ba8be5d..0561bdc64 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -118,6 +118,11 @@ impl Logger { } } + // Returns Logger.id + pub fn id(&self) -> E { + self.id.clone() + } + /// Logs an event. /// /// The event has its timestamp recorded at the moment of logging, but it may be delayed diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ac0316304..e1e0c98e9 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -53,6 +53,8 @@ pub struct OperatesEvent { pub id: usize, /// Sequence of nested scope identifiers indicating the path from the root to this instance. pub addr: Vec, + /// Internal summary for every combination of input and output port. + pub internal_summaries: Vec>, /// A helpful name. pub name: String, } diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index b1f6c20fc..721c75328 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -317,6 +317,13 @@ impl MutableAntichain { self.frontier().less_equal(time) } + /// Clones the vector of updates. + /// Only used for debugging purposes. + #[inline] + pub fn updates(&self) -> Vec<(T, i64)> { + self.updates.clone() + } + /// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned. /// /// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed). diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index d9d301f38..7c0af8548 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -14,7 +14,7 @@ //! use timely::progress::reachability::{Builder, Tracker}; //! //! // allocate a new empty topology builder. -//! let mut builder = Builder::::new(); +//! let mut builder = Builder::::new(vec![]); //! //! // Each node with one input connected to one output. //! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); @@ -110,7 +110,7 @@ use crate::progress::timestamp::PathSummary; /// use timely::progress::reachability::Builder; /// /// // allocate a new empty topology builder. -/// let mut builder = Builder::::new(); +/// let mut builder = Builder::::new(vec![]); /// /// // Each node with one input connected to one output. /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); @@ -127,6 +127,8 @@ use crate::progress::timestamp::PathSummary; /// ``` #[derive(Clone, Debug)] pub struct Builder { + /// Path of identifiers from the root. + path: Vec, /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the @@ -145,8 +147,9 @@ pub struct Builder { impl Builder { /// Create a new empty topology builder. - pub fn new() -> Self { + pub fn new(path: Vec) -> Self { Builder { + path: path, nodes: Vec::new(), edges: Vec::new(), shape: Vec::new(), @@ -200,7 +203,7 @@ impl Builder { println!("{:?}", self); } - Tracker::allocate_from(self) + Tracker::allocate_from(self, self.path.clone()) } /// Tests whether the graph a cycle of default path summaries. @@ -219,7 +222,7 @@ impl Builder { /// use timely::progress::reachability::Builder; /// /// // allocate a new empty topology builder. - /// let mut builder = Builder::::new(); + /// let mut builder = Builder::::new(vec![]); /// /// // Each node with one input connected to one output. /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); @@ -247,7 +250,7 @@ impl Builder { /// use timely::progress::reachability::Builder; /// /// // allocate a new empty topology builder. - /// let mut builder = Builder::::new(); + /// let mut builder = Builder::::new(vec![]); /// /// // Two inputs and outputs, only one of which advances. /// builder.add_node(0, 2, 2, vec![ @@ -347,6 +350,8 @@ impl Builder { /// pointstamp changes at various node input and output ports. These changes may /// alter the potential pointstamps that could arrive at downstream input ports. pub struct Tracker { + /// Path of identifiers from the root. + pub path: Vec, /// Internal connections within hosted operators. /// @@ -394,6 +399,17 @@ pub struct Tracker { /// always be exactly equal to the sum across all operators of the frontier sizes /// of the target and source `pointstamps` member. total_counts: i64, + + /// A timely logging handle for progress tracking events such as updating source and + /// target capabilities and propagation of implication frontiers between sources and targets. + pub tracker_logger: Option>, + + /// A timely logging handle for the state of progress tracking computation. + /// + /// Debug events include pointstamps and implications at every location and + /// self.worklist. If tracker_logger is enabled, there is exactly one TrackerEvent + /// for every DebugEvent. + pub debug_logger: Option>, } /// Target and source information for each operator. @@ -464,11 +480,37 @@ impl Tracker { /// Updates the count for a time at a target (operator input, scope output). #[inline] pub fn update_target(&mut self, target: Target, time: T, value: i64) { + if let Some(logger) = self.tracker_logger.as_ref() { + logger.log(UpdateTargetEvent { + worker_id: logger.id(), + scope_addr: self.path.clone(), + operator: target.node, + port: target.port, + timestamp: format!("{:?}", time.clone()), + delta: value, + }); + } + if let Some(logger) = self.debug_logger.as_ref() { + self.print_trace(logger); + } self.target_changes.update((target, time), value); } /// Updates the count for a time at a source (operator output, scope input). #[inline] pub fn update_source(&mut self, source: Source, time: T, value: i64) { + if let Some(logger) = self.tracker_logger.as_ref() { + logger.log(UpdateSourceEvent { + worker_id: logger.id(), + scope_addr: self.path.clone(), + operator: source.node, + port: source.port, + timestamp: format!("{:?}", time.clone()), + delta: value, + }); + } + if let Some(logger) = self.debug_logger.as_ref() { + self.print_trace(logger); + } self.source_changes.update((source, time), value); } @@ -483,7 +525,7 @@ impl Tracker { /// /// The result is a pair of tracker, and the summaries from each input port to each /// output port. - pub fn allocate_from(builder: &Builder) -> (Self, Vec>>) { + pub fn allocate_from(builder: &Builder, path: Vec) -> (Self, Vec>>) { // Allocate buffer space for each input and input port. let mut per_operator = @@ -526,6 +568,7 @@ impl Tracker { let tracker = Tracker { + path: path.clone(), nodes: builder.nodes.clone(), edges: builder.edges.clone(), per_operator, @@ -535,11 +578,84 @@ impl Tracker { pushed_changes: ChangeBatch::new(), output_changes, total_counts: 0, + tracker_logger: None, + debug_logger: None, }; (tracker, builder_summary) } + #[inline(always)] + fn print_trace(&self, logger: &crate::logging::Logger) { + let ports = self.per_operator.iter().enumerate().flat_map(|(op_n, per_op)| { + let mut ports = per_op.targets.iter().enumerate().map(|(tg_n, target)| { + DebugEventPort { + location: Location { + node: op_n, + port: Port::Target(tg_n), + }, + pointstamps: target.pointstamps.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), + implications: target.implications.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), + worklist: self.worklist.iter() + .filter(|Reverse((_,loc,_))| loc.node == op_n && loc.port == Port::Target(tg_n)) + .map(|Reverse((t,_,d))| (format!("{:?}", t), *d)).collect() + } + }).collect::>(); + ports.extend(per_op.sources.iter().enumerate().map(|(sc_n, source)| { + DebugEventPort { + location: Location { + node: op_n, + port: Port::Source(sc_n), + }, + pointstamps: source.pointstamps.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), + implications: source.implications.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), + worklist: self.worklist.iter() + .filter(|Reverse((_,loc,_))| loc.node == op_n && loc.port == Port::Source(sc_n)) + .map(|Reverse((t,_,d))| (format!("{:?}", t), *d)).collect() + } + })); + ports.into_iter() + }).collect::>(); + logger.log(DebugEvent { + worker_id: logger.id(), + scope_addr: self.path.clone(), + ports, + }); + } + + #[inline(always)] + fn log_propagate_target(&self, op: usize, port: usize, time: T) { + if let Some(logger) = self.tracker_logger.as_ref() { + logger.log(PropagateInternalEvent { + worker_id: logger.id(), + scope_addr: self.path.clone(), + operator: op, + port: port, + timestamp: format!("{:?}", time), + }); + } + if let Some(logger) = self.debug_logger.as_ref() { + self.print_trace(logger); + } + } + + #[inline(always)] + fn log_propagate_source(&self, op: usize, port: usize, time: T) { + if let Some(logger) = self.tracker_logger.as_ref() { + logger.log(PropagateEdgeEvent { + worker_id: logger.id(), + scope_addr: self.path.clone(), + operator: op, + port: port, + timestamp: format!("{:?}", time), + }); + } + if let Some(logger) = self.debug_logger.as_ref() { + self.print_trace(logger); + } + } + + /// Propagates all pending updates. /// /// The method drains `self.input_changes` and circulates their implications @@ -597,7 +713,6 @@ impl Tracker { // will discover zero-change times when we first visit them, as no further // changes can be made to them once we complete them. while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() { - // Drain and accumulate all updates that have the same time and location. while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) { diff += (self.worklist.pop().unwrap().0).2; @@ -615,7 +730,7 @@ impl Tracker { self.per_operator[location.node] .targets[port_index] .implications - .update_iter(Some((time, diff))); + .update_iter(Some((time.clone(), diff))); for (time, diff) in changes { let nodes = &self.nodes[location.node][port_index]; @@ -629,6 +744,7 @@ impl Tracker { } self.pushed_changes.update((location, time), diff); } + self.log_propagate_target(location.node, port_index, time); } // Update to an operator output. // Propagate any changes forward along outgoing edges. @@ -638,7 +754,7 @@ impl Tracker { self.per_operator[location.node] .sources[port_index] .implications - .update_iter(Some((time, diff))); + .update_iter(Some((time.clone(), diff))); for (time, diff) in changes { for new_target in self.edges[location.node][port_index].iter() { @@ -650,10 +766,12 @@ impl Tracker { } self.pushed_changes.update((location, time), diff); } + self.log_propagate_source(location.node, port_index, time); }, }; } } + } /// Implications of maintained capabilities projected to each output. @@ -775,3 +893,91 @@ fn summarize_outputs( results } + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Port information in a tracker log event +pub struct DebugEventPort { + location: Location, + pointstamps: Vec<(String, i64)>, + implications: Vec<(String, i64)>, + worklist: Vec<(String, i64)>, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Log event +pub struct DebugEvent { + worker_id: usize, + scope_addr: Vec, + ports: Vec, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Log event +pub struct UpdateSourceEvent { + worker_id: usize, + scope_addr: Vec, + operator: usize, + port: usize, + timestamp: String, + delta: i64, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Log event +pub struct UpdateTargetEvent { + worker_id: usize, + scope_addr: Vec, + operator: usize, + port: usize, + timestamp: String, + delta: i64, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Log event +pub struct PropagateEdgeEvent { + worker_id: usize, + scope_addr: Vec, + operator: usize, + port: usize, + timestamp: String, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Log event +pub struct PropagateInternalEvent { + worker_id: usize, + scope_addr: Vec, + operator: usize, + port: usize, + timestamp: String +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// An event to track progress propagation in timely.Antichain +pub enum TrackerEvent { + /// Change pointstamp multiplicity at source. + UpdateSource(UpdateSourceEvent), + /// Change pointstamp multiplicity at target. + UpdateTarget(UpdateTargetEvent), + /// Propagation along en extrenal edge, i.e. from source to connected target. + PropagateEdge(PropagateEdgeEvent), + /// Propagation of implications inside an oeprator, from taregt to connected sources. + PropagateInternal(PropagateInternalEvent), +} + +impl From for TrackerEvent { + fn from(v: UpdateSourceEvent) -> TrackerEvent { TrackerEvent::UpdateSource(v) } +} + +impl From for TrackerEvent { + fn from(v: UpdateTargetEvent) -> TrackerEvent { TrackerEvent::UpdateTarget(v) } +} + +impl From for TrackerEvent { + fn from(v: PropagateEdgeEvent) -> TrackerEvent { TrackerEvent::PropagateEdge(v) } +} + +impl From for TrackerEvent { + fn from(v: PropagateInternalEvent) -> TrackerEvent { TrackerEvent::PropagateInternal(v) } +} \ No newline at end of file diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 0a4582dba..820f65b16 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -123,16 +123,20 @@ where /// Adds a new child to the subgraph. pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { + let child_name = child.name().to_owned(); + let per_op_state = PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()); { let mut child_path = self.path.clone(); child_path.push(index); self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent { id: identifier, addr: child_path, - name: child.name().to_owned(), + internal_summaries: per_op_state.internal_summary.iter() + .map(|x| x.iter().map(|x| format!("{:?}", x.elements())).collect()).collect(), + name: child_name, })); } - self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone())) + self.children.push(per_op_state) } /// Now that initialization is complete, actually build a subgraph. @@ -152,7 +156,7 @@ where // Create empty child zero represenative. self.children[0] = PerOperatorState::empty(outputs, inputs); - let mut builder = reachability::Builder::new(); + let mut builder = reachability::Builder::new(self.path.clone()); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]); @@ -165,7 +169,9 @@ where builder.add_edge(source, target); } - let (tracker, scope_summary) = builder.build(); + let (mut tracker, scope_summary) = builder.build(); + tracker.tracker_logger = worker.log_register().get("timely/tracker"); + tracker.debug_logger = worker.log_register().get("timely/debug"); let progcaster = Progcaster::new(worker, &self.path, self.logging.clone()); @@ -510,11 +516,6 @@ where // produces connectivity summaries from inputs to outputs, and reports initial internal // capabilities on each of the outputs (projecting capabilities from contained scopes). fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { - - // double-check that child 0 (the outside world) is correctly shaped. - assert_eq!(self.children[0].outputs, self.inputs()); - assert_eq!(self.children[0].inputs, self.outputs()); - let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()]; for input in 0 .. self.scope_summary.len() { for output in 0 .. self.scope_summary[input].len() { @@ -524,6 +525,10 @@ where } } + // double-check that child 0 (the outside world) is correctly shaped. + assert_eq!(self.children[0].outputs, self.inputs()); + assert_eq!(self.children[0].inputs, self.outputs()); + // Each child has expressed initial capabilities (their `shared_progress.internals`). // We introduce these into the progress tracker to determine the scope's initial // internal capabilities. diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 99ac2a324..0a3a055e3 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -419,15 +419,18 @@ impl Worker { let mut operator = subscope.into_inner().build(self); + let (internal_summary, _) = operator.get_internal_summary(); + logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent { id: identifier, addr: operator.path().to_vec(), + internal_summaries: internal_summary + .iter().map(|x| x.iter().map(|x| format!("{:?}", x.elements())).collect()).collect(), name: operator.name().to_string(), })); logging.as_mut().map(|l| l.flush()); - operator.get_internal_summary(); operator.set_external_summary(); let mut temp_channel_ids = self.temp_channel_ids.borrow_mut(); From 9d6612192c594a99aa4e83cb95c006f2df90433c Mon Sep 17 00:00:00 2001 From: decovas Date: Mon, 9 Mar 2020 18:17:07 +0100 Subject: [PATCH 2/4] Reply to comments --- timely/src/progress/frontier.rs | 7 -- timely/src/progress/reachability.rs | 180 +++++----------------------- timely/src/progress/subgraph.rs | 7 +- 3 files changed, 33 insertions(+), 161 deletions(-) diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 721c75328..b1f6c20fc 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -317,13 +317,6 @@ impl MutableAntichain { self.frontier().less_equal(time) } - /// Clones the vector of updates. - /// Only used for debugging purposes. - #[inline] - pub fn updates(&self) -> Vec<(T, i64)> { - self.updates.clone() - } - /// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned. /// /// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed). diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 7c0af8548..406959f88 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -14,7 +14,7 @@ //! use timely::progress::reachability::{Builder, Tracker}; //! //! // allocate a new empty topology builder. -//! let mut builder = Builder::::new(vec![]); +//! let mut builder = Builder::::new(); //! //! // Each node with one input connected to one output. //! builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); @@ -110,7 +110,7 @@ use crate::progress::timestamp::PathSummary; /// use timely::progress::reachability::Builder; /// /// // allocate a new empty topology builder. -/// let mut builder = Builder::::new(vec![]); +/// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); @@ -127,8 +127,6 @@ use crate::progress::timestamp::PathSummary; /// ``` #[derive(Clone, Debug)] pub struct Builder { - /// Path of identifiers from the root. - path: Vec, /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the @@ -147,9 +145,8 @@ pub struct Builder { impl Builder { /// Create a new empty topology builder. - pub fn new(path: Vec) -> Self { + pub fn new() -> Self { Builder { - path: path, nodes: Vec::new(), edges: Vec::new(), shape: Vec::new(), @@ -203,7 +200,7 @@ impl Builder { println!("{:?}", self); } - Tracker::allocate_from(self, self.path.clone()) + Tracker::allocate_from(self) } /// Tests whether the graph a cycle of default path summaries. @@ -222,7 +219,7 @@ impl Builder { /// use timely::progress::reachability::Builder; /// /// // allocate a new empty topology builder. - /// let mut builder = Builder::::new(vec![]); + /// let mut builder = Builder::::new(); /// /// // Each node with one input connected to one output. /// builder.add_node(0, 1, 1, vec![vec![Antichain::from_elem(0)]]); @@ -250,7 +247,7 @@ impl Builder { /// use timely::progress::reachability::Builder; /// /// // allocate a new empty topology builder. - /// let mut builder = Builder::::new(vec![]); + /// let mut builder = Builder::::new(); /// /// // Two inputs and outputs, only one of which advances. /// builder.add_node(0, 2, 2, vec![ @@ -350,9 +347,6 @@ impl Builder { /// pointstamp changes at various node input and output ports. These changes may /// alter the potential pointstamps that could arrive at downstream input ports. pub struct Tracker { - /// Path of identifiers from the root. - pub path: Vec, - /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the @@ -400,16 +394,11 @@ pub struct Tracker { /// of the target and source `pointstamps` member. total_counts: i64, - /// A timely logging handle for progress tracking events such as updating source and - /// target capabilities and propagation of implication frontiers between sources and targets. - pub tracker_logger: Option>, - - /// A timely logging handle for the state of progress tracking computation. + /// A tuple of path of scope identifiers from root, and a timely logging handle. /// - /// Debug events include pointstamps and implications at every location and - /// self.worklist. If tracker_logger is enabled, there is exactly one TrackerEvent - /// for every DebugEvent. - pub debug_logger: Option>, + /// The handle logs progress tracking events such as updating source and target + /// capabilities as well as the start of progress tracking computation. + pub tracker_logger: Option<(Vec, crate::logging::Logger)>, } /// Target and source information for each operator. @@ -480,37 +469,31 @@ impl Tracker { /// Updates the count for a time at a target (operator input, scope output). #[inline] pub fn update_target(&mut self, target: Target, time: T, value: i64) { - if let Some(logger) = self.tracker_logger.as_ref() { + if let Some((path, logger)) = self.tracker_logger.as_ref() { logger.log(UpdateTargetEvent { worker_id: logger.id(), - scope_addr: self.path.clone(), + scope_addr: path.clone(), operator: target.node, port: target.port, timestamp: format!("{:?}", time.clone()), delta: value, }); } - if let Some(logger) = self.debug_logger.as_ref() { - self.print_trace(logger); - } self.target_changes.update((target, time), value); } /// Updates the count for a time at a source (operator output, scope input). #[inline] pub fn update_source(&mut self, source: Source, time: T, value: i64) { - if let Some(logger) = self.tracker_logger.as_ref() { + if let Some((path, logger)) = self.tracker_logger.as_ref() { logger.log(UpdateSourceEvent { worker_id: logger.id(), - scope_addr: self.path.clone(), + scope_addr: path.clone(), operator: source.node, port: source.port, timestamp: format!("{:?}", time.clone()), delta: value, }); } - if let Some(logger) = self.debug_logger.as_ref() { - self.print_trace(logger); - } self.source_changes.update((source, time), value); } @@ -525,7 +508,7 @@ impl Tracker { /// /// The result is a pair of tracker, and the summaries from each input port to each /// output port. - pub fn allocate_from(builder: &Builder, path: Vec) -> (Self, Vec>>) { + pub fn allocate_from(builder: &Builder) -> (Self, Vec>>) { // Allocate buffer space for each input and input port. let mut per_operator = @@ -568,7 +551,6 @@ impl Tracker { let tracker = Tracker { - path: path.clone(), nodes: builder.nodes.clone(), edges: builder.edges.clone(), per_operator, @@ -579,88 +561,22 @@ impl Tracker { output_changes, total_counts: 0, tracker_logger: None, - debug_logger: None, }; (tracker, builder_summary) } - #[inline(always)] - fn print_trace(&self, logger: &crate::logging::Logger) { - let ports = self.per_operator.iter().enumerate().flat_map(|(op_n, per_op)| { - let mut ports = per_op.targets.iter().enumerate().map(|(tg_n, target)| { - DebugEventPort { - location: Location { - node: op_n, - port: Port::Target(tg_n), - }, - pointstamps: target.pointstamps.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), - implications: target.implications.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), - worklist: self.worklist.iter() - .filter(|Reverse((_,loc,_))| loc.node == op_n && loc.port == Port::Target(tg_n)) - .map(|Reverse((t,_,d))| (format!("{:?}", t), *d)).collect() - } - }).collect::>(); - ports.extend(per_op.sources.iter().enumerate().map(|(sc_n, source)| { - DebugEventPort { - location: Location { - node: op_n, - port: Port::Source(sc_n), - }, - pointstamps: source.pointstamps.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), - implications: source.implications.updates().iter().map(|(t, d)| (format!("{:?}", t), *d)).collect(), - worklist: self.worklist.iter() - .filter(|Reverse((_,loc,_))| loc.node == op_n && loc.port == Port::Source(sc_n)) - .map(|Reverse((t,_,d))| (format!("{:?}", t), *d)).collect() - } - })); - ports.into_iter() - }).collect::>(); - logger.log(DebugEvent { - worker_id: logger.id(), - scope_addr: self.path.clone(), - ports, - }); - } - - #[inline(always)] - fn log_propagate_target(&self, op: usize, port: usize, time: T) { - if let Some(logger) = self.tracker_logger.as_ref() { - logger.log(PropagateInternalEvent { - worker_id: logger.id(), - scope_addr: self.path.clone(), - operator: op, - port: port, - timestamp: format!("{:?}", time), - }); - } - if let Some(logger) = self.debug_logger.as_ref() { - self.print_trace(logger); - } - } - - #[inline(always)] - fn log_propagate_source(&self, op: usize, port: usize, time: T) { - if let Some(logger) = self.tracker_logger.as_ref() { - logger.log(PropagateEdgeEvent { - worker_id: logger.id(), - scope_addr: self.path.clone(), - operator: op, - port: port, - timestamp: format!("{:?}", time), - }); - } - if let Some(logger) = self.debug_logger.as_ref() { - self.print_trace(logger); - } - } - - /// Propagates all pending updates. /// /// The method drains `self.input_changes` and circulates their implications /// until we cease deriving new implications. pub fn propagate_all(&mut self) { + if let Some((path, logger)) = self.tracker_logger.as_ref() { + logger.log(StartPropagateEvent { + worker_id: logger.id(), + scope_addr: path.clone(), + }); + } // Step 1: Drain `self.input_changes` and determine actual frontier changes. // @@ -730,7 +646,7 @@ impl Tracker { self.per_operator[location.node] .targets[port_index] .implications - .update_iter(Some((time.clone(), diff))); + .update_iter(Some((time, diff))); for (time, diff) in changes { let nodes = &self.nodes[location.node][port_index]; @@ -744,7 +660,6 @@ impl Tracker { } self.pushed_changes.update((location, time), diff); } - self.log_propagate_target(location.node, port_index, time); } // Update to an operator output. // Propagate any changes forward along outgoing edges. @@ -754,7 +669,7 @@ impl Tracker { self.per_operator[location.node] .sources[port_index] .implications - .update_iter(Some((time.clone(), diff))); + .update_iter(Some((time, diff))); for (time, diff) in changes { for new_target in self.edges[location.node][port_index].iter() { @@ -766,7 +681,6 @@ impl Tracker { } self.pushed_changes.update((location, time), diff); } - self.log_propagate_source(location.node, port_index, time); }, }; } @@ -894,23 +808,6 @@ fn summarize_outputs( results } -#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] -/// Port information in a tracker log event -pub struct DebugEventPort { - location: Location, - pointstamps: Vec<(String, i64)>, - implications: Vec<(String, i64)>, - worklist: Vec<(String, i64)>, -} - -#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] -/// Log event -pub struct DebugEvent { - worker_id: usize, - scope_addr: Vec, - ports: Vec, -} - #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] /// Log event pub struct UpdateSourceEvent { @@ -935,35 +832,20 @@ pub struct UpdateTargetEvent { #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] /// Log event -pub struct PropagateEdgeEvent { - worker_id: usize, - scope_addr: Vec, - operator: usize, - port: usize, - timestamp: String, -} - -#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] -/// Log event -pub struct PropagateInternalEvent { +pub struct StartPropagateEvent { worker_id: usize, scope_addr: Vec, - operator: usize, - port: usize, - timestamp: String } #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] -/// An event to track progress propagation in timely.Antichain +/// An event to track progress propagation in timely. pub enum TrackerEvent { /// Change pointstamp multiplicity at source. UpdateSource(UpdateSourceEvent), /// Change pointstamp multiplicity at target. UpdateTarget(UpdateTargetEvent), - /// Propagation along en extrenal edge, i.e. from source to connected target. - PropagateEdge(PropagateEdgeEvent), - /// Propagation of implications inside an oeprator, from taregt to connected sources. - PropagateInternal(PropagateInternalEvent), + /// Start progress tracking propagation subroutine. + StartPropagate(StartPropagateEvent), } impl From for TrackerEvent { @@ -974,10 +856,6 @@ impl From for TrackerEvent { fn from(v: UpdateTargetEvent) -> TrackerEvent { TrackerEvent::UpdateTarget(v) } } -impl From for TrackerEvent { - fn from(v: PropagateEdgeEvent) -> TrackerEvent { TrackerEvent::PropagateEdge(v) } -} - -impl From for TrackerEvent { - fn from(v: PropagateInternalEvent) -> TrackerEvent { TrackerEvent::PropagateInternal(v) } +impl From for TrackerEvent { + fn from(v: StartPropagateEvent) -> TrackerEvent { TrackerEvent::StartPropagate(v)} } \ No newline at end of file diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 820f65b16..637edf024 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -156,7 +156,7 @@ where // Create empty child zero represenative. self.children[0] = PerOperatorState::empty(outputs, inputs); - let mut builder = reachability::Builder::new(self.path.clone()); + let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. builder.add_node(0, outputs, inputs, vec![vec![Antichain::new(); inputs]; outputs]); @@ -170,8 +170,9 @@ where } let (mut tracker, scope_summary) = builder.build(); - tracker.tracker_logger = worker.log_register().get("timely/tracker"); - tracker.debug_logger = worker.log_register().get("timely/debug"); + + let path = self.path.clone(); + tracker.tracker_logger = worker.log_register().get("timely/tracker").map(|logger| (path, logger)); let progcaster = Progcaster::new(worker, &self.path, self.logging.clone()); From 9617929c90f951d741cfb5544849b041ce8ac99c Mon Sep 17 00:00:00 2001 From: decovas Date: Wed, 11 Mar 2020 23:17:51 +0100 Subject: [PATCH 3/4] Make TrackerEvent fields public. --- timely/src/progress/reachability.rs | 42 +++++++++++++++++++---------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 406959f88..ac3846ae8 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -811,30 +811,44 @@ fn summarize_outputs( #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] /// Log event pub struct UpdateSourceEvent { - worker_id: usize, - scope_addr: Vec, - operator: usize, - port: usize, - timestamp: String, - delta: i64, + /// Worker index. + pub worker_id: usize, + /// Sequence of nested scope identifiers indicating the path from the root to this scope. + pub scope_addr: Vec, + /// Operator index. + pub operator: usize, + /// Port index. + pub port: usize, + /// Timestamp for which capability was added or removed. + pub timestamp: String, + /// Multiplicity of the capability to added or removed. + pub delta: i64, } #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] /// Log event pub struct UpdateTargetEvent { - worker_id: usize, - scope_addr: Vec, - operator: usize, - port: usize, - timestamp: String, - delta: i64, + /// Worker index. + pub worker_id: usize, + /// Sequence of nested scope identifiers indicating the path from the root to this scope. + pub scope_addr: Vec, + /// Operator index. + pub operator: usize, + /// Port index. + pub port: usize, + /// Timestamp for which capability was added or removed. + pub timestamp: String, + /// Multiplicity of the capability to be added or removed. + pub delta: i64, } #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] /// Log event pub struct StartPropagateEvent { - worker_id: usize, - scope_addr: Vec, + /// Worker index. + pub worker_id: usize, + /// Sequence of nested scope identifiers indicating the path from the root to this scope. + pub scope_addr: Vec, } #[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] From 46337454a8fbf45f07afc0143c27bb74c32150bc Mon Sep 17 00:00:00 2001 From: decovas Date: Mon, 16 Mar 2020 16:15:06 +0100 Subject: [PATCH 4/4] Add SubgraphEvent to TimelyEvents Loggs an event whenever a new instance of a Subgraph is created. --- timely/src/logging.rs | 17 +++++++++++++++++ timely/src/progress/subgraph.rs | 11 ++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/timely/src/logging.rs b/timely/src/logging.rs index e1e0c98e9..1dce49f3c 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -46,6 +46,17 @@ impl Drop for BatchLogger where P: EventPusher, + /// Type of the subscope Timestamp. + pub timestamp_type: String, +} + #[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] /// The creation of an `Operate` implementor. pub struct OperatesEvent { @@ -223,6 +234,8 @@ impl ParkEvent { #[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)] /// An event in a timely worker pub enum TimelyEvent { + /// Subgraph creation. + Subgraph(SubgraphEvent), /// Operator creation. Operates(OperatesEvent), /// Channel creation. @@ -253,6 +266,10 @@ pub enum TimelyEvent { Text(String), } +impl From for TimelyEvent { + fn from(v: SubgraphEvent) -> TimelyEvent { TimelyEvent::Subgraph(v) } +} + impl From for TimelyEvent { fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) } } diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 637edf024..b9918225c 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -172,7 +172,16 @@ where let (mut tracker, scope_summary) = builder.build(); let path = self.path.clone(); - tracker.tracker_logger = worker.log_register().get("timely/tracker").map(|logger| (path, logger)); + tracker.tracker_logger = worker.log_register().get("timely/tracker").map(|logger| (path.clone(), logger)); + + // Perhaps log information about the creation of subgraph. + if let Some(l) = self.logging.as_mut() { + l.log(crate::logging::SubgraphEvent{ + id: worker.index(), + addr: path.clone(), + timestamp_type: std::any::type_name::().to_string(), + }); + } let progcaster = Progcaster::new(worker, &self.path, self.logging.clone());