Skip to content
Permalink
Browse files

Mutable Exchange

  • Loading branch information...
frankmcsherry committed May 26, 2019
1 parent 3e0eb5e commit 37474012e1906b901ae333a3d78537448138acf8
Showing with 7 additions and 7 deletions.
  1. +4 −4 timely/src/dataflow/channels/pact.rs
  2. +3 −3 timely/src/dataflow/channels/pushers/exchange.rs
@@ -43,8 +43,8 @@ impl<T: 'static, D: 'static> ParallelizationContract<T, D> for Pipeline {
}

/// An exchange between multiple observers by data
pub struct Exchange<D, F: Fn(&D)->u64+'static> { hash_func: F, phantom: PhantomData<D>, }
impl<D, F: Fn(&D)->u64> Exchange<D, F> {
pub struct Exchange<D, F: FnMut(&D)->u64+'static> { hash_func: F, phantom: PhantomData<D>, }
impl<D, F: FnMut(&D)->u64> Exchange<D, F> {
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new(func: F) -> Exchange<D, F> {
Exchange {
@@ -55,12 +55,12 @@ impl<D, F: Fn(&D)->u64> Exchange<D, F> {
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Eq+Data+Clone, D: Data+Clone, F: Fn(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
impl<T: Eq+Data+Clone, D: Data+Clone, F: FnMut(&D)->u64+'static> ParallelizationContract<T, D> for Exchange<D, F> {
// TODO: The closure in the type prevents us from naming it.
// Could specialize `ExchangePusher` to a time-free version.
type Pusher = Box<Push<Bundle<T, D>>>;
type Puller = Box<Pull<Bundle<T, D>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
fn connect<A: AsWorker>(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, D>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone())))
@@ -6,14 +6,14 @@ use crate::dataflow::channels::{Bundle, Message};

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: Fn(&T, &D) -> u64> {
pub struct Exchange<T, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D) -> u64> {
pushers: Vec<P>,
buffers: Vec<Vec<D>>,
current: Option<T>,
hash_func: H,
}

impl<T: Clone, D, P: Push<Bundle<T, D>>, H: Fn(&T, &D)->u64> Exchange<T, D, P, H> {
impl<T: Clone, D, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Exchange<T, D, P, H> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, D, P, H> {
let mut buffers = vec![];
@@ -37,7 +37,7 @@ impl<T: Clone, D, P: Push<Bundle<T, D>>, H: Fn(&T, &D)->u64> Exchange<T, D, P,
}
}

impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: Fn(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
impl<T: Eq+Data, D: Data, P: Push<Bundle<T, D>>, H: FnMut(&T, &D)->u64> Push<Bundle<T, D>> for Exchange<T, D, P, H> {
#[inline(never)]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
// if only one pusher, no exchange

0 comments on commit 3747401

Please sign in to comment.
You can’t perform that action at this time.