diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index 9d8b529db..f4adbbd56 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -2,15 +2,18 @@ use std::rc::Rc; use std::cell::RefCell; +use std::sync::mpsc::{Sender, Receiver}; +use std::thread::Thread; /// Allocation-free activation tracker. -#[derive(Default)] pub struct Activations { clean: usize, /// `(offset, length)` bounds: Vec<(usize, usize)>, slices: Vec, buffer: Vec, + tx: Sender>, + rx: Receiver>, } impl Activations { @@ -18,12 +21,7 @@ impl Activations { /// Creates a new activation tracker. #[deprecated(since="0.10",note="Type implements Default")] pub fn new() -> Self { - Self { - clean: 0, - bounds: Vec::new(), - slices: Vec::new(), - buffer: Vec::new(), - } + Self::default() } /// Indicates if there are no pending activations. @@ -40,6 +38,10 @@ impl Activations { /// Discards the current active set and presents the next active set. pub fn advance(&mut self) { + while let Ok(path) = self.rx.try_recv() { + self.activate(&path) + } + self.bounds.drain(.. self.clean); { // Scoped, to allow borrow to drop. @@ -94,6 +96,58 @@ impl Activations { } }); } + + /// Constructs a thread-safe `SyncActivations` handle to this activator. + pub fn sync(&self) -> SyncActivations { + SyncActivations { + tx: self.tx.clone(), + thread: std::thread::current(), + } + } +} + +impl Default for Activations { + fn default() -> Self { + let (tx, rx) = std::sync::mpsc::channel(); + Self { + clean: 0, + bounds: Vec::new(), + slices: Vec::new(), + buffer: Vec::new(), + tx, + rx, + } + } +} + +/// A thread-safe handle to an `Activations`. +pub struct SyncActivations { + tx: Sender>, + thread: Thread, +} + +impl SyncActivations { + /// Unparks the task addressed by `path` and unparks the associated worker + /// thread. + pub fn activate(&self, path: Vec) -> Result<(), SyncActivationError> { + self.activate_batch(std::iter::once(path)) + } + + /// Unparks the tasks addressed by `paths` and unparks the associated worker + /// thread. + /// + /// This method can be more efficient than calling `activate` repeatedly, as + /// it only unparks the worker thread after sending all of the activations. + pub fn activate_batch(&self, paths: I) -> Result<(), SyncActivationError> + where + I: IntoIterator> + { + for path in paths.into_iter() { + self.tx.send(path).map_err(|_| SyncActivationError)?; + } + self.thread.unpark(); + Ok(()) + } } /// A capability to activate a specific path. @@ -118,6 +172,44 @@ impl Activator { } } +/// A thread-safe version of `Activator`. +pub struct SyncActivator { + path: Vec, + queue: SyncActivations, +} + +impl SyncActivator { + /// Creates a new thread-safe activation handle. + pub fn new(path: &[usize], queue: SyncActivations) -> Self { + Self { + path: path.to_vec(), + queue, + } + } + + /// Activates the associated path and unparks the associated worker thread. + pub fn activate(&self) -> Result<(), SyncActivationError> { + self.queue.activate(self.path.clone()) + } +} + +/// The error returned when activation fails across thread boundaries because +/// the receiving end has hung up. +#[derive(Debug)] +pub struct SyncActivationError; + +impl std::fmt::Display for SyncActivationError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str("sync activation error in timely") + } +} + +impl std::error::Error for SyncActivationError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + None + } +} + /// A wrapper that unparks on drop. pub struct ActivateOnDrop { wrapped: T, diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs index 921871ade..6e28d8144 100644 --- a/timely/src/scheduling/mod.rs +++ b/timely/src/scheduling/mod.rs @@ -5,7 +5,7 @@ use std::cell::RefCell; pub mod activate; -pub use self::activate::{Activations, Activator, ActivateOnDrop}; +pub use self::activate::{Activations, Activator, ActivateOnDrop, SyncActivator}; /// A type that can be scheduled. pub trait Schedule { @@ -24,9 +24,14 @@ pub trait Schedule { pub trait Scheduler { /// Provides a shared handle to the activation scheduler. fn activations(&self) -> Rc>; - /// + /// Constructs an `Activator` tied to the specified operator address. fn activator_for(&self, path: &[usize]) -> Activator { let activations = self.activations().clone(); Activator::new(path, activations) } + /// Constructs a `SyncActivator` tied to the specified operator address. + fn sync_activator_for(&self, path: &[usize]) -> SyncActivator { + let sync_activations = self.activations().borrow().sync(); + SyncActivator::new(path, sync_activations) + } } \ No newline at end of file