diff --git a/src/operators/arrange.rs b/src/operators/arrange.rs index 5f2ffa3a38..cb5b685878 100644 --- a/src/operators/arrange.rs +++ b/src/operators/arrange.rs @@ -50,7 +50,7 @@ pub struct TraceWriter where T: Lattice+Ord+Clone+'static, Tr: Trace, Tr::Batch: Batch { phantom: ::std::marker::PhantomData<(K, V, R)>, trace: Weak>>, - queues: Rc,Vec, Option<(T, Tr::Batch)>)>>>>)>>, + queues: Rc,Vec>)>>, } impl TraceWriter @@ -63,8 +63,9 @@ where T: Lattice+Ord+Clone+'static, Tr: Trace, Tr::Batch: Batch, Tr::Batch: Batch = Rc<(Activator, RefCell, Option<(T, >::Batch)>)>>)>; +type TraceAgentQueueWriter = Weak<(Activator, RefCell, Option<(T, >::Batch)>)>>)>; /// A `TraceReader` wrapper which can be imported into other dataflows. /// @@ -114,7 +120,7 @@ pub struct TraceAgent where T: Lattice+Ord+Clone+'static, Tr: TraceReader { phantom: ::std::marker::PhantomData<(K, V, R)>, trace: Rc>>, - queues: Weak,Vec, Option<(T, Tr::Batch)>)>>>>)>>, + queues: Weak,Vec>)>>, advance: Vec, through: Vec, } @@ -173,7 +179,7 @@ where T: Timestamp+Lattice, Tr: TraceReader { /// /// The queue will be immediately populated with existing historical batches from the trace, and until the reference /// is dropped the queue will receive new batches as produced by the source `arrange` operator. - pub fn new_listener(&mut self) -> Rc, Option<(T, >::Batch)>)>>> where T: Default { + pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader where T: Default { // create a new queue for progress and batch information. let mut new_queue = VecDeque::new(); @@ -183,18 +189,19 @@ where T: Timestamp+Lattice, Tr: TraceReader { new_queue.push_back((vec![T::default()], Some((T::default(), batch.clone())))); }); - let reference = Rc::new(RefCell::new(new_queue)); + let reference = Rc::new((activator, RefCell::new(new_queue))); // wraps the queue in a ref-counted ref cell and enqueue/return it. if let Some(queue) = self.queues.upgrade() { let mut borrow = queue.borrow_mut(); - reference.borrow_mut().push_back((borrow.0.clone(), None)); + reference.1.borrow_mut().push_back((borrow.0.clone(), None)); borrow.1.push(Rc::downgrade(&reference)); } else { // if the trace is closed, send a final signal. - reference.borrow_mut().push_back((Vec::new(), None)); + reference.1.borrow_mut().push_back((Vec::new(), None)); } + reference.0.activate(); reference } @@ -263,16 +270,19 @@ where T: Lattice+Ord+Clone+'static, Tr: TraceReader { /// ``` pub fn import>(&mut self, scope: &G) -> Arranged> where T: Timestamp { - let queue = self.new_listener(); + let trace = self.clone(); - let collection = source(scope, "ArrangedSource", move |capability, _info| { + let stream = source(scope, "ArrangedSource", move |capability, info| { + + let activator = scope.activator_for(&info.address[..]); + let queue = self.new_listener(activator); // capabilities the source maintains. let mut capabilities = vec![capability]; move |output| { - let mut borrow = queue.borrow_mut(); + let mut borrow = queue.1.borrow_mut(); while let Some((frontier, sent)) = borrow.pop_front() { // if data are associated, send em! @@ -302,10 +312,7 @@ where T: Lattice+Ord+Clone+'static, Tr: TraceReader { } }); - Arranged { - stream: collection, - trace: self.clone(), - } + Arranged { stream, trace } } } diff --git a/tests/trace.rs b/tests/trace.rs index ed704d5956..54191c69c6 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -17,7 +17,7 @@ pub type OrdValSpine = Spine> type IntegerTrace = OrdValSpine, u64, usize, i64>; fn get_trace() -> Spine, u64, usize, i64, Rc, u64, usize, i64>>> { - let op_info = OperatorInfo::new(0, 0); + let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None); { let mut batcher = <<