Skip to content

Commit

Permalink
Merge pull request TimelyDataflow#287 from benesch/sync-activate
Browse files Browse the repository at this point in the history
Add interfaces for activating across thread boundaries
  • Loading branch information
frankmcsherry committed Aug 22, 2019
2 parents b63bea6 + a8cfadc commit f2b0cc7
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 9 deletions.
106 changes: 99 additions & 7 deletions timely/src/scheduling/activate.rs
Expand Up @@ -2,28 +2,26 @@

use std::rc::Rc;
use std::cell::RefCell;
use std::sync::mpsc::{Sender, Receiver};
use std::thread::Thread;

/// Allocation-free activation tracker.
#[derive(Default)]
pub struct Activations {
clean: usize,
/// `(offset, length)`
bounds: Vec<(usize, usize)>,
slices: Vec<usize>,
buffer: Vec<usize>,
tx: Sender<Vec<usize>>,
rx: Receiver<Vec<usize>>,
}

impl Activations {

/// Creates a new activation tracker.
#[deprecated(since="0.10",note="Type implements Default")]
pub fn new() -> Self {
Self {
clean: 0,
bounds: Vec::new(),
slices: Vec::new(),
buffer: Vec::new(),
}
Self::default()
}

/// Indicates if there are no pending activations.
Expand All @@ -40,6 +38,10 @@ impl Activations {
/// Discards the current active set and presents the next active set.
pub fn advance(&mut self) {

while let Ok(path) = self.rx.try_recv() {
self.activate(&path)
}

self.bounds.drain(.. self.clean);

{ // Scoped, to allow borrow to drop.
Expand Down Expand Up @@ -94,6 +96,58 @@ impl Activations {
}
});
}

/// Constructs a thread-safe `SyncActivations` handle to this activator.
pub fn sync(&self) -> SyncActivations {
SyncActivations {
tx: self.tx.clone(),
thread: std::thread::current(),
}
}
}

impl Default for Activations {
fn default() -> Self {
let (tx, rx) = std::sync::mpsc::channel();
Self {
clean: 0,
bounds: Vec::new(),
slices: Vec::new(),
buffer: Vec::new(),
tx,
rx,
}
}
}

/// A thread-safe handle to an `Activations`.
pub struct SyncActivations {
tx: Sender<Vec<usize>>,
thread: Thread,
}

impl SyncActivations {
/// Unparks the task addressed by `path` and unparks the associated worker
/// thread.
pub fn activate(&self, path: Vec<usize>) -> Result<(), SyncActivationError> {
self.activate_batch(std::iter::once(path))
}

/// Unparks the tasks addressed by `paths` and unparks the associated worker
/// thread.
///
/// This method can be more efficient than calling `activate` repeatedly, as
/// it only unparks the worker thread after sending all of the activations.
pub fn activate_batch<I>(&self, paths: I) -> Result<(), SyncActivationError>
where
I: IntoIterator<Item = Vec<usize>>
{
for path in paths.into_iter() {
self.tx.send(path).map_err(|_| SyncActivationError)?;
}
self.thread.unpark();
Ok(())
}
}

/// A capability to activate a specific path.
Expand All @@ -118,6 +172,44 @@ impl Activator {
}
}

/// A thread-safe version of `Activator`.
pub struct SyncActivator {
path: Vec<usize>,
queue: SyncActivations,
}

impl SyncActivator {
/// Creates a new thread-safe activation handle.
pub fn new(path: &[usize], queue: SyncActivations) -> Self {
Self {
path: path.to_vec(),
queue,
}
}

/// Activates the associated path and unparks the associated worker thread.
pub fn activate(&self) -> Result<(), SyncActivationError> {
self.queue.activate(self.path.clone())
}
}

/// The error returned when activation fails across thread boundaries because
/// the receiving end has hung up.
#[derive(Debug)]
pub struct SyncActivationError;

impl std::fmt::Display for SyncActivationError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str("sync activation error in timely")
}
}

impl std::error::Error for SyncActivationError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}

/// A wrapper that unparks on drop.
pub struct ActivateOnDrop<T> {
wrapped: T,
Expand Down
9 changes: 7 additions & 2 deletions timely/src/scheduling/mod.rs
Expand Up @@ -5,7 +5,7 @@ use std::cell::RefCell;

pub mod activate;

pub use self::activate::{Activations, Activator, ActivateOnDrop};
pub use self::activate::{Activations, Activator, ActivateOnDrop, SyncActivator};

/// A type that can be scheduled.
pub trait Schedule {
Expand All @@ -24,9 +24,14 @@ pub trait Schedule {
pub trait Scheduler {
/// Provides a shared handle to the activation scheduler.
fn activations(&self) -> Rc<RefCell<Activations>>;
///
/// Constructs an `Activator` tied to the specified operator address.
fn activator_for(&self, path: &[usize]) -> Activator {
let activations = self.activations().clone();
Activator::new(path, activations)
}
/// Constructs a `SyncActivator` tied to the specified operator address.
fn sync_activator_for(&self, path: &[usize]) -> SyncActivator {
let sync_activations = self.activations().borrow().sync();
SyncActivator::new(path, sync_activations)
}
}

0 comments on commit f2b0cc7

Please sign in to comment.