Use timely's logging infrastructure to log Tracker state #321

Closed
wants to merge 4 commits into from
5 changes: 5 additions & 0 deletions logging/src/lib.rs
@@ -118,6 +118,11 @@ impl<T, E: Clone> Logger<T, E> {
}
}

/// Returns a clone of the logger's identifier.
pub fn id(&self) -> E {
self.id.clone()
}

/// Logs an event.
///
/// The event has its timestamp recorded at the moment of logging, but it may be delayed
19 changes: 19 additions & 0 deletions timely/src/logging.rs
@@ -46,13 +46,26 @@ impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, (Dura
}
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// The creation of a `Subgraph`.
pub struct SubgraphEvent {
/// Worker-unique identifier for the operator.
pub id: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
pub addr: Vec<usize>,
/// Type of the subscope Timestamp.
pub timestamp_type: String,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// The creation of an `Operate` implementor.
pub struct OperatesEvent {
/// Worker-unique identifier for the operator.
pub id: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this instance.
pub addr: Vec<usize>,
/// Internal summary for every combination of input and output port.
pub internal_summaries: Vec<Vec<String>>,
Comment on lines +67 to +68
Member

Can you explain what these are used for? Would it be equally beneficial to have the report from the tracker about its input-to-output summaries? That would leave this event stable and consolidate the timestamp/summary related events to the reachability tracker.
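
For context on what the `internal_summaries` field would carry, here is a minimal sketch of the string rendering the diff applies to each (input, output) port pair. The `Antichain` type below is a simplified stand-in that models only `elements()`, and `render_summaries` is a hypothetical helper name, not something defined in the PR.

```rust
// Simplified stand-in for timely's `Antichain`; only `elements()` is modeled.
struct Antichain<T> {
    elements: Vec<T>,
}

impl<T> Antichain<T> {
    fn elements(&self) -> &[T] {
        &self.elements
    }
}

/// Hypothetical helper: renders one debug string per (input, output) port pair,
/// mirroring the `format!("{:?}", x.elements())` expression used in the diff.
fn render_summaries<T: std::fmt::Debug>(summary: &[Vec<Antichain<T>>]) -> Vec<Vec<String>> {
    summary
        .iter()
        .map(|outputs| {
            outputs
                .iter()
                .map(|antichain| format!("{:?}", antichain.elements()))
                .collect()
        })
        .collect()
}

fn main() {
    // One input port and two output ports. The first pair has a single
    // summary element; the second pair has an empty antichain.
    let summary = vec![vec![
        Antichain { elements: vec![0u64] },
        Antichain { elements: Vec::<u64>::new() },
    ]];
    println!("{:?}", render_summaries(&summary));
}
```

Because the summaries are stored as debug strings, a log consumer can display them but cannot compare or combine them without parsing.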

/// A helpful name.
pub name: String,
}
@@ -221,6 +234,8 @@ impl ParkEvent {
#[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)]
/// An event in a timely worker
pub enum TimelyEvent {
/// Subgraph creation.
Subgraph(SubgraphEvent),
/// Operator creation.
Operates(OperatesEvent),
/// Channel creation.
@@ -251,6 +266,10 @@ pub enum TimelyEvent {
Text(String),
}

impl From<SubgraphEvent> for TimelyEvent {
fn from(v: SubgraphEvent) -> TimelyEvent { TimelyEvent::Subgraph(v) }
}

impl From<OperatesEvent> for TimelyEvent {
fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) }
}
102 changes: 100 additions & 2 deletions timely/src/progress/reachability.rs
@@ -347,7 +347,6 @@ impl<T: Timestamp> Builder<T> {
/// pointstamp changes at various node input and output ports. These changes may
/// alter the potential pointstamps that could arrive at downstream input ports.
pub struct Tracker<T:Timestamp> {

/// Internal connections within hosted operators.
///
/// Indexed by operator index, then input port, then output port. This is the
@@ -394,6 +393,12 @@ pub struct Tracker<T:Timestamp> {
/// always be exactly equal to the sum across all operators of the frontier sizes
/// of the target and source `pointstamps` member.
total_counts: i64,

/// The path of scope identifiers from the root, paired with a timely logging handle.
///
/// The handle logs progress tracking events such as updating source and target
/// capabilities as well as the start of progress tracking computation.
pub tracker_logger: Option<(Vec<usize>, crate::logging::Logger<TrackerEvent>)>,
}

/// Target and source information for each operator.
@@ -464,11 +469,31 @@ impl<T:Timestamp> Tracker<T> {
/// Updates the count for a time at a target (operator input, scope output).
#[inline]
pub fn update_target(&mut self, target: Target, time: T, value: i64) {
if let Some((path, logger)) = self.tracker_logger.as_ref() {
logger.log(UpdateTargetEvent {
worker_id: logger.id(),
scope_addr: path.clone(),
operator: target.node,
port: target.port,
timestamp: format!("{:?}", time.clone()),
delta: value,
});
}
self.target_changes.update((target, time), value);
}
/// Updates the count for a time at a source (operator output, scope input).
#[inline]
pub fn update_source(&mut self, source: Source, time: T, value: i64) {
if let Some((path, logger)) = self.tracker_logger.as_ref() {
logger.log(UpdateSourceEvent {
worker_id: logger.id(),
scope_addr: path.clone(),
operator: source.node,
port: source.port,
timestamp: format!("{:?}", time.clone()),
delta: value,
});
}
self.source_changes.update((source, time), value);
}

@@ -535,6 +560,7 @@ impl<T:Timestamp> Tracker<T> {
pushed_changes: ChangeBatch::new(),
output_changes,
total_counts: 0,
tracker_logger: None,
};

(tracker, builder_summary)
@@ -545,6 +571,12 @@ impl<T:Timestamp> Tracker<T> {
/// The method drains `self.input_changes` and circulates their implications
/// until we cease deriving new implications.
pub fn propagate_all(&mut self) {
if let Some((path, logger)) = self.tracker_logger.as_ref() {
logger.log(StartPropagateEvent {
worker_id: logger.id(),
scope_addr: path.clone(),
});
}

// Step 1: Drain `self.input_changes` and determine actual frontier changes.
//
@@ -597,7 +629,6 @@ impl<T:Timestamp> Tracker<T> {
// will discover zero-change times when we first visit them, as no further
// changes can be made to them once we complete them.
while let Some(Reverse((time, location, mut diff))) = self.worklist.pop() {

Member

random whitespace

// Drain and accumulate all updates that have the same time and location.
while self.worklist.peek().map(|x| ((x.0).0 == time) && ((x.0).1 == location)).unwrap_or(false) {
diff += (self.worklist.pop().unwrap().0).2;
@@ -654,6 +685,7 @@ impl<T:Timestamp> Tracker<T> {
};
}
}

Member

random whitespace

}

/// Implications of maintained capabilities projected to each output.
@@ -775,3 +807,69 @@ fn summarize_outputs<T: Timestamp>(

results
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)]
/// A change to the pointstamp count at a source (operator output, scope input).
pub struct UpdateSourceEvent {
/// Worker index.
pub worker_id: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this scope.
pub scope_addr: Vec<usize>,
/// Operator index.
pub operator: usize,
/// Port index.
pub port: usize,
/// Timestamp for which capability was added or removed.
pub timestamp: String,
/// Multiplicity of the capability to be added or removed.
pub delta: i64,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)]
/// A change to the pointstamp count at a target (operator input, scope output).
pub struct UpdateTargetEvent {
/// Worker index.
pub worker_id: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this scope.
pub scope_addr: Vec<usize>,
/// Operator index.
pub operator: usize,
/// Port index.
pub port: usize,
/// Timestamp for which capability was added or removed.
pub timestamp: String,
/// Multiplicity of the capability to be added or removed.
pub delta: i64,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)]
/// The start of the progress tracking propagation subroutine.
pub struct StartPropagateEvent {
/// Worker index.
pub worker_id: usize,
/// Sequence of nested scope identifiers indicating the path from the root to this scope.
pub scope_addr: Vec<usize>,
}

#[derive(Serialize, Deserialize, Abomonation, Debug, Clone)]
/// An event to track progress propagation in timely.
pub enum TrackerEvent {
/// Change pointstamp multiplicity at source.
UpdateSource(UpdateSourceEvent),
/// Change pointstamp multiplicity at target.
UpdateTarget(UpdateTargetEvent),
/// Start progress tracking propagation subroutine.
StartPropagate(StartPropagateEvent),
}

impl From<UpdateSourceEvent> for TrackerEvent {
fn from(v: UpdateSourceEvent) -> TrackerEvent { TrackerEvent::UpdateSource(v) }
}

impl From<UpdateTargetEvent> for TrackerEvent {
fn from(v: UpdateTargetEvent) -> TrackerEvent { TrackerEvent::UpdateTarget(v) }
}

impl From<StartPropagateEvent> for TrackerEvent {
fn from(v: StartPropagateEvent) -> TrackerEvent { TrackerEvent::StartPropagate(v) }
}
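
The `From` impls above are what let call sites pass a concrete event struct straight to `logger.log(...)`. A minimal sketch of that pattern follows, assuming the logger's `log` method accepts any value convertible into the logger's event type. `VecLogger` is a hypothetical in-memory stand-in for timely's logging handle, and only one event type is reproduced.

```rust
use std::cell::RefCell;

#[derive(Debug, Clone, PartialEq)]
struct StartPropagateEvent {
    worker_id: usize,
    scope_addr: Vec<usize>,
}

#[derive(Debug, Clone, PartialEq)]
enum TrackerEvent {
    StartPropagate(StartPropagateEvent),
}

impl From<StartPropagateEvent> for TrackerEvent {
    fn from(v: StartPropagateEvent) -> TrackerEvent {
        TrackerEvent::StartPropagate(v)
    }
}

// Hypothetical in-memory logger; the real handle batches and timestamps events.
struct VecLogger<E> {
    events: RefCell<Vec<E>>,
}

impl<E> VecLogger<E> {
    fn new() -> Self {
        VecLogger { events: RefCell::new(Vec::new()) }
    }

    // Accepts anything convertible into the event type `E`.
    fn log<S: Into<E>>(&self, event: S) {
        self.events.borrow_mut().push(event.into());
    }
}

fn main() {
    let logger: VecLogger<TrackerEvent> = VecLogger::new();
    // The struct is wrapped into `TrackerEvent::StartPropagate` by the `From` impl.
    logger.log(StartPropagateEvent { worker_id: 0, scope_addr: vec![0, 1] });
    println!("{:?}", logger.events.borrow());
}
```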
31 changes: 23 additions & 8 deletions timely/src/progress/subgraph.rs
@@ -123,16 +123,20 @@ where

/// Adds a new child to the subgraph.
pub fn add_child(&mut self, child: Box<dyn Operate<TInner>>, index: usize, identifier: usize) {
let child_name = child.name().to_owned();
let per_op_state = PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone());
{
let mut child_path = self.path.clone();
child_path.push(index);
self.logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent {
id: identifier,
addr: child_path,
name: child.name().to_owned(),
internal_summaries: per_op_state.internal_summary.iter()
.map(|x| x.iter().map(|x| format!("{:?}", x.elements())).collect()).collect(),
name: child_name,
}));
}
self.children.push(PerOperatorState::new(child, index, self.path.clone(), identifier, self.logging.clone()))
self.children.push(per_op_state)
}

/// Now that initialization is complete, actually build a subgraph.
@@ -165,7 +169,19 @@ where
builder.add_edge(source, target);
}

let (tracker, scope_summary) = builder.build();
let (mut tracker, scope_summary) = builder.build();

let path = self.path.clone();
tracker.tracker_logger = worker.log_register().get("timely/tracker").map(|logger| (path.clone(), logger));

// Perhaps log information about the creation of the subgraph.
if let Some(l) = self.logging.as_mut() {
l.log(crate::logging::SubgraphEvent{
id: worker.index(),
addr: path.clone(),
timestamp_type: std::any::type_name::<TInner>().to_string(),
});
}
Comment on lines +177 to +184
Member

I'd love to reframe this as a TrackerEvent and have it be part of the line just up above (i.e. "tracker came into existence"). I suspect something like Tracker::install_logger(...) could do both of those things and wrap up the abstraction well. I'm happy to do that after the fact if that works for you.
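
One way the suggested `Tracker::install_logger(...)` might look is sketched below. Everything here is an assumption for illustration: the `Created` variant, the method signature, and the `Rc<RefCell<Vec<_>>>` logger stand-in are hypothetical and do not appear in the PR. The point is only that a single method can both announce the tracker's creation and store the handle.

```rust
use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;

#[derive(Debug, Clone, PartialEq)]
enum TrackerEvent {
    // Hypothetical variant; the PR logs this information as `SubgraphEvent`.
    Created { scope_addr: Vec<usize>, timestamp_type: String },
}

// Stand-in for a timely logging handle: appends events to a shared buffer.
type Logger = Rc<RefCell<Vec<TrackerEvent>>>;

struct Tracker<T> {
    tracker_logger: Option<(Vec<usize>, Logger)>,
    _time: PhantomData<T>,
}

impl<T> Tracker<T> {
    fn new() -> Self {
        Tracker { tracker_logger: None, _time: PhantomData }
    }

    /// Hypothetical method: logs a creation event if a logger is supplied,
    /// then stores the scope path together with the handle.
    fn install_logger(&mut self, path: Vec<usize>, logger: Option<Logger>) {
        self.tracker_logger = logger.map(|logger| {
            logger.borrow_mut().push(TrackerEvent::Created {
                scope_addr: path.clone(),
                timestamp_type: std::any::type_name::<T>().to_string(),
            });
            (path, logger)
        });
    }
}

fn main() {
    let log: Logger = Rc::new(RefCell::new(Vec::new()));
    let mut tracker = Tracker::<u64>::new();
    tracker.install_logger(vec![0, 2], Some(log.clone()));
    println!("{:?}", log.borrow());
}
```

With this shape, the caller in `subgraph.rs` would make one call after `builder.build()` instead of assigning the field and logging a separate event.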


let progcaster = Progcaster::new(worker, &self.path, self.logging.clone());

@@ -510,11 +526,6 @@ where
// produces connectivity summaries from inputs to outputs, and reports initial internal
// capabilities on each of the outputs (projecting capabilities from contained scopes).
fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<TOuter::Summary>>>, Rc<RefCell<SharedProgress<TOuter>>>) {

// double-check that child 0 (the outside world) is correctly shaped.
assert_eq!(self.children[0].outputs, self.inputs());
assert_eq!(self.children[0].inputs, self.outputs());

let mut internal_summary = vec![vec![Antichain::new(); self.outputs()]; self.inputs()];
for input in 0 .. self.scope_summary.len() {
for output in 0 .. self.scope_summary[input].len() {
@@ -524,6 +535,10 @@ where
}
}

// double-check that child 0 (the outside world) is correctly shaped.
assert_eq!(self.children[0].outputs, self.inputs());
assert_eq!(self.children[0].inputs, self.outputs());

Comment on lines +538 to +541
Member

Can you explain why these moved down 20 lines?

// Each child has expressed initial capabilities (their `shared_progress.internals`).
// We introduce these into the progress tracker to determine the scope's initial
// internal capabilities.
5 changes: 4 additions & 1 deletion timely/src/worker.rs
@@ -419,15 +419,18 @@ impl<A: Allocate> Worker<A> {

let mut operator = subscope.into_inner().build(self);

let (internal_summary, _) = operator.get_internal_summary();

Comment on lines +422 to +423
Member

If at all possible, I'd like to keep this next to the set_external_summary() call just to be clear that they are paired. I'm happy to have it hoisted as well.

logging.as_mut().map(|l| l.log(crate::logging::OperatesEvent {
id: identifier,
addr: operator.path().to_vec(),
internal_summaries: internal_summary
.iter().map(|x| x.iter().map(|x| format!("{:?}", x.elements())).collect()).collect(),
name: operator.name().to_string(),
}));

logging.as_mut().map(|l| l.flush());

operator.get_internal_summary();
operator.set_external_summary();

let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();