Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add step_or_park #260

Merged
merged 6 commits into from Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions communication/src/allocator/counters.rs
Expand Up @@ -93,6 +93,7 @@ impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
// }
self.events
.send((self.index, Event::Pushed(1)))
// TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
.expect("Failed to send message count");

self.pusher.push(element)
Expand Down
9 changes: 9 additions & 0 deletions communication/src/allocator/mod.rs
Expand Up @@ -2,6 +2,7 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

pub use self::thread::Thread;
Expand Down Expand Up @@ -51,6 +52,14 @@ pub trait Allocate {
/// into a performance problem.
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>>;

/// Awaits communication events.
///
/// This method may park the current thread, for at most `duration`,
/// until new events arrive.
/// The method is not guaranteed to wait for any amount of time, but
/// good implementations should use this as a hint to park the thread.
fn await_events(&self, _duration: Option<Duration>) { }

/// Ensure that received messages are surfaced in each channel.
///
/// This method should be called to ensure that received messages are
Expand Down
11 changes: 11 additions & 0 deletions communication/src/allocator/thread.rs
Expand Up @@ -2,6 +2,7 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::time::Duration;
use std::collections::VecDeque;

use crate::allocator::{Allocate, AllocateBuilder, Event};
Expand Down Expand Up @@ -34,6 +35,16 @@ impl Allocate for Thread {
fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> {
&self.events
}
fn await_events(&self, duration: Option<Duration>) {
if self.events.borrow().is_empty() {
if let Some(duration) = duration {
std::thread::park_timeout(duration);
}
else {
std::thread::park();
}
}
}
}

/// Thread-local counting channel push endpoint.
Expand Down
6 changes: 3 additions & 3 deletions timely/src/execute.rs
Expand Up @@ -84,7 +84,7 @@ where
let alloc = crate::communication::allocator::thread::Thread::new();
let mut worker = crate::worker::Worker::new(alloc);
let result = func(&mut worker);
while worker.step() { }
while worker.step_or_park(None) { }
result
}

Expand Down Expand Up @@ -199,7 +199,7 @@ where
}

let result = func(&mut worker);
while worker.step() { }
while worker.step_or_park(None) { }
result
})
}
Expand Down Expand Up @@ -284,7 +284,7 @@ where
initialize_from(builders, others, move |allocator| {
let mut worker = Worker::new(allocator);
let result = func(&mut worker);
while worker.step() { }
while worker.step_or_park(None) { }
result
})
}
5 changes: 5 additions & 0 deletions timely/src/scheduling/activate.rs
Expand Up @@ -24,6 +24,11 @@ impl Activations {
}
}

/// Indicates if there no pending activations.
pub fn is_empty(&self) -> bool {
self.bounds.is_empty()
}

/// Unparks task addressed by `path`.
pub fn activate(&mut self, path: &[usize]) {
self.bounds.push((self.slices.len(), path.len()));
Expand Down
37 changes: 35 additions & 2 deletions timely/src/worker.rs
Expand Up @@ -3,7 +3,7 @@
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::any::Any;
use std::time::Instant;
use std::time::{Instant, Duration};
use std::collections::HashMap;
use std::collections::hash_map::Entry;

Expand Down Expand Up @@ -141,6 +141,34 @@ impl<A: Allocate> Worker<A> {
/// });
/// ```
pub fn step(&mut self) -> bool {
self.step_or_park(Some(Duration::from_secs(0)))
}

/// Performs one step of the computation.
///
/// A step gives each dataflow operator a chance to run, and is the
/// main way to ensure that a computation proceeds. This method may
/// park the thread until there is work to perform, with an optional
/// timeout.
///
/// # Examples
///
/// ```
/// timely::execute_from_args(::std::env::args(), |worker| {
///
/// use std::time::Duration;
/// use timely::dataflow::operators::{ToStream, Inspect};
///
/// worker.dataflow::<usize,_,_>(|scope| {
/// (0 .. 10)
/// .to_stream(scope)
/// .inspect(|x| println!("{:?}", x));
/// });
///
/// worker.step_or_park(Some(Duration::from_secs(1)));
/// });
/// ```
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

{ // Process channel events. Activate responders.
let mut allocator = self.allocator.borrow_mut();
Expand Down Expand Up @@ -168,7 +196,12 @@ impl<A: Allocate> Worker<A> {
.borrow_mut()
.advance();

{ // Schedule active dataflows.
if self.activations.borrow().is_empty() {
self.allocator
.borrow()
.await_events(duration);
}
else { // Schedule active dataflows.

let active_dataflows = &mut self.active_dataflows;
self.activations
Expand Down