diff --git a/logging/src/lib.rs b/logging/src/lib.rs index f2ba8be5d..0561bdc64 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -118,6 +118,11 @@ impl Logger { } } + /// Returns a clone of this logger's `id`. + pub fn id(&self) -> E { + self.id.clone() + } + /// Logs an event. /// /// The event has its timestamp recorded at the moment of logging, but it may be delayed diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ac0316304..1dce49f3c 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -46,6 +46,17 @@ impl Drop for BatchLogger where P: EventPusher, + /// Type of the subscope Timestamp. + pub timestamp_type: String, +} + #[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] /// The creation of an `Operate` implementor. pub struct OperatesEvent { @@ -53,6 +64,8 @@ pub struct OperatesEvent { pub id: usize, /// Sequence of nested scope identifiers indicating the path from the root to this instance. pub addr: Vec, + /// Internal summary for every combination of input and output port. + pub internal_summaries: Vec>, /// A helpful name. pub name: String, } @@ -221,6 +234,8 @@ impl ParkEvent { #[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)] /// An event in a timely worker pub enum TimelyEvent { + /// Subgraph creation. + Subgraph(SubgraphEvent), /// Operator creation. Operates(OperatesEvent), /// Channel creation. 
@@ -251,6 +266,10 @@ pub enum TimelyEvent { Text(String), } +impl From for TimelyEvent { + fn from(v: SubgraphEvent) -> TimelyEvent { TimelyEvent::Subgraph(v) } +} + impl From for TimelyEvent { fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index d9d301f38..ac3846ae8 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -347,7 +347,6 @@ impl Builder { /// pointstamp changes at various node input and output ports. These changes may /// alter the potential pointstamps that could arrive at downstream input ports. pub struct Tracker { - /// Internal connections within hosted operators. /// /// Indexed by operator index, then input port, then output port. This is the @@ -394,6 +393,12 @@ pub struct Tracker { /// always be exactly equal to the sum across all operators of the frontier sizes /// of the target and source `pointstamps` member. total_counts: i64, + + /// The path of scope identifiers from the root, paired with a timely logging handle. + /// + /// The handle logs progress tracking events such as updating source and target + /// capabilities as well as the start of progress tracking computation. + pub tracker_logger: Option<(Vec, crate::logging::Logger)>, } /// Target and source information for each operator. @@ -464,11 +469,31 @@ impl Tracker { /// Updates the count for a time at a target (operator input, scope output). #[inline] pub fn update_target(&mut self, target: Target, time: T, value: i64) { + if let Some((path, logger)) = self.tracker_logger.as_ref() { + logger.log(UpdateTargetEvent { + worker_id: logger.id(), + scope_addr: path.clone(), + operator: target.node, + port: target.port, + timestamp: format!("{:?}", time.clone()), + delta: value, + }); + } self.target_changes.update((target, time), value); } /// Updates the count for a time at a source (operator output, scope input). 
#[inline] pub fn update_source(&mut self, source: Source, time: T, value: i64) { + if let Some((path, logger)) = self.tracker_logger.as_ref() { + logger.log(UpdateSourceEvent { + worker_id: logger.id(), + scope_addr: path.clone(), + operator: source.node, + port: source.port, + timestamp: format!("{:?}", time.clone()), + delta: value, + }); + } self.source_changes.update((source, time), value); } @@ -535,6 +560,7 @@ impl Tracker { pushed_changes: ChangeBatch::new(), output_changes, total_counts: 0, + tracker_logger: None, }; (tracker, builder_summary) @@ -545,6 +571,12 @@ impl Tracker { /// The method drains `self.input_changes` and circulates their implications /// until we cease deriving new implications. pub fn propagate_all(&mut self) { + if let Some((path, logger)) = self.tracker_logger.as_ref() { + logger.log(StartPropagateEvent { + worker_id: logger.id(), + scope_addr: path.clone(), + }); + } // Step 1: Drain `self.input_changes` and determine actual frontier changes. // @@ -597,7 +629,6 @@ impl Tracker { // will discover zero-change times when we first visit them, as no further // changes can be made to them once we complete them. while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() { - // Drain and accumulate all updates that have the same time and location. while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) { diff += (self.worklist.pop().unwrap().0).2; @@ -654,6 +685,7 @@ impl Tracker { }; } } + } /// Implications of maintained capabilities projected to each output. @@ -775,3 +807,69 @@ fn summarize_outputs( results } + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Log event +pub struct UpdateSourceEvent { + /// Worker index. + pub worker_id: usize, + /// Sequence of nested scope identifiers indicating the path from the root to this scope. + pub scope_addr: Vec, + /// Operator index. + pub operator: usize, + /// Port index. 
+ pub port: usize, + /// Timestamp for which capability was added or removed. + pub timestamp: String, + /// Multiplicity of the capability to be added or removed. + pub delta: i64, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Logged when the pointstamp multiplicity at a target changes. +pub struct UpdateTargetEvent { + /// Worker index. + pub worker_id: usize, + /// Sequence of nested scope identifiers indicating the path from the root to this scope. + pub scope_addr: Vec, + /// Operator index. + pub operator: usize, + /// Port index. + pub port: usize, + /// Timestamp for which capability was added or removed. + pub timestamp: String, + /// Multiplicity of the capability to be added or removed. + pub delta: i64, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// Logged when the progress tracking propagation subroutine starts. +pub struct StartPropagateEvent { + /// Worker index. + pub worker_id: usize, + /// Sequence of nested scope identifiers indicating the path from the root to this scope. + pub scope_addr: Vec, +} + +#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)] +/// An event to track progress propagation in timely. +pub enum TrackerEvent { + /// Change pointstamp multiplicity at source. + UpdateSource(UpdateSourceEvent), + /// Change pointstamp multiplicity at target. + UpdateTarget(UpdateTargetEvent), + /// Start progress tracking propagation subroutine. 
+ StartPropagate(StartPropagateEvent), +} + +impl From for TrackerEvent { + fn from(v: UpdateSourceEvent) -> TrackerEvent { TrackerEvent::UpdateSource(v) } +} + +impl From for TrackerEvent { + fn from(v: UpdateTargetEvent) -> TrackerEvent { TrackerEvent::UpdateTarget(v) } +} + +impl From for TrackerEvent { + fn from(v: StartPropagateEvent) -> TrackerEvent { TrackerEvent::StartPropagate(v)} +} \ No newline at end of file diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 0a4582dba..b9918225c 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -123,16 +123,20 @@ where /// Adds a new child to the subgraph. pub fn add_child(&mut self, child: Box>, index: usize, identifier: usize) { + let child_name = child.name().to_owned(); + let per_op_state = PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()); { let mut child_path = self.path.clone(); child_path.push(index); self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent { id: identifier, addr: child_path, - name: child.name().to_owned(), + internal_summaries: per_op_state.internal_summary.iter() + .map(|x| x.iter().map(|x| format!("{:?}", x.elements())).collect()).collect(), + name: child_name, })); } - self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone())) + self.children.push(per_op_state) } /// Now that initialization is complete, actually build a subgraph. @@ -165,7 +169,19 @@ where builder.add_edge(source, target); } - let (tracker, scope_summary) = builder.build(); + let (mut tracker, scope_summary) = builder.build(); + + let path = self.path.clone(); + tracker.tracker_logger = worker.log_register().get("timely/tracker").map(|logger| (path.clone(), logger)); + + // Perhaps log information about the creation of subgraph. 
+ if let Some(l) = self.logging.as_mut() { + l.log(crate::logging::SubgraphEvent{ + id: worker.index(), + addr: path.clone(), + timestamp_type: std::any::type_name::().to_string(), + }); + } let progcaster = Progcaster::new(worker, &self.path, self.logging.clone()); @@ -510,11 +526,6 @@ where // produces connectivity summaries from inputs to outputs, and reports initial internal // capabilities on each of the outputs (projecting capabilities from contained scopes). fn get_internal_summary(&mut self) -> (Vec>>, Rc>>) { - - // double-check that child 0 (the outside world) is correctly shaped. - assert_eq!(self.children[0].outputs, self.inputs()); - assert_eq!(self.children[0].inputs, self.outputs()); - let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()]; for input in 0 .. self.scope_summary.len() { for output in 0 .. self.scope_summary[input].len() { @@ -524,6 +535,10 @@ where } } + // double-check that child 0 (the outside world) is correctly shaped. + assert_eq!(self.children[0].outputs, self.inputs()); + assert_eq!(self.children[0].inputs, self.outputs()); + // Each child has expressed initial capabilities (their `shared_progress.internals`). // We introduce these into the progress tracker to determine the scope's initial // internal capabilities. 
diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 99ac2a324..0a3a055e3 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -419,15 +419,18 @@ impl Worker { let mut operator = subscope.into_inner().build(self); + let (internal_summary, _) = operator.get_internal_summary(); + logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent { id: identifier, addr: operator.path().to_vec(), + internal_summaries: internal_summary + .iter().map(|x| x.iter().map(|x| format!("{:?}", x.elements())).collect()).collect(), name: operator.name().to_string(), })); logging.as_mut().map(|l| l.flush()); - operator.get_internal_summary(); operator.set_external_summary(); let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();