From adbc4530b92ec0407947ba76688ae438c575c190 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 03:50:19 -0400 Subject: [PATCH 1/9] Remove AsWorker trait in favor of worker() accessor --- timely/src/dataflow/channels/pact.rs | 24 ++-- .../src/dataflow/operators/core/enterleave.rs | 9 +- .../dataflow/operators/generic/builder_raw.rs | 9 +- timely/src/dataflow/scope.rs | 48 +++---- timely/src/dataflow/stream.rs | 3 +- timely/src/progress/broadcast.rs | 2 +- timely/src/progress/subgraph.rs | 3 +- timely/src/worker.rs | 119 ++++++------------ 8 files changed, 79 insertions(+), 138 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 296e87a8f..0fd8321a3 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; use crate::dataflow::channels::Message; use crate::logging::TimelyLogger as Logger; -use crate::worker::AsWorker; +use crate::worker::Worker; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { @@ -24,7 +24,7 @@ pub trait ParallelizationContract { /// Type implementing `Pull` produced by this pact. type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller); + fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller); } /// A direct connection @@ -34,10 +34,10 @@ pub struct Pipeline; impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (pusher, puller) = allocator.pipeline::>(identifier, address); - (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), - LogPuller::new(puller, allocator.index(), identifier, logging)) + fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (pusher, puller) = worker.pipeline::>(identifier, address); + (LogPusher::new(pusher, worker.index(), worker.index(), identifier, logging.clone()), + LogPuller::new(puller, worker.index(), identifier, logging)) } } @@ -92,7 +92,7 @@ mod distributor { use crate::dataflow::channels::{ContainerBytes, Message}; use crate::logging::TimelyLogger; use crate::progress::Timestamp; - use crate::worker::AsWorker; + use crate::worker::Worker; use super::{ParallelizationContract, LogPusher, LogPuller}; @@ -112,11 +112,11 @@ mod distributor { { type Pusher = Exchange>>>, D>; type Puller = LogPuller>>>; - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - let distributor = (self.0)(allocator.peers()); - (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + fn connect(self, worker: &Worker, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = worker.allocate::>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, worker.index(), i, identifier, logging.clone())).collect::>(); + let distributor = (self.0)(worker.peers()); + (Exchange::new(senders, distributor), LogPuller::new(receiver, worker.index(), identifier, logging.clone())) } } } diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index bc242064b..656244f34 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -30,7 +30,6 @@ use crate::{Accountable, Container}; use crate::communication::Push; use crate::dataflow::channels::pushers::{Counter, Tee}; use crate::dataflow::channels::Message; -use crate::worker::AsWorker; use crate::dataflow::{Stream, Scope}; /// Extension trait to move a `Stream` into a child of its current `Scope`. @@ -85,9 +84,9 @@ where }; let produced = Rc::clone(ingress.targets.produced()); let input = inner.subgraph.borrow_mut().new_input(produced); - let channel_id = inner.clone().new_identifier(); + let channel_id = inner.worker().new_identifier(); - if let Some(logger) = inner.logging() { + if let Some(logger) = inner.worker().logging() { let pusher = LogPusher::new(ingress, channel_id, inner.index(), logger); self.connect_to(input, pusher, channel_id); } else { @@ -150,9 +149,9 @@ where let target = Target::new(0, output.port); let (targets, registrar) = Tee::::new(); let egress = EgressNub { targets, phantom: PhantomData }; - let channel_id = scope.clone().new_identifier(); + let channel_id = scope.worker().new_identifier(); - if let Some(logger) = scope.logging() { + if let Some(logger) = scope.worker().logging() { let pusher = LogPusher::new(egress, channel_id, scope.index(), logger); self.connect_to(target, pusher, channel_id); } else { diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 32a19f824..13c2189d5 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -9,7 +9,6 @@ use std::rc::Rc; use std::cell::RefCell; use crate::scheduling::{Schedule, Activations}; -use crate::worker::AsWorker; use crate::scheduling::Scheduler; use crate::progress::{Source, Target}; @@ -63,7 +62,7 @@ pub struct OperatorBuilder<'scope, T: Timestamp> { impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> { /// Allocates a new generic operator builder from its containing scope. - pub fn new(name: String, mut scope: Scope<'scope, T>) -> Self { + pub fn new(name: String, scope: Scope<'scope, T>) -> Self { let slot = scope.reserve_operator(); let address = slot.addr(); @@ -107,9 +106,9 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> { P: ParallelizationContract, I: IntoIterator::Summary>)>, { - let channel_id = self.scope.new_identifier(); - let logging = self.scope.logging(); - let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging); + let channel_id = self.scope.worker().new_identifier(); + let logging = self.scope.worker().logging(); + let (sender, receiver) = pact.connect(self.scope.worker(), channel_id, Rc::clone(&self.address), logging); let target = Target::new(self.slot.index(), self.shape.inputs); stream.connect_to(target, sender, channel_id); diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 169ee648b..16ac3c8c1 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -3,8 +3,6 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::communication::{Exchangeable, Push, Pull}; -use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::scheduling::Scheduler; use crate::scheduling::activate::Activations; use crate::progress::{Timestamp, Operate, Subgraph, SubgraphBuilder}; @@ -13,25 +11,21 @@ use crate::progress::timestamp::Refines; use crate::order::Product; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; -use crate::worker::{AsWorker, Config, Worker}; +use crate::worker::Worker; /// Type alias for an iterative scope. pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product>; -/// A `Scope` wraps a `SubgraphBuilder` and manages the addition -/// of `Operate`s and the connection of edges between them. +/// A `Scope` manages the creation of new dataflow scopes, of operators and edges between them. /// -/// Importantly, this is a *shared* object, backed by `Rc>` wrappers. Each method -/// takes a shared reference, but can be thought of as first calling `.clone()` and then calling the -/// method. Each method does not hold the `RefCell`'s borrow, and should prevent accidental panics. +/// This is a shared object that can be freely cloned. It manages the scope construction through +/// a `RefCell`-wrapped subgraph builder, and all of this types methods use but do not hold write +/// access through the `RefCell`. pub struct Scope<'scope, T: Timestamp> { /// The subgraph under assembly. /// - /// Stored as `Rc>` so that multiple `Scope` clones can share the - /// same subgraph state during construction. The owning `scoped` / `region` / - /// `dataflow` call recovers the inner `SubgraphBuilder` via `Rc::try_unwrap` - /// when the closure returns; if a clone has escaped the closure, this fails - /// loudly with an actionable panic message. + /// Stored as `Rc>` so that multiple `Scope` clones can work on the same subgraph. + /// All methods on this type must release their borrow on this field before returning. pub(crate) subgraph: &'scope RefCell>, /// A copy of the worker hosting this scope. pub(crate) worker: Worker, @@ -42,6 +36,8 @@ pub struct Scope<'scope, T: Timestamp> { } impl<'scope, T: Timestamp> Scope<'scope, T> { + /// Access to the underlying worker. + pub fn worker(&self) -> &Worker { &self.worker } /// This worker's index out of `0 .. self.peers()`. pub fn index(&self) -> usize { self.worker.index() } /// The total number of workers in the computation. @@ -65,9 +61,9 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { /// identifier of the future operator. It must be consumed by [`OperatorSlot::install`] /// before being dropped; otherwise it will panic, since the scope expects every /// reserved slot to eventually be filled. - pub fn reserve_operator(&mut self) -> OperatorSlot<'scope, T> { + pub fn reserve_operator(&self) -> OperatorSlot<'scope, T> { let index = self.subgraph.borrow_mut().allocate_child_id(); - let identifier = self.new_identifier(); + let identifier = self.worker().new_identifier(); OperatorSlot { scope: self.clone(), index, @@ -119,17 +115,17 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { T2: Timestamp + Refines, F: FnOnce(&mut Scope) -> R, { - let mut parent = self.clone(); + let parent = self.clone(); let slot = parent.reserve_operator(); let path = slot.addr(); let identifier = slot.identifier(); let type_name = std::any::type_name::(); - let progress_logging = parent.logger_for(&format!("timely/progress/{type_name}")); - let summary_logging = parent.logger_for(&format!("timely/summary/{type_name}")); + let progress_logging = parent.worker().logger_for(&format!("timely/progress/{type_name}")); + let summary_logging = parent.worker().logger_for(&format!("timely/summary/{type_name}")); let subgraph = RefCell::new(SubgraphBuilder::new_from( - path, identifier, self.logging(), summary_logging, name, + path, identifier, self.worker().logging(), summary_logging, name, )); let mut child = Scope { @@ -141,7 +137,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { let result = func(&mut child); drop(child); - let subgraph = subgraph.into_inner().build(&mut parent); + let subgraph = subgraph.into_inner().build(&parent.worker); (result, subgraph, slot) } @@ -234,18 +230,6 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { } } -impl<'scope, T: Timestamp> AsWorker for Scope<'scope, T> { - fn config(&self) -> &Config { self.worker.config() } - fn index(&self) -> usize { self.worker.index() } - fn peers(&self) -> usize { self.worker.peers() } - fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { self.worker.allocate(identifier, address) } - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { self.worker.pipeline(identifier, address) } - fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { self.worker.broadcast(identifier, address) } - fn new_identifier(&mut self) -> usize { self.worker.new_identifier() } - fn peek_identifier(&self) -> usize { self.worker.peek_identifier() } - fn log_register(&self) -> Option<::std::cell::RefMut<'_, crate::logging_core::Registry>> { self.worker.log_register() } -} - impl<'scope, T: Timestamp> Scheduler for Scope<'scope, T> { fn activations(&self) -> Rc> { self.worker.activations() } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 17096a33a..e5c70247f 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -10,7 +10,6 @@ use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; use crate::dataflow::channels::Message; -use crate::worker::AsWorker; use std::fmt::{self, Debug}; /// Abstraction of a stream of `C: Container` records timestamped with `T`. @@ -52,7 +51,7 @@ impl<'scope, T: Timestamp, C> Stream<'scope, T, C> { /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. pub fn connect_to>+'static>(self, target: Target, pusher: P, identifier: usize) where C: 'static { - let mut logging: Option = AsWorker::logging(&self.scope()); + let mut logging: Option = self.scope().worker().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { id: identifier, scope_addr: self.scope.addr().to_vec(), diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 28e835aa9..918ee70f4 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -32,7 +32,7 @@ pub struct Progcaster { impl Progcaster { /// Creates a new `Progcaster` using a channel from the supplied worker. - pub fn new(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option, progress_logging: Option>) -> Progcaster { + pub fn new(worker: &crate::worker::Worker, addr: Rc<[usize]>, identifier: usize, mut logging: Option, progress_logging: Option>) -> Progcaster { let channel_identifier = worker.new_identifier(); let (pusher, puller) = worker.broadcast(channel_identifier, addr); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 9769ebaf5..19f7a4ea4 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -10,6 +10,7 @@ use std::cell::RefCell; use std::collections::BinaryHeap; use std::cmp::Reverse; +use crate::scheduling::Scheduler; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelySummaryLogger as SummaryLogger; @@ -148,7 +149,7 @@ where } /// Now that initialization is complete, actually build a subgraph. - pub fn build(mut self, worker: &mut A) -> Subgraph { + pub fn build(mut self, worker: &crate::worker::Worker) -> Subgraph { // at this point, the subgraph is frozen. we should initialize any internal state which // may have been determined after construction (e.g. the numbers of inputs and outputs). // we also need to determine what to return as a summary and initial capabilities, which diff --git a/timely/src/worker.rs b/timely/src/worker.rs index f8cee248e..2d5632e55 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -134,7 +134,6 @@ impl Config { /// let mut config = timely::Config::process(3); /// config.worker.set("example".to_string(), 7u64); /// timely::execute(config, |worker| { - /// use crate::timely::worker::AsWorker; /// assert_eq!(worker.config().get::("example"), Some(&7)); /// }).unwrap(); /// ``` @@ -157,7 +156,6 @@ impl Config { /// let mut config = timely::Config::process(3); /// config.worker.set("example".to_string(), 7u64); /// timely::execute(config, |worker| { - /// use crate::timely::worker::AsWorker; /// assert_eq!(worker.config().get::("example"), Some(&7)); /// }).unwrap(); /// ``` @@ -166,51 +164,6 @@ impl Config { } } -/// Methods provided by the root Worker. -/// -/// These methods are often proxied by child scopes, and this trait provides access. -pub trait AsWorker : Scheduler { - /// Returns the worker configuration parameters. - fn config(&self) -> &Config; - /// Index of the worker among its peers. - fn index(&self) -> usize; - /// Number of peer workers. - fn peers(&self) -> usize; - /// Allocates a new channel from a supplied identifier and address. - /// - /// The identifier is used to identify the underlying channel and route - /// its data. It should be distinct from other identifiers passed used - /// for allocation, but can otherwise be arbitrary. - /// - /// The address should specify a path to an operator that should be - /// scheduled in response to the receipt of records on the channel. - /// Most commonly, this would be the address of the *target* of the - /// channel. - fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>); - /// Constructs a pipeline channel from the worker to itself. - /// - /// By default this method uses the native channel allocation mechanism, but the expectation is - /// that this behavior will be overridden to be more efficient. - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller); - - /// Allocates a broadcast channel, where each pushed message is received by all. - fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>); - - /// Allocates a new worker-unique identifier. - fn new_identifier(&mut self) -> usize; - /// The next worker-unique identifier to be allocated. - fn peek_identifier(&self) -> usize; - /// Provides access to named logging streams. - fn log_register(&self) -> Option>; - /// Acquires a logger by name, if the log register exists and the name is registered. - /// - /// For a more precise understanding of why a result is `None` one can use the direct functions. - fn logger_for(&self, name: &str) -> Option> { - self.log_register().and_then(|l| l.get(name)) - } - /// Provides access to the timely logging stream. - fn logging(&self) -> Option { self.logger_for("timely").map(Into::into) } -} /// A `Worker` is the entry point to a timely dataflow computation. It wraps an `Allocator` /// and has a list of dataflows that it manages. @@ -237,38 +190,6 @@ pub struct Worker { temp_channel_ids: Rc>>, } -impl AsWorker for Worker { - fn config(&self) -> &Config { &self.config } - fn index(&self) -> usize { self.allocator.borrow().index() } - fn peers(&self) -> usize { self.allocator.borrow().peers() } - fn allocate(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { - if address.is_empty() { panic!("Unacceptable address: Length zero"); } - let mut paths = self.paths.borrow_mut(); - paths.insert(identifier, address); - self.temp_channel_ids.borrow_mut().push(identifier); - self.allocator.borrow_mut().allocate(identifier) - } - fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { - if address.is_empty() { panic!("Unacceptable address: Length zero"); } - let mut paths = self.paths.borrow_mut(); - paths.insert(identifier, address); - self.temp_channel_ids.borrow_mut().push(identifier); - self.allocator.borrow_mut().pipeline(identifier) - } - fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { - if address.is_empty() { panic!("Unacceptable address: Length zero"); } - let mut paths = self.paths.borrow_mut(); - paths.insert(identifier, address); - self.temp_channel_ids.borrow_mut().push(identifier); - self.allocator.borrow_mut().broadcast(identifier) - } - - fn new_identifier(&mut self) -> usize { self.new_identifier() } - fn peek_identifier(&self) -> usize { self.peek_identifier() } - fn log_register(&self) -> Option> { - self.log_register() - } -} impl Scheduler for Worker { fn activations(&self) -> Rc> { @@ -543,7 +464,7 @@ impl Worker { /// /// This method is public, though it is not expected to be widely used outside /// of the timely dataflow system. - pub fn new_identifier(&mut self) -> usize { + pub fn new_identifier(&self) -> usize { *self.identifiers.borrow_mut() += 1; *self.identifiers.borrow() - 1 } @@ -571,6 +492,44 @@ impl Worker { self.logging.as_ref().map(|l| l.borrow_mut()) } + /// Returns the worker configuration parameters. + pub fn config(&self) -> &Config { &self.config } + + /// Acquires a logger by name, if the log register exists and the name is registered. + pub fn logger_for(&self, name: &str) -> Option> { + self.log_register().and_then(|l| l.get(name)) + } + + /// Provides access to the timely logging stream. + pub fn logging(&self) -> Option { self.logger_for("timely").map(Into::into) } + + /// Allocates a new channel from a supplied identifier and address. + pub fn allocate(&self, identifier: usize, address: Rc<[usize]>) -> (Vec>>, Box>) { + if address.is_empty() { panic!("Unacceptable address: Length zero"); } + let mut paths = self.paths.borrow_mut(); + paths.insert(identifier, address); + self.temp_channel_ids.borrow_mut().push(identifier); + self.allocator.borrow_mut().allocate(identifier) + } + + /// Constructs a pipeline channel from the worker to itself. + pub fn pipeline(&self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { + if address.is_empty() { panic!("Unacceptable address: Length zero"); } + let mut paths = self.paths.borrow_mut(); + paths.insert(identifier, address); + self.temp_channel_ids.borrow_mut().push(identifier); + self.allocator.borrow_mut().pipeline(identifier) + } + + /// Allocates a broadcast channel, where each pushed message is received by all. + pub fn broadcast(&self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { + if address.is_empty() { panic!("Unacceptable address: Length zero"); } + let mut paths = self.paths.borrow_mut(); + paths.insert(identifier, address); + self.temp_channel_ids.borrow_mut().push(identifier); + self.allocator.borrow_mut().broadcast(identifier) + } + /// Construct a new dataflow. /// /// # Examples From 80e6bb9059f83f057f1afd01904a72a53d606cbf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 04:00:23 -0400 Subject: [PATCH 2/9] Remove Scheduler trait in favor of inherents --- mdbook/src/chapter_2/chapter_2_4.md | 1 - .../dataflow/operators/core/capture/replay.rs | 1 - .../src/dataflow/operators/core/enterleave.rs | 2 -- timely/src/dataflow/operators/core/input.rs | 2 -- .../src/dataflow/operators/core/to_stream.rs | 1 - .../operators/core/unordered_input.rs | 2 -- .../dataflow/operators/generic/builder_raw.rs | 1 - .../dataflow/operators/generic/operator.rs | 1 - .../dataflow/operators/vec/flow_controlled.rs | 1 - timely/src/dataflow/scope.rs | 9 ++++---- timely/src/progress/subgraph.rs | 1 - timely/src/scheduling/mod.rs | 20 ----------------- timely/src/synchronization/sequence.rs | 1 - timely/src/worker.rs | 22 ++++++++++++++----- 14 files changed, 20 insertions(+), 45 deletions(-) diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 49e71eaf7..860ea0eeb 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -86,7 +86,6 @@ fn main() { source(scope, "Source", |capability, info| { // Acquire a re-activator for this operator. - use timely::scheduling::Scheduler; let activator = scope.activator_for(info.address); let mut cap = Some(capability); diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 114299fd0..cf2bb832d 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -39,7 +39,6 @@ //! than that in which the stream was captured. use crate::dataflow::{Scope, Stream}; -use crate::scheduling::Scheduler; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 656244f34..af3e897fc 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -61,8 +61,6 @@ where { fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C> { - use crate::scheduling::Scheduler; - // Validate that `inner` is a child of `self`'s scope. let inner_addr = inner.addr(); let outer_addr = self.scope().addr(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index e0aaea73b..6ccec07fa 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -4,8 +4,6 @@ use std::rc::Rc; use std::cell::RefCell; use crate::container::{CapacityContainerBuilder, PushInto}; -use crate::scheduling::Scheduler; - use crate::scheduling::{Schedule, Activator}; use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index 6f12933b9..f45554296 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -1,7 +1,6 @@ //! Conversion to the `Stream` type from iterators. use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto}; -use crate::scheduling::Scheduler; use crate::progress::Timestamp; use crate::{Container, ContainerBuilder}; use crate::dataflow::operators::generic::operator::source; diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 7b36b5838..1c881cbf9 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -4,8 +4,6 @@ use std::rc::Rc; use std::cell::RefCell; use crate::ContainerBuilder; -use crate::scheduling::Scheduler; - use crate::scheduling::{Schedule, ActivateOnDrop}; use crate::progress::{Operate, operate::SharedProgress, Timestamp}; diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 13c2189d5..27cdfb454 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -9,7 +9,6 @@ use std::rc::Rc; use std::cell::RefCell; use crate::scheduling::{Schedule, Activations}; -use crate::scheduling::Scheduler; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 14a0d0ce9..cdf4445b2 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -504,7 +504,6 @@ impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'sc /// /// # Examples /// ``` -/// use timely::scheduling::Scheduler; /// use timely::dataflow::operators::Inspect; /// use timely::dataflow::operators::generic::operator::source; /// use timely::dataflow::Scope; diff --git a/timely/src/dataflow/operators/vec/flow_controlled.rs b/timely/src/dataflow/operators/vec/flow_controlled.rs index b794cccb8..0c9d489fc 100644 --- a/timely/src/dataflow/operators/vec/flow_controlled.rs +++ b/timely/src/dataflow/operators/vec/flow_controlled.rs @@ -1,7 +1,6 @@ //! Methods to construct flow-controlled sources. use crate::order::TotalOrder; -use crate::scheduling::Scheduler; use crate::progress::timestamp::Timestamp; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::probe::Handle; diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 16ac3c8c1..77f167994 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -3,7 +3,6 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::scheduling::Scheduler; use crate::scheduling::activate::Activations; use crate::progress::{Timestamp, Operate, Subgraph, SubgraphBuilder}; use crate::progress::{Source, Target}; @@ -42,6 +41,10 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn index(&self) -> usize { self.worker.index() } /// The total number of workers in the computation. pub fn peers(&self) -> usize { self.worker.peers() } + /// Provides a shared handle to the activation scheduler. + pub fn activations(&self) -> Rc> { self.worker.activations() } + /// Constructs an `Activator` tied to the specified operator address. + pub fn activator_for(&self, path: Rc<[usize]>) -> crate::scheduling::Activator { self.worker.activator_for(path) } /// A useful name describing the scope. pub fn name(&self) -> String { self.subgraph.borrow().name.clone() } @@ -230,10 +233,6 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { } } -impl<'scope, T: Timestamp> Scheduler for Scope<'scope, T> { - fn activations(&self) -> Rc> { self.worker.activations() } -} - impl<'scope, T: Timestamp> Clone for Scope<'scope, T> { fn clone(&self) -> Self { Scope { diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 19f7a4ea4..7fb289f97 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -10,7 +10,6 @@ use std::cell::RefCell; use std::collections::BinaryHeap; use std::cmp::Reverse; -use crate::scheduling::Scheduler; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelySummaryLogger as SummaryLogger; diff --git a/timely/src/scheduling/mod.rs b/timely/src/scheduling/mod.rs index fef24d1f9..ec46ddbf0 100644 --- a/timely/src/scheduling/mod.rs +++ b/timely/src/scheduling/mod.rs @@ -1,8 +1,5 @@ //! Types and traits to activate and schedule fibers. -use std::rc::Rc; -use std::cell::RefCell; - pub mod activate; pub use self::activate::{Activations, Activator, ActivateOnDrop, SyncActivator}; @@ -19,20 +16,3 @@ pub trait Schedule { /// work and would be upset if the computation terminated. fn schedule(&mut self) -> bool; } - -/// Methods for types which schedule fibers. -pub trait Scheduler { - /// Provides a shared handle to the activation scheduler. - fn activations(&self) -> Rc>; - - /// Constructs an `Activator` tied to the specified operator address. - fn activator_for(&self, path: Rc<[usize]>) -> Activator { - Activator::new(path, self.activations()) - } - - /// Constructs a `SyncActivator` tied to the specified operator address. - fn sync_activator_for(&self, path: Vec) -> SyncActivator { - let sync_activations = self.activations().borrow().sync(); - SyncActivator::new(path, sync_activations) - } -} diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index d3a607295..9b06fe8b4 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -6,7 +6,6 @@ use std::time::{Instant, Duration}; use std::collections::VecDeque; use crate::{ExchangeData, PartialOrder}; -use crate::scheduling::Scheduler; use crate::worker::Worker; use crate::dataflow::channels::pact::Exchange; use crate::dataflow::operators::generic::operator::source; diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 2d5632e55..2dad9f72d 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use crate::communication::{Allocator, Exchangeable, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; -use crate::scheduling::{Schedule, Scheduler, Activations}; +use crate::scheduling::{Schedule, Activations, Activator, SyncActivator}; use crate::progress::timestamp::{Refines}; use crate::progress::SubgraphBuilder; use crate::progress::operate::Operate; @@ -191,11 +191,6 @@ pub struct Worker { } -impl Scheduler for Worker { - fn activations(&self) -> Rc> { - Rc::clone(&self.activations) - } -} impl Worker { /// Allocates a new `Worker` bound to a channel allocator. @@ -495,6 +490,21 @@ impl Worker { /// Returns the worker configuration parameters. pub fn config(&self) -> &Config { &self.config } + /// Provides a shared handle to the activation scheduler. + pub fn activations(&self) -> Rc> { + Rc::clone(&self.activations) + } + + /// Constructs an `Activator` tied to the specified operator address. + pub fn activator_for(&self, path: Rc<[usize]>) -> Activator { + Activator::new(path, self.activations()) + } + + /// Constructs a `SyncActivator` tied to the specified operator address. + pub fn sync_activator_for(&self, path: Vec) -> SyncActivator { + SyncActivator::new(path, self.activations().borrow().sync()) + } + /// Acquires a logger by name, if the log register exists and the name is registered. pub fn logger_for(&self, name: &str) -> Option> { self.log_register().and_then(|l| l.get(name)) From 7e30971a43bb6645e5245b2b7f905092c0916baf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 04:01:52 -0400 Subject: [PATCH 3/9] Remove OperatorSlot::scope_mut --- timely/src/dataflow/scope.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 77f167994..4b1dd7c56 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -290,10 +290,6 @@ impl<'scope, T: Timestamp> OperatorSlot<'scope, T> { self.scope.subgraph.borrow_mut().add_child(operator, self.index, self.identifier); self.installed = true; } - - /// Mutable access to the containing scope. Used to register a built [`Subgraph`] - /// before [`OperatorSlot::install`]. - pub fn scope_mut(&mut self) -> &mut Scope<'scope, T> { &mut self.scope } } impl<'scope, T: Timestamp> Drop for OperatorSlot<'scope, T> { From 6a82f0e391be50672afcc255c3aa25bc172aa205 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 04:09:28 -0400 Subject: [PATCH 4/9] Excise &mut Scope usage --- .../src/dataflow/operators/core/capture/replay.rs | 6 +++--- timely/src/dataflow/operators/core/feedback.rs | 8 ++++---- timely/src/dataflow/operators/core/input.rs | 14 +++++++------- timely/src/dataflow/operators/core/to_stream.rs | 8 ++++---- .../src/dataflow/operators/core/unordered_input.rs | 4 ++-- timely/src/dataflow/operators/vec/input.rs | 8 ++++---- timely/src/dataflow/operators/vec/to_stream.rs | 4 ++-- .../src/dataflow/operators/vec/unordered_input.rs | 4 ++-- timely/src/dataflow/scope.rs | 14 +++++++------- timely/src/execute.rs | 2 +- timely/src/worker.rs | 10 +++++----- 11 files changed, 41 insertions(+), 41 deletions(-) diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index cf2bb832d..f9f829ee4 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -51,7 +51,7 @@ use crate::dataflow::channels::Message; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { /// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`. - fn replay_into<'scope>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C> { + fn replay_into<'scope>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C> { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } /// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`. @@ -59,7 +59,7 @@ pub trait Replay : Sized { /// The `period` argument allows the specification of a re-activation period, where the operator /// will re-activate itself every so often. The `None` argument instructs the operator not to /// re-activate itself. - fn replay_core<'scope>(self, scope: &mut Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>; + fn replay_core<'scope>(self, scope: &Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>; } impl Replay for I @@ -67,7 +67,7 @@ where I : IntoIterator, ::Item: EventIterator+'static, { - fn replay_core<'scope>(self, scope: &mut Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>{ + fn replay_core<'scope>(self, scope: &Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 7e5824a56..a551055d7 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -35,7 +35,7 @@ pub trait Feedback<'scope, T: Timestamp> { /// .connect_loop(handle); /// }); /// ``` - fn feedback(&mut self, summary: ::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>); + fn feedback(&self, summary: ::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>); } /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. @@ -65,12 +65,12 @@ pub trait LoopVariable<'scope, TOuter: Timestamp, TInner: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: TInner::Summary) -> (Handle<'scope, Product, C>, Stream<'scope, Product, C>); + fn loop_variable(&self, summary: TInner::Summary) -> (Handle<'scope, Product, C>, Stream<'scope, Product, C>); } impl<'scope, T: Timestamp> Feedback<'scope, T> for Scope<'scope, T> { - fn feedback(&mut self, summary: ::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) { + fn feedback(&self, summary: ::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); @@ -80,7 +80,7 @@ impl<'scope, T: Timestamp> Feedback<'scope, T> for Scope<'scope, T> { } impl<'scope, TOuter: Timestamp, TInner: Timestamp> LoopVariable<'scope, TOuter, TInner> for Iterative<'scope, TOuter, TInner> { - fn loop_variable(&mut self, summary: TInner::Summary) -> (Handle<'scope, Product, C>, Stream<'scope, Product, C>) { + fn loop_variable(&self, summary: TInner::Summary) -> (Handle<'scope, Product, C>, Stream<'scope, Product, C>) { self.feedback(Product::new(Default::default(), summary)) } } diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 6ccec07fa..247779163 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -61,7 +61,7 @@ pub trait Input<'scope> { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle>, Stream<'scope, Self::Timestamp, C>); + fn new_input(&self) -> (Handle>, Stream<'scope, Self::Timestamp, C>); /// Create a new [Stream] and [Handle] through which to supply input. /// @@ -98,7 +98,7 @@ pub trait Input<'scope> { /// } /// }); /// ``` - fn new_input_with_builder>(&mut self) -> (Handle, Stream<'scope, Self::Timestamp, CB::Container>); + fn new_input_with_builder>(&self) -> (Handle, Stream<'scope, Self::Timestamp, CB::Container>); /// Create a new stream from a supplied interactive handle. /// @@ -131,25 +131,25 @@ pub trait Input<'scope> { /// } /// }); /// ``` - fn input_from>(&mut self, handle: &mut Handle) -> Stream<'scope, Self::Timestamp, CB::Container>; + fn input_from>(&self, handle: &mut Handle) -> Stream<'scope, Self::Timestamp, CB::Container>; } use crate::order::TotalOrder; impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> { type Timestamp = T; - fn new_input(&mut self) -> (Handle>, Stream<'scope, T, C>) { + fn new_input(&self) -> (Handle>, Stream<'scope, T, C>) { let mut handle = Handle::new(); let stream = self.input_from(&mut handle); (handle, stream) } - fn new_input_with_builder>(&mut self) -> (Handle, Stream<'scope, T, CB::Container>) { + fn new_input_with_builder>(&self) -> (Handle, Stream<'scope, T, CB::Container>) { let mut handle = Handle::new_with_builder(); let stream = self.input_from(&mut handle); (handle, stream) } - fn input_from>(&mut self, handle: &mut Handle) -> Stream<'scope, T, CB::Container> { + fn input_from>(&self, handle: &mut Handle) -> Stream<'scope, T, CB::Container> { let (output, registrar) = Tee::::new(); let counter = Counter::new(output); let produced = Rc::clone(counter.produced()); @@ -336,7 +336,7 @@ impl> Handle { /// } /// }); /// ``` - pub fn to_stream<'scope>(&mut self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container> + pub fn to_stream<'scope>(&mut self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container> where T: TotalOrder, { diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index f45554296..7b8b5a7d8 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -29,11 +29,11 @@ pub trait ToStreamBuilder { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container>; + fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container>; } impl ToStreamBuilder for I where CB: PushInto { - fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, CB::Container> { + fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container> { source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { @@ -79,11 +79,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C>; + fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C>; } impl ToStream for I where C: PushInto { - fn to_stream<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> Stream<'scope, T, C> { + fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C> { ToStreamBuilder::>::to_stream_with_builder(self, scope) } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 1c881cbf9..1bb66f7d5 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -73,11 +73,11 @@ pub trait UnorderedInput<'scope, T: Timestamp> { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream<'scope, T, CB::Container>); + fn new_unordered_input(&self) -> ((UnorderedHandle, ActivateCapability), Stream<'scope, T, CB::Container>); } impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream<'scope, T, CB::Container>) { + fn new_unordered_input(&self) -> ((UnorderedHandle, ActivateCapability), Stream<'scope, T, CB::Container>) { let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); diff --git a/timely/src/dataflow/operators/vec/input.rs b/timely/src/dataflow/operators/vec/input.rs index c2185b6e4..b6ce4b229 100644 --- a/timely/src/dataflow/operators/vec/input.rs +++ b/timely/src/dataflow/operators/vec/input.rs @@ -50,7 +50,7 @@ pub trait Input<'scope> { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle, StreamVec<'scope, Self::Timestamp, D>); + fn new_input(&self) -> (Handle, StreamVec<'scope, Self::Timestamp, D>); /// Create a new stream from a supplied interactive handle. /// @@ -83,17 +83,17 @@ pub trait Input<'scope> { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle) -> StreamVec<'scope, Self::Timestamp, D>; + fn input_from(&self, handle: &mut Handle) -> StreamVec<'scope, Self::Timestamp, D>; } use crate::order::TotalOrder; impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> { type Timestamp = T; - fn new_input(&mut self) -> (Handle, StreamVec<'scope, T, D>) { + fn new_input(&self) -> (Handle, StreamVec<'scope, T, D>) { InputCore::new_input(self) } - fn input_from(&mut self, handle: &mut Handle) -> StreamVec<'scope, T, D> { + fn input_from(&self, handle: &mut Handle) -> StreamVec<'scope, T, D> { InputCore::input_from(self, handle) } } diff --git a/timely/src/dataflow/operators/vec/to_stream.rs b/timely/src/dataflow/operators/vec/to_stream.rs index f0d266fb1..196a54c66 100644 --- a/timely/src/dataflow/operators/vec/to_stream.rs +++ b/timely/src/dataflow/operators/vec/to_stream.rs @@ -22,11 +22,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> StreamVec<'scope, T, D>; + fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> StreamVec<'scope, T, D>; } impl ToStream for I { - fn to_stream<'scope, T: Timestamp>(self, scope: &mut Scope<'scope, T>) -> StreamVec<'scope, T, I::Item> { + fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> StreamVec<'scope, T, I::Item> { ToStreamCore::to_stream(self, scope) } } diff --git a/timely/src/dataflow/operators/vec/unordered_input.rs b/timely/src/dataflow/operators/vec/unordered_input.rs index a7cc5fd2e..9aa84121d 100644 --- a/timely/src/dataflow/operators/vec/unordered_input.rs +++ b/timely/src/dataflow/operators/vec/unordered_input.rs @@ -61,12 +61,12 @@ pub trait UnorderedInput<'scope, T: Timestamp> { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamVec<'scope, T, D>); + fn new_unordered_input(&self) -> ((UnorderedHandle, ActivateCapability), StreamVec<'scope, T, D>); } impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamVec<'scope, T, D>) { + fn new_unordered_input(&self) -> ((UnorderedHandle, ActivateCapability), StreamVec<'scope, T, D>) { UnorderedInputCore::new_unordered_input(self) } } diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 4b1dd7c56..79fe1c388 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -103,7 +103,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn scoped(&self, name: &str, func: F) -> R where T2: Timestamp + Refines, - F: FnOnce(&mut Scope) -> R, + F: FnOnce(&Scope) -> R, { let (result, subgraph, slot) = self.scoped_raw(name, func); slot.install(Box::new(subgraph)); @@ -116,7 +116,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn scoped_raw(&self, name: &str, func: F) -> (R, Subgraph, OperatorSlot<'scope, T>) where T2: Timestamp + Refines, - F: FnOnce(&mut Scope) -> R, + F: FnOnce(&Scope) -> R, { let parent = self.clone(); let slot = parent.reserve_operator(); @@ -131,14 +131,14 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { path, identifier, self.worker().logging(), summary_logging, name, )); - let mut child = Scope { + let child = Scope { subgraph: &subgraph, worker: parent.worker.clone(), logging: parent.logging.clone(), progress_logging, }; - let result = func(&mut child); + let result = func(&child); drop(child); let subgraph = subgraph.into_inner().build(&parent.worker); (result, subgraph, slot) @@ -168,7 +168,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn iterative(&self, func: F) -> R where T2: Timestamp, - F: FnOnce(&mut Scope>) -> R, + F: FnOnce(&Scope>) -> R, { self.scoped::, R, F>("Iterative", func) } @@ -196,7 +196,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { /// ``` pub fn region(&self, func: F) -> R where - F: FnOnce(&mut Scope) -> R, + F: FnOnce(&Scope) -> R, { self.region_named("Region", func) } @@ -227,7 +227,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { /// ``` pub fn region_named(&self, name: &str, func: F) -> R where - F: FnOnce(&mut Scope) -> R, + F: FnOnce(&Scope) -> R, { self.scoped::(name, func) } diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 62814357a..1c0cc9eb6 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -124,7 +124,7 @@ impl Config { pub fn example(func: F) -> T where T: Send+'static, - F: FnOnce(&mut Scope)->T+Send+Sync+'static + F: FnOnce(&Scope)->T+Send+Sync+'static { crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope))) } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 2dad9f72d..baceeafde 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -558,7 +558,7 @@ impl Worker { pub fn dataflow(&mut self, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut Scope)->R, + F: FnOnce(&Scope)->R, { self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child)) } @@ -581,7 +581,7 @@ impl Worker { pub fn dataflow_named(&mut self, name: &str, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut Scope)->R, + F: FnOnce(&Scope)->R, { self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child)) } @@ -614,7 +614,7 @@ impl Worker { pub fn dataflow_core(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut V, &mut Scope)->R, + F: FnOnce(&mut V, &Scope)->R, V: Any+'static, { let dataflow_index = self.allocate_dataflow_index(); @@ -628,13 +628,13 @@ impl Worker { let subscope = RefCell::new(subscope); let result = { - let mut builder = Scope { + let builder = Scope { subgraph: &subscope, worker: self.clone(), logging: logging.clone(), progress_logging, }; - func(&mut resources, &mut builder) + func(&mut resources, &builder) }; let operator = subscope.into_inner().build(self); From 74b8de0a23fbfb0ec22c7b31b88a7acd15ae66bf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 04:16:12 -0400 Subject: [PATCH 5/9] Correct typo --- timely/src/dataflow/scope.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 79fe1c388..a236d8800 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -18,7 +18,7 @@ pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product { /// The subgraph under assembly. From 734d37fc9ab3883d98cbbae324e4ffc95e1daae6 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 05:34:18 -0400 Subject: [PATCH 6/9] Streamline Scope --- timely/src/dataflow/scope.rs | 31 ++++++++----------------------- timely/src/worker.rs | 5 +---- 2 files changed, 9 insertions(+), 27 deletions(-) diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index a236d8800..9eba1e93c 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -8,8 +8,6 @@ use crate::progress::{Timestamp, Operate, Subgraph, SubgraphBuilder}; use crate::progress::{Source, Target}; use crate::progress::timestamp::Refines; use crate::order::Product; -use crate::logging::TimelyLogger as Logger; -use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::worker::Worker; /// Type alias for an iterative scope. @@ -26,17 +24,13 @@ pub struct Scope<'scope, T: Timestamp> { /// Stored as `Rc>` so that multiple `Scope` clones can work on the same subgraph. /// All methods on this type must release their borrow on this field before returning. pub(crate) subgraph: &'scope RefCell>, - /// A copy of the worker hosting this scope. - pub(crate) worker: Worker, - /// The log writer for this scope. - pub(crate) logging: Option, - /// The progress log writer for this scope. - pub(crate) progress_logging: Option>, + /// The worker hosting this scope. + pub(crate) worker: &'scope Worker, } impl<'scope, T: Timestamp> Scope<'scope, T> { /// Access to the underlying worker. - pub fn worker(&self) -> &Worker { &self.worker } + pub fn worker(&self) -> &'scope Worker { self.worker } /// This worker's index out of `0 .. self.peers()`. pub fn index(&self) -> usize { self.worker.index() } /// The total number of workers in the computation. @@ -118,29 +112,22 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { T2: Timestamp + Refines, F: FnOnce(&Scope) -> R, { - let parent = self.clone(); - let slot = parent.reserve_operator(); + let slot = self.reserve_operator(); let path = slot.addr(); let identifier = slot.identifier(); let type_name = std::any::type_name::(); - let progress_logging = parent.worker().logger_for(&format!("timely/progress/{type_name}")); - let summary_logging = parent.worker().logger_for(&format!("timely/summary/{type_name}")); + let summary_logging = self.worker().logger_for(&format!("timely/summary/{type_name}")); let subgraph = RefCell::new(SubgraphBuilder::new_from( path, identifier, self.worker().logging(), summary_logging, name, )); - let child = Scope { - subgraph: &subgraph, - worker: parent.worker.clone(), - logging: parent.logging.clone(), - progress_logging, - }; + let child = Scope { subgraph: &subgraph, worker: self.worker }; let result = func(&child); drop(child); - let subgraph = subgraph.into_inner().build(&parent.worker); + let subgraph = subgraph.into_inner().build(self.worker); (result, subgraph, slot) } @@ -237,9 +224,7 @@ impl<'scope, T: Timestamp> Clone for Scope<'scope, T> { fn clone(&self) -> Self { Scope { subgraph: self.subgraph, - worker: self.worker.clone(), - logging: self.logging.clone(), - progress_logging: self.progress_logging.clone(), + worker: self.worker, } } } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index baceeafde..50aef59f6 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -622,7 +622,6 @@ impl Worker { let identifier = self.new_identifier(); let type_name = std::any::type_name::(); - let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name)); let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name)); let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name); let subscope = RefCell::new(subscope); @@ -630,9 +629,7 @@ impl Worker { let result = { let builder = Scope { subgraph: &subscope, - worker: self.clone(), - logging: logging.clone(), - progress_logging, + worker: self, }; func(&mut resources, &builder) }; From 549a2af19e10f40a93898e33a6e28805976b5cb8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 05:57:58 -0400 Subject: [PATCH 7/9] Implement Copy for Scope --- .../dataflow/operators/core/capture/replay.rs | 8 ++--- timely/src/dataflow/operators/core/concat.rs | 2 +- .../src/dataflow/operators/core/enterleave.rs | 20 ++++-------- .../src/dataflow/operators/core/feedback.rs | 2 +- timely/src/dataflow/operators/core/input.rs | 4 +-- .../src/dataflow/operators/core/to_stream.rs | 8 ++--- .../operators/core/unordered_input.rs | 2 +- .../dataflow/operators/generic/builder_raw.rs | 2 +- .../dataflow/operators/generic/operator.rs | 6 ++-- .../dataflow/operators/vec/flow_controlled.rs | 2 +- .../src/dataflow/operators/vec/to_stream.rs | 4 +-- timely/src/dataflow/scope.rs | 31 ++++++++----------- timely/src/dataflow/stream.rs | 4 +-- timely/src/execute.rs | 2 +- timely/src/synchronization/sequence.rs | 7 ++--- timely/src/worker.rs | 8 ++--- 16 files changed, 49 insertions(+), 63 deletions(-) diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index f9f829ee4..1e296298d 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -51,7 +51,7 @@ use crate::dataflow::channels::Message; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { /// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`. - fn replay_into<'scope>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C> { + fn replay_into<'scope>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } /// Replays `self` into the provided scope, as a `Stream<'scope, T, C>`. @@ -59,7 +59,7 @@ pub trait Replay : Sized { /// The `period` argument allows the specification of a re-activation period, where the operator /// will re-activate itself every so often. The `None` argument instructs the operator not to /// re-activate itself. - fn replay_core<'scope>(self, scope: &Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>; + fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>; } impl Replay for I @@ -67,9 +67,9 @@ where I : IntoIterator, ::Item: EventIterator+'static, { - fn replay_core<'scope>(self, scope: &Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>{ + fn replay_core<'scope>(self, scope: Scope<'scope, T>, period: Option) -> Stream<'scope, T, C>{ - let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); + let mut builder = OperatorBuilder::new("Replay".to_owned(), scope); let address = builder.operator_info().address; let activator = scope.activator_for(address); diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index d403a33fc..d98fabe90 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -63,7 +63,7 @@ impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> { // create an operator builder. use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; - let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone()); + let mut builder = OperatorBuilder::new("Concatenate".to_string(), *self); // create new input handles for each input stream. let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::>(); diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index af3e897fc..cbcd5a7ad 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -50,7 +50,7 @@ pub trait Enter<'outer, TOuter: Timestamp, TInner: Timestamp+Refines, C> /// }); /// }); /// ``` - fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C>; + fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C>; } impl<'outer, TOuter, TInner, C> Enter<'outer, TOuter, TInner, C> for Stream<'outer, TOuter, C> @@ -59,7 +59,7 @@ where TInner: Timestamp + Refines, C: Container, { - fn enter<'inner>(self, inner: &Scope<'inner, TInner>) -> Stream<'inner, TInner, C> { + fn enter<'inner>(self, inner: Scope<'inner, TInner>) -> Stream<'inner, TInner, C> { // Validate that `inner` is a child of `self`'s scope. let inner_addr = inner.addr(); @@ -91,11 +91,7 @@ where self.connect_to(input, ingress, channel_id); } - Stream::new( - Source::new(0, input.port), - registrar, - inner.clone(), - ) + Stream::new(Source::new(0, input.port), registrar, inner) } } @@ -119,7 +115,7 @@ pub trait Leave<'inner, TInner: Timestamp, C> { /// }); /// }); /// ``` - fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines; + fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines; } impl<'inner, TInner, C> Leave<'inner, TInner, C> for Stream<'inner, TInner, C> @@ -127,7 +123,7 @@ where TInner: Timestamp, C: Container, { - fn leave<'outer, TOuter: Timestamp>(self, outer: &Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines { + fn leave<'outer, TOuter: Timestamp>(self, outer: Scope<'outer, TOuter>) -> Stream<'outer, TOuter, C> where TInner: Refines { let scope = self.scope(); @@ -156,11 +152,7 @@ where self.connect_to(target, egress, channel_id); } - Stream::new( - output, - registrar, - outer.clone(), - ) + Stream::new(output, registrar, outer) } } diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index a551055d7..4c0686713 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -72,7 +72,7 @@ impl<'scope, T: Timestamp> Feedback<'scope, T> for Scope<'scope, T> { fn feedback(&self, summary: ::Summary) -> (Handle<'scope, T, C>, Stream<'scope, T, C>) { - let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); + let mut builder = OperatorBuilder::new("Feedback".to_owned(), *self); let (output, stream) = builder.new_output(); (Handle { builder, summary, output }, stream) diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 247779163..94ed474d2 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -175,7 +175,7 @@ impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> { copies, })); - Stream::new(Source::new(index, 0), registrar, self.clone()) + Stream::new(Source::new(index, 0), registrar, *self) } } @@ -336,7 +336,7 @@ impl> Handle { /// } /// }); /// ``` - pub fn to_stream<'scope>(&mut self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container> + pub fn to_stream<'scope>(&mut self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> where T: TotalOrder, { diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index 7b8b5a7d8..ef7b7648a 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -29,11 +29,11 @@ pub trait ToStreamBuilder { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container>; + fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>; } impl ToStreamBuilder for I where CB: PushInto { - fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, CB::Container> { + fn to_stream_with_builder<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container> { source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { @@ -79,11 +79,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C>; + fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C>; } impl ToStream for I where C: PushInto { - fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> Stream<'scope, T, C> { + fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> Stream<'scope, T, C> { ToStreamBuilder::>::to_stream_with_builder(self, scope) } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 1bb66f7d5..01f94cc30 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -105,7 +105,7 @@ impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> { peers, })); - ((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone())) + ((helper, cap), Stream::new(Source::new(index, 0), registrar, *self)) } } diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 27cdfb454..90a750be0 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -135,7 +135,7 @@ impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> { self.shape.outputs += 1; let (target, registrar) = Tee::new(); let source = Source::new(self.slot.index(), new_output); - let stream = Stream::new(source, registrar, self.scope.clone()); + let stream = Stream::new(source, registrar, self.scope); for (input, entry) in connection { self.summary[input].add_port(new_output, entry); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index cdf4445b2..939c38296 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -537,13 +537,13 @@ impl<'scope, T: Timestamp, C1: Container> Operator<'scope, T, C1> for Stream<'sc /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source<'scope, T: Timestamp, CB, B, L>(scope: &Scope<'scope, T>, name: &str, constructor: B) -> Stream<'scope, T, CB::Container> +pub fn source<'scope, T: Timestamp, CB, B, L>(scope: Scope<'scope, T>, name: &str, constructor: B) -> Stream<'scope, T, CB::Container> where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut OutputBuilderSession<'_, T, CB>)+'static { - let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); + let mut builder = OperatorBuilder::new(name.to_owned(), scope); let operator_info = builder.operator_info(); let (output, stream) = builder.new_output(); @@ -581,7 +581,7 @@ where /// /// }); /// ``` -pub fn empty<'scope, T: Timestamp, C: Container>(scope: &Scope<'scope, T>) -> Stream<'scope, T, C> { +pub fn empty<'scope, T: Timestamp, C: Container>(scope: Scope<'scope, T>) -> Stream<'scope, T, C> { source::<_, CapacityContainerBuilder, _, _>(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/vec/flow_controlled.rs b/timely/src/dataflow/operators/vec/flow_controlled.rs index 0c9d489fc..051186428 100644 --- a/timely/src/dataflow/operators/vec/flow_controlled.rs +++ b/timely/src/dataflow/operators/vec/flow_controlled.rs @@ -75,7 +75,7 @@ pub fn iterator_source< DI: IntoIterator, I: IntoIterator, F: FnMut(&T)->Option>+'static>( - scope: &Scope<'scope, T>, + scope: Scope<'scope, T>, name: &str, mut input_f: F, probe: Handle, diff --git a/timely/src/dataflow/operators/vec/to_stream.rs b/timely/src/dataflow/operators/vec/to_stream.rs index 196a54c66..74e1092a8 100644 --- a/timely/src/dataflow/operators/vec/to_stream.rs +++ b/timely/src/dataflow/operators/vec/to_stream.rs @@ -22,11 +22,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> StreamVec<'scope, T, D>; + fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> StreamVec<'scope, T, D>; } impl ToStream for I { - fn to_stream<'scope, T: Timestamp>(self, scope: &Scope<'scope, T>) -> StreamVec<'scope, T, I::Item> { + fn to_stream<'scope, T: Timestamp>(self, scope: Scope<'scope, T>) -> StreamVec<'scope, T, I::Item> { ToStreamCore::to_stream(self, scope) } } diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 9eba1e93c..75c1a45c7 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -15,13 +15,13 @@ pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product { /// The subgraph under assembly. /// - /// Stored as `Rc>` so that multiple `Scope` clones can work on the same subgraph. + /// Stored as `Rc>` so that multiple `Scope` copies can work on the same subgraph. /// All methods on this type must release their borrow on this field before returning. pub(crate) subgraph: &'scope RefCell>, /// The worker hosting this scope. @@ -62,7 +62,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { let index = self.subgraph.borrow_mut().allocate_child_id(); let identifier = self.worker().new_identifier(); OperatorSlot { - scope: self.clone(), + scope: *self, index, identifier, installed: false, @@ -97,7 +97,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn scoped(&self, name: &str, func: F) -> R where T2: Timestamp + Refines, - F: FnOnce(&Scope) -> R, + F: FnOnce(Scope) -> R, { let (result, subgraph, slot) = self.scoped_raw(name, func); slot.install(Box::new(subgraph)); @@ -110,7 +110,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn scoped_raw(&self, name: &str, func: F) -> (R, Subgraph, OperatorSlot<'scope, T>) where T2: Timestamp + Refines, - F: FnOnce(&Scope) -> R, + F: FnOnce(Scope) -> R, { let slot = self.reserve_operator(); let path = slot.addr(); @@ -125,8 +125,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { let child = Scope { subgraph: &subgraph, worker: self.worker }; - let result = func(&child); - drop(child); + let result = func(child); let subgraph = subgraph.into_inner().build(self.worker); (result, subgraph, slot) } @@ -155,7 +154,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { pub fn iterative(&self, func: F) -> R where T2: Timestamp, - F: FnOnce(&Scope>) -> R, + F: FnOnce(Scope>) -> R, { self.scoped::, R, F>("Iterative", func) } @@ -183,7 +182,7 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { /// ``` pub fn region(&self, func: F) -> R where - F: FnOnce(&Scope) -> R, + F: FnOnce(Scope) -> R, { self.region_named("Region", func) } @@ -214,19 +213,15 @@ impl<'scope, T: Timestamp> Scope<'scope, T> { /// ``` pub fn region_named(&self, name: &str, func: F) -> R where - F: FnOnce(&Scope) -> R, + F: FnOnce(Scope) -> R, { self.scoped::(name, func) } } +impl<'scope, T: Timestamp> Copy for Scope<'scope, T> {} impl<'scope, T: Timestamp> Clone for Scope<'scope, T> { - fn clone(&self) -> Self { - Scope { - subgraph: self.subgraph, - worker: self.worker, - } - } + fn clone(&self) -> Self { *self } } impl<'scope, T: Timestamp> std::fmt::Debug for Scope<'scope, T> { diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index e5c70247f..52ef2903a 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -29,7 +29,7 @@ impl<'scope, T: Timestamp, C: Clone+'static> Clone for Stream<'scope, T, C> { fn clone(&self) -> Self { Self { name: self.name, - scope: self.scope.clone(), + scope: self.scope, ports: self.ports.clone(), } } @@ -70,7 +70,7 @@ impl<'scope, T: Timestamp, C> Stream<'scope, T, C> { /// The name of the stream's source operator. pub fn name(&self) -> &Source { &self.name } /// The scope immediately containing the stream. - pub fn scope(&self) -> Scope<'scope, T> { self.scope.clone() } + pub fn scope(&self) -> Scope<'scope, T> { self.scope } /// Allows the assertion of a container type, for the benefit of type inference. /// diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 1c0cc9eb6..0afbfecc0 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -124,7 +124,7 @@ impl Config { pub fn example(func: F) -> T where T: Send+'static, - F: FnOnce(&Scope)->T+Send+Sync+'static + F: FnOnce(Scope)->T+Send+Sync+'static { crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope))) } diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 9b06fe8b4..186cb38e8 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -109,10 +109,9 @@ impl Sequencer { let activator_sink = Rc::clone(&activator); // build a dataflow used to serialize and circulate commands - worker.dataflow::(move |dataflow| { + worker.dataflow::(move |scope| { - let scope = dataflow.clone(); - let peers = dataflow.peers(); + let peers = scope.peers(); let mut recvd = Vec::new(); @@ -120,7 +119,7 @@ impl Sequencer { let mut counter = 0; // a source that attempts to pull from `recv` and produce commands for everyone - source(dataflow, "SequenceInput", move |capability, info| { + source(scope, "SequenceInput", move |capability, info| { // initialize activator, now that we have the address activator_source diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 50aef59f6..c2170d281 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -558,7 +558,7 @@ impl Worker { pub fn dataflow(&mut self, func: F) -> R where T: Refines<()>, - F: FnOnce(&Scope)->R, + F: FnOnce(Scope)->R, { self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child)) } @@ -581,7 +581,7 @@ impl Worker { pub fn dataflow_named(&mut self, name: &str, func: F) -> R where T: Refines<()>, - F: FnOnce(&Scope)->R, + F: FnOnce(Scope)->R, { self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child)) } @@ -614,7 +614,7 @@ impl Worker { pub fn dataflow_core(&mut self, name: &str, mut logging: Option, mut resources: V, func: F) -> R where T: Refines<()>, - F: FnOnce(&mut V, &Scope)->R, + F: FnOnce(&mut V, Scope)->R, V: Any+'static, { let dataflow_index = self.allocate_dataflow_index(); @@ -631,7 +631,7 @@ impl Worker { subgraph: &subscope, worker: self, }; - func(&mut resources, &builder) + func(&mut resources, builder) }; let operator = subscope.into_inner().build(self); From 27b82c11a1f8895f168959f722853b1693c55a16 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 06:09:19 -0400 Subject: [PATCH 8/9] Doccomment correction --- timely/src/dataflow/scope.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timely/src/dataflow/scope.rs b/timely/src/dataflow/scope.rs index 75c1a45c7..fb922a988 100644 --- a/timely/src/dataflow/scope.rs +++ b/timely/src/dataflow/scope.rs @@ -21,7 +21,7 @@ pub type Iterative<'scope, TOuter, TInner> = Scope<'scope, Product { /// The subgraph under assembly. /// - /// Stored as `Rc>` so that multiple `Scope` copies can work on the same subgraph. + /// Stored as `RefCell<...>` so that multiple `Scope` copies can work on the same subgraph. /// All methods on this type must release their borrow on this field before returning. pub(crate) subgraph: &'scope RefCell>, /// The worker hosting this scope. From 59d8c91bba38bd67222ac7e5b07f85e4b05c15fe Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 13 Apr 2026 06:55:31 -0400 Subject: [PATCH 9/9] Remove now-redundant closure --- timely/src/execute.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timely/src/execute.rs b/timely/src/execute.rs index 0afbfecc0..a77c0cf36 100644 --- a/timely/src/execute.rs +++ b/timely/src/execute.rs @@ -126,7 +126,7 @@ where T: Send+'static, F: FnOnce(Scope)->T+Send+Sync+'static { - crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope))) + crate::execute::execute_directly(|worker| worker.dataflow(func)) }