Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ fn main() {
source(scope, "Source", |capability, info| {

// Acquire a re-activator for this operator.
use timely::scheduling::Scheduler;
let activator = scope.activator_for(info.address);

let mut cap = Some(capability);
Expand Down
24 changes: 12 additions & 12 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull};
use crate::dataflow::channels::Message;
use crate::logging::TimelyLogger as Logger;
use crate::worker::AsWorker;
use crate::worker::Worker;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, C> {
Expand All @@ -24,7 +24,7 @@ pub trait ParallelizationContract<T, C> {
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Message<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A direct connection
Expand All @@ -34,10 +34,10 @@ pub struct Pipeline;
impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<ThreadPusher<Message<T, C>>>;
type Puller = LogPuller<ThreadPuller<Message<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = worker.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, worker.index(), worker.index(), identifier, logging.clone()),
LogPuller::new(puller, worker.index(), identifier, logging))
}
}

Expand Down Expand Up @@ -92,7 +92,7 @@ mod distributor {
use crate::dataflow::channels::{ContainerBytes, Message};
use crate::logging::TimelyLogger;
use crate::progress::Timestamp;
use crate::worker::AsWorker;
use crate::worker::Worker;

use super::{ParallelizationContract, LogPusher, LogPuller};

Expand All @@ -112,11 +112,11 @@ mod distributor {
{
type Pusher = Exchange<T, LogPusher<Box<dyn Push<Message<T, C>>>>, D>;
type Puller = LogPuller<Box<dyn Pull<Message<T, C>>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
let distributor = (self.0)(allocator.peers());
(Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option<TimelyLogger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = worker.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
let distributor = (self.0)(worker.peers());
(Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone()))
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions timely/src/dataflow/operators/core/capture/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
//! than that in which the stream was captured.

use crate::dataflow::{Scope, Stream};
use crate::scheduling::Scheduler;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
use crate::progress::Timestamp;
Expand All @@ -52,25 +51,25 @@ use crate::dataflow::channels::Message;
/// Replay a capture stream into a scope with the same timestamp.
pub trait Replay<T: Timestamp, C> : Sized {
/// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`.
fn replay_into<'scope>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C> {
fn replay_into<'scope>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
}
/// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`.
///
/// The `period` argument allows the specification of a re-activation period, where the operator
/// will re-activate itself every so often. The `None` argument instructs the operator not to
/// re-activate itself.
fn replay_core<'scope>(self, scope: &mut Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>;
fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>;
}

impl<T: Timestamp, C: Container+Clone, I> Replay<T, C> for I
where
I : IntoIterator,
<I as IntoIterator>::Item: EventIterator<T, C>+'static,
{
fn replay_core<'scope>(self, scope: &mut Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>{
fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option<std::time::Duration>) -> Stream<'scope, T, C>{

let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
let mut builder = OperatorBuilder::new("Replay".to_owned(), scope);

let address = builder.operator_info().address;
let activator = scope.activator_for(address);
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> {

// create an operator builder.
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
let mut builder = OperatorBuilder::new("Concatenate".to_string(), *self);

// create new input handles for each input stream.
let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
Expand Down
31 changes: 10 additions & 21 deletions timely/src/dataflow/operators/core/enterleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{Accountable, Container};
use crate::communication::Push;
use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::Message;
use crate::worker::AsWorker;
use crate::dataflow::{Stream, Scope};

/// Extension trait to move a `Stream` into a child of its current `Scope`.
Expand All @@ -51,7 +50,7 @@ pub trait Enter<'outer, TOuter: Timestamp, TInner: Timestamp+Refines<TOuter>, C>
/// });
/// });
/// ```
fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C>;
}

impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C>
Expand All @@ -60,9 +59,7 @@ where
TInner: Timestamp + Refines<TOuter>,
C: Container,
{
fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {

use crate::scheduling::Scheduler;
fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C> {

// Validate that `inner` is a child of `self`'s scope.
let inner_addr = inner.addr();
Expand All @@ -85,20 +82,16 @@ where
};
let produced = Rc::clone(ingress.targets.produced());
let input = inner.subgraph.borrow_mut().new_input(produced);
let channel_id = inner.clone().new_identifier();
let channel_id = inner.worker().new_identifier();

if let Some(logger) = inner.logging() {
if let Some(logger) = inner.worker().logging() {
let pusher = LogPusher::new(ingress, channel_id, inner.index(), logger);
self.connect_to(input, pusher, channel_id);
} else {
self.connect_to(input, ingress, channel_id);
}

Stream::new(
Source::new(0, input.port),
registrar,
inner.clone(),
)
Stream::new(Source::new(0, input.port), registrar, inner)
}
}

Expand All @@ -122,15 +115,15 @@ pub trait Leave<'inner, TInner: Timestamp, C> {
/// });
/// });
/// ```
fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter>;
}

impl<'inner, TInner, C> Leave<'inner, TInner, C> for Stream<'inner, TInner, C>
where
TInner: Timestamp,
C: Container,
{
fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {
fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines<TOuter> {

let scope = self.scope();

Expand All @@ -150,20 +143,16 @@ where
let target = Target::new(0, output.port);
let (targets, registrar) = Tee::<TOuter, C>::new();
let egress = EgressNub { targets, phantom: PhantomData };
let channel_id = scope.clone().new_identifier();
let channel_id = scope.worker().new_identifier();

if let Some(logger) = scope.logging() {
if let Some(logger) = scope.worker().logging() {
let pusher = LogPusher::new(egress, channel_id, scope.index(), logger);
self.connect_to(target, pusher, channel_id);
} else {
self.connect_to(target, egress, channel_id);
}

Stream::new(
output,
registrar,
outer.clone(),
)
Stream::new(output, registrar, outer)
}
}

Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/core/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub trait Feedback<'scope, T: Timestamp> {
/// .connect_loop(handle);
/// });
/// ```
fn feedback<C: Container>(&mut self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>);
fn feedback<C: Container>(&self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>);
}

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
Expand Down Expand Up @@ -65,22 +65,22 @@ pub trait LoopVariable<'scope, TOuter: Timestamp, TInner: Timestamp> {
/// });
/// });
/// ```
fn loop_variable<C: Container>(&mut self, summary: TInner::Summary) -> (Handle<'scope, Product<TOuter, TInner>, C>, Stream<'scope, Product<TOuter, TInner>, C>);
fn loop_variable<C: Container>(&self, summary: TInner::Summary) -> (Handle<'scope, Product<TOuter, TInner>, C>, Stream<'scope, Product<TOuter, TInner>, C>);
}

impl<'scope, T: Timestamp> Feedback<'scope, T> for Scope<'scope, T> {

fn feedback<C: Container>(&mut self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) {
fn feedback<C: Container>(&self, summary: <T as Timestamp>::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) {

let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
let mut builder = OperatorBuilder::new("Feedback".to_owned(), *self);
let (output, stream) = builder.new_output();

(Handle { builder, summary, output }, stream)
}
}

impl<'scope, TOuter: Timestamp, TInner: Timestamp> LoopVariable<'scope, TOuter, TInner> for Iterative<'scope, TOuter, TInner> {
fn loop_variable<C: Container>(&mut self, summary: TInner::Summary) -> (Handle<'scope, Product<TOuter, TInner>, C>, Stream<'scope, Product<TOuter, TInner>, C>) {
fn loop_variable<C: Container>(&self, summary: TInner::Summary) -> (Handle<'scope, Product<TOuter, TInner>, C>, Stream<'scope, Product<TOuter, TInner>, C>) {
self.feedback(Product::new(Default::default(), summary))
}
}
Expand Down
18 changes: 8 additions & 10 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use std::rc::Rc;
use std::cell::RefCell;

use crate::container::{CapacityContainerBuilder, PushInto};
use crate::scheduling::Scheduler;

use crate::scheduling::{Schedule, Activator};

use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
Expand Down Expand Up @@ -63,7 +61,7 @@ pub trait Input<'scope> {
/// }
/// });
/// ```
fn new_input<C: Container+Clone>(&mut self) -> (Handle<Self::Timestamp, CapacityContainerBuilder<C>>, Stream<'scope, Self::Timestamp, C>);
fn new_input<C: Container+Clone>(&self) -> (Handle<Self::Timestamp, CapacityContainerBuilder<C>>, Stream<'scope, Self::Timestamp, C>);

/// Create a new [Stream] and [Handle] through which to supply input.
///
Expand Down Expand Up @@ -100,7 +98,7 @@ pub trait Input<'scope> {
/// }
/// });
/// ```
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<Self::Timestamp, CB>, Stream<'scope, Self::Timestamp, CB::Container>);
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&self) -> (Handle<Self::Timestamp, CB>, Stream<'scope, Self::Timestamp, CB::Container>);

/// Create a new stream from a supplied interactive handle.
///
Expand Down Expand Up @@ -133,25 +131,25 @@ pub trait Input<'scope> {
/// }
/// });
/// ```
fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<Self::Timestamp, CB>) -> Stream<'scope, Self::Timestamp, CB::Container>;
fn input_from<CB: ContainerBuilder<Container: Clone>>(&self, handle: &mut Handle<Self::Timestamp, CB>) -> Stream<'scope, Self::Timestamp, CB::Container>;
}

use crate::order::TotalOrder;
impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
type Timestamp = T;
fn new_input<C: Container+Clone>(&mut self) -> (Handle<T, CapacityContainerBuilder<C>>, Stream<'scope, T, C>) {
fn new_input<C: Container+Clone>(&self) -> (Handle<T, CapacityContainerBuilder<C>>, Stream<'scope, T, C>) {
let mut handle = Handle::new();
let stream = self.input_from(&mut handle);
(handle, stream)
}

fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<T, CB>, Stream<'scope, T, CB::Container>) {
fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&self) -> (Handle<T, CB>, Stream<'scope, T, CB::Container>) {
let mut handle = Handle::new_with_builder();
let stream = self.input_from(&mut handle);
(handle, stream)
}

fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<T, CB>) -> Stream<'scope, T, CB::Container> {
fn input_from<CB: ContainerBuilder<Container: Clone>>(&self, handle: &mut Handle<T, CB>) -> Stream<'scope, T, CB::Container> {
let (output, registrar) = Tee::<T, CB::Container>::new();
let counter = Counter::new(output);
let produced = Rc::clone(counter.produced());
Expand All @@ -177,7 +175,7 @@ impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
copies,
}));

Stream::new(Source::new(index, 0), registrar, self.clone())
Stream::new(Source::new(index, 0), registrar, *self)
}
}

Expand Down Expand Up @@ -338,7 +336,7 @@ impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
/// }
/// });
/// ```
pub fn to_stream<'scope>(&mut self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
pub fn to_stream<'scope>(&mut self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
where
T: TotalOrder,
{
Expand Down
9 changes: 4 additions & 5 deletions timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Conversion to the `Stream` type from iterators.

use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto};
use crate::scheduling::Scheduler;
use crate::progress::Timestamp;
use crate::{Container, ContainerBuilder};
use crate::dataflow::operators::generic::operator::source;
Expand Down Expand Up @@ -30,11 +29,11 @@ pub trait ToStreamBuilder<CB: ContainerBuilder> {
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container>;
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>;
}

impl<CB: ContainerBuilder, I: IntoIterator+'static> ToStreamBuilder<CB> for I where CB: PushInto<I::Item> {
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container> {
fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> {

source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| {

Expand Down Expand Up @@ -80,11 +79,11 @@ pub trait ToStream<C> {
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C>;
fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C>;
}

impl<C: Container + SizableContainer, I: IntoIterator+'static> ToStream<C> for I where C: PushInto<I::Item> {
fn to_stream<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C> {
fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> {
ToStreamBuilder::<CapacityContainerBuilder<C>>::to_stream_with_builder(self, scope)
}
}
8 changes: 3 additions & 5 deletions timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use std::rc::Rc;
use std::cell::RefCell;

use crate::ContainerBuilder;
use crate::scheduling::Scheduler;

use crate::scheduling::{Schedule, ActivateOnDrop};

use crate::progress::{Operate, operate::SharedProgress, Timestamp};
Expand Down Expand Up @@ -75,11 +73,11 @@ pub trait UnorderedInput<'scope, T: Timestamp> {
/// assert_eq!(extract[i], (i, vec![i]));
/// }
/// ```
fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<T, CB>, ActivateCapability<T>), Stream<'scope, T, CB::Container>);
fn new_unordered_input<CB: ContainerBuilder>(&self) -> ((UnorderedHandle<T, CB>, ActivateCapability<T>), Stream<'scope, T, CB::Container>);
}

impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> {
fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<T, CB>, ActivateCapability<T>), Stream<'scope, T, CB::Container>) {
fn new_unordered_input<CB: ContainerBuilder>(&self) -> ((UnorderedHandle<T, CB>, ActivateCapability<T>), Stream<'scope, T, CB::Container>) {

let (output, registrar) = Tee::<T, CB::Container>::new();
let internal = Rc::new(RefCell::new(ChangeBatch::new()));
Expand Down Expand Up @@ -107,7 +105,7 @@ impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> {
peers,
}));

((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone()))
((helper, cap), Stream::new(Source::new(index, 0), registrar, *self))
}
}

Expand Down
Loading
Loading