Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions timely/examples/logging_replay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Demonstrates cross-thread capture and replay of timely logging events.
//!
//! A source timely instance (2 workers) runs a simple dataflow and captures its
//! logging events using thread-safe `link_sync::EventLink`s. A sink timely instance
//! (1 worker) replays those events and counts them.

use std::sync::Arc;
use std::time::Duration;

use timely::dataflow::operators::{Exchange, Inspect, ToStream};
use timely::dataflow::operators::capture::event::link_sync::EventLink;
use timely::dataflow::operators::capture::Replay;
use timely::logging::{BatchLogger, TimelyEventBuilder, TimelyEvent};

fn main() {

let source_workers = 2usize;
let sink_workers = 1usize;

// One EventLink per source worker, shared between source (writer) and sink (reader).
let event_links: Vec<_> = (0..source_workers)
.map(|_| Arc::new(EventLink::<Duration, Vec<(Duration, TimelyEvent)>>::new()))
.collect();

// Clone reader handles (they start at the head; the writer will advance past them).
let readers: Vec<_> = event_links.iter().map(Arc::clone).collect();

std::thread::scope(|scope| {

// --- Source instance: 2 workers producing logging events ---
let source = scope.spawn(move || {
timely::execute(timely::Config::process(source_workers), move |worker| {

// Install logging: capture timely events into our shared EventLink.
let link = event_links[worker.index()].clone();
let mut logger = BatchLogger::new(link);
worker.log_register()
.unwrap()
.insert::<TimelyEventBuilder, _>("timely", move |time, data| {
logger.publish_batch(time, data);
});

// A trivial dataflow to generate some logging activity.
worker.dataflow::<u64,_,_>(|scope| {
(0..100u64)
.to_stream(scope)
.container::<Vec<_>>()
.exchange(|&x| x)
.inspect(|_x| { });
});

}).expect("source execution failed");
});

// --- Sink instance: 1 worker replaying the captured logs ---
let sink = scope.spawn(move || {
timely::execute(timely::Config::process(sink_workers), move |worker| {

// Each sink worker replays a disjoint subset of the source streams.
let replayers: Vec<_> = readers.iter().enumerate()
.filter(|(i, _)| i % worker.peers() == worker.index())
.map(|(_, r)| Arc::clone(r))
.collect();

worker.dataflow::<Duration,_,_>(|scope| {
replayers
.replay_into(scope)
.inspect(|event| {
println!(" {:?}", event);
});
});

}).expect("sink execution failed");
});

source.join().expect("source panicked");
sink.join().expect("sink panicked");
});

println!("Done.");
}
86 changes: 86 additions & 0 deletions timely/src/dataflow/operators/core/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,92 @@ pub mod link {
}
}

/// A thread-safe linked-list event pusher and iterator.
pub mod link_sync {

use std::borrow::Cow;
use std::sync::{Arc, Mutex};

use super::{Event, EventPusher, EventIterator};

/// A linked list of Event<T, C> usable across threads.
pub struct EventLink<T, C> {
/// An event, if one exists.
///
/// An event might not exist, if either we want to insert a `None` and have the output iterator pause,
/// or in the case of the very first linked list element, which has no event when constructed.
pub event: Option<Event<T, C>>,
/// The next event, if it exists.
pub next: Mutex<Option<Arc<EventLink<T, C>>>>,
}

impl<T, C> EventLink<T, C> {
/// Allocates a new `EventLink`.
pub fn new() -> EventLink<T, C> {
EventLink { event: None, next: Mutex::new(None) }
}
}

impl<T, C> EventPusher<T, C> for Arc<EventLink<T, C>> {
fn push(&mut self, event: Event<T, C>) {
let mut guard = self.next.lock().unwrap();
*guard = Some(Arc::new(EventLink { event: Some(event), next: Mutex::new(None) }));
let next = Arc::clone(guard.as_ref().unwrap());
drop(guard);
*self = next;
}
}

impl<T: Clone, C: Clone> EventIterator<T, C> for Arc<EventLink<T, C>> {
fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
let is_some = self.next.lock().unwrap().is_some();
if is_some {
let next = Arc::clone(self.next.lock().unwrap().as_ref().unwrap());
*self = next;
if let Some(this) = Arc::get_mut(self) {
this.event.take().map(Cow::Owned)
}
else {
self.event.as_ref().map(Cow::Borrowed)
}
}
else {
None
}
}
}

// Drop implementation to prevent stack overflow through naive drop impl.
impl<T, C> Drop for EventLink<T, C> {
fn drop(&mut self) {
while let Some(link) = self.next.get_mut().unwrap().take() {
if let Ok(head) = Arc::try_unwrap(link) {
*self = head;
}
}
}
}

impl<T, C> Default for EventLink<T, C> {
fn default() -> Self {
Self::new()
}
}

#[test]
fn avoid_stack_overflow_in_drop() {
#[cfg(miri)]
let limit = 1_000;
#[cfg(not(miri))]
let limit = 1_000_000;
let mut event1 = Arc::new(EventLink::<(),()>::new());
let _event2 = Arc::clone(&event1);
for _ in 0 .. limit {
event1.push(Event::Progress(vec![]));
}
}
}

/// A binary event pusher and iterator.
pub mod binary {

Expand Down
Loading