Skip to content

Commit

Permalink
trace import should work
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Dec 21, 2018
1 parent f744aa8 commit 97ad604
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
41 changes: 24 additions & 17 deletions src/operators/arrange.rs
Expand Up @@ -50,7 +50,7 @@ pub struct TraceWriter<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R> {
phantom: ::std::marker::PhantomData<(K, V, R)>,
trace: Weak<RefCell<TraceBox<K, V, T, R, Tr>>>,
queues: Rc<RefCell<(Vec<T>,Vec<Weak<RefCell<VecDeque<(Vec<T>, Option<(T, Tr::Batch)>)>>>>)>>,
queues: Rc<RefCell<(Vec<T>,Vec<TraceAgentQueueWriter<K,V,T,R,Tr>>)>>,
}

impl<K, V, T, R, Tr> TraceWriter<K, V, T, R, Tr>
Expand All @@ -63,8 +63,9 @@ where T: Lattice+Ord+Clone+'static, Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R
let mut borrow = self.queues.borrow_mut();
borrow.0 = frontier.to_vec();
for queue in borrow.1.iter_mut() {
if let Some(mut queue) = queue.upgrade() {
queue.borrow_mut().push_back((frontier.to_vec(), data.clone()));
if let Some(pair) = queue.upgrade() {
pair.1.borrow_mut().push_back((frontier.to_vec(), data.clone()));
pair.0.activate();
}
}
borrow.1.retain(|w| w.upgrade().is_some());
Expand Down Expand Up @@ -97,14 +98,19 @@ where T: Lattice+Ord+Clone+'static, Tr: Trace<K,V,T,R>, Tr::Batch: Batch<K,V,T,R

let mut borrow = self.queues.borrow_mut();
for queue in borrow.1.iter_mut() {
queue.upgrade().map(|queue| {
queue.borrow_mut().push_back((Vec::new(), None));
queue.upgrade().map(|pair| {
pair.1.borrow_mut().push_back((Vec::new(), None));
pair.0.activate();
});
}
borrow.1.retain(|w| w.upgrade().is_some());
}
}

use timely::scheduling::Activator;
// Short names for strongly and weakly owned activators and shared queues.
type TraceAgentQueueReader<K,V,T,R,Tr> = Rc<(Activator, RefCell<VecDeque<(Vec<T>, Option<(T, <Tr as TraceReader<K,V,T,R>>::Batch)>)>>)>;
type TraceAgentQueueWriter<K,V,T,R,Tr> = Weak<(Activator, RefCell<VecDeque<(Vec<T>, Option<(T, <Tr as TraceReader<K,V,T,R>>::Batch)>)>>)>;

/// A `TraceReader` wrapper which can be imported into other dataflows.
///
Expand All @@ -114,7 +120,7 @@ pub struct TraceAgent<K, V, T, R, Tr>
where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
phantom: ::std::marker::PhantomData<(K, V, R)>,
trace: Rc<RefCell<TraceBox<K, V, T, R, Tr>>>,
queues: Weak<RefCell<(Vec<T>,Vec<Weak<RefCell<VecDeque<(Vec<T>, Option<(T, Tr::Batch)>)>>>>)>>,
queues: Weak<RefCell<(Vec<T>,Vec<TraceAgentQueueWriter<K,V,T,R,Tr>>)>>,
advance: Vec<T>,
through: Vec<T>,
}
Expand Down Expand Up @@ -173,7 +179,7 @@ where T: Timestamp+Lattice, Tr: TraceReader<K,V,T,R> {
///
/// The queue will be immediately populated with existing historical batches from the trace, and until the reference
/// is dropped the queue will receive new batches as produced by the source `arrange` operator.
pub fn new_listener(&mut self) -> Rc<RefCell<VecDeque<(Vec<T>, Option<(T, <Tr as TraceReader<K,V,T,R>>::Batch)>)>>> where T: Default {
pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<K,V,T,R,Tr> where T: Default {

// create a new queue for progress and batch information.
let mut new_queue = VecDeque::new();
Expand All @@ -183,18 +189,19 @@ where T: Timestamp+Lattice, Tr: TraceReader<K,V,T,R> {
new_queue.push_back((vec![T::default()], Some((T::default(), batch.clone()))));
});

let reference = Rc::new(RefCell::new(new_queue));
let reference = Rc::new((activator, RefCell::new(new_queue)));

// wraps the queue in a ref-counted ref cell and enqueue/return it.
if let Some(queue) = self.queues.upgrade() {
let mut borrow = queue.borrow_mut();
reference.borrow_mut().push_back((borrow.0.clone(), None));
reference.1.borrow_mut().push_back((borrow.0.clone(), None));
borrow.1.push(Rc::downgrade(&reference));
}
else {
// if the trace is closed, send a final signal.
reference.borrow_mut().push_back((Vec::new(), None));
reference.1.borrow_mut().push_back((Vec::new(), None));
}
reference.0.activate();

reference
}
Expand Down Expand Up @@ -263,16 +270,19 @@ where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
/// ```
pub fn import<G: Scope<Timestamp=T>>(&mut self, scope: &G) -> Arranged<G, K, V, R, TraceAgent<K, V, T, R, Tr>> where T: Timestamp {

let queue = self.new_listener();
let trace = self.clone();

let collection = source(scope, "ArrangedSource", move |capability, _info| {
let stream = source(scope, "ArrangedSource", move |capability, info| {

let activator = scope.activator_for(&info.address[..]);
let queue = self.new_listener(activator);

// capabilities the source maintains.
let mut capabilities = vec![capability];

move |output| {

let mut borrow = queue.borrow_mut();
let mut borrow = queue.1.borrow_mut();
while let Some((frontier, sent)) = borrow.pop_front() {

// if data are associated, send em!
Expand Down Expand Up @@ -302,10 +312,7 @@ where T: Lattice+Ord+Clone+'static, Tr: TraceReader<K,V,T,R> {
}
});

Arranged {
stream: collection,
trace: self.clone(),
}
Arranged { stream, trace }
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/trace.rs
Expand Up @@ -17,7 +17,7 @@ pub type OrdValSpine<K, V, T, R> = Spine<K, V, T, R, Rc<OrdValBatch<K, V, T, R>>
type IntegerTrace = OrdValSpine<UnsignedWrapper<u64>, u64, usize, i64>;

fn get_trace() -> Spine<UnsignedWrapper<u64>, u64, usize, i64, Rc<OrdValBatch<UnsignedWrapper<u64>, u64, usize, i64>>> {
let op_info = OperatorInfo::new(0, 0);
let op_info = OperatorInfo::new(0, 0, &[]);
let mut trace = IntegerTrace::new(op_info, None);
{
let mut batcher = <<
Expand Down

0 comments on commit 97ad604

Please sign in to comment.