Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed Mar 8, 2019
1 parent 9b63eb3 commit c7d8e8d
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 52 deletions.
76 changes: 41 additions & 35 deletions src/lib.rs
Expand Up @@ -44,7 +44,7 @@ use differential_dataflow::operators::Threshold;
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
use differential_dataflow::trace::wrappers::enter::TraceEnter;
use differential_dataflow::trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{Collection, Data};

pub use num_rational::Rational32;
Expand Down Expand Up @@ -109,10 +109,10 @@ pub type ResultDiff<T> = (Vec<Value>, T, isize);
pub struct Datom(pub Eid, pub Aid, pub Value);

/// A trace of values indexed by self.
pub type TraceKeyHandle<K, T, R> = TraceAgent<K, (), T, R, OrdKeySpine<K, T, R>>;
pub type TraceKeyHandle<K, T, R> = TraceAgent<OrdKeySpine<K, T, R>>;

/// A trace of (K, V) pairs indexed by key.
pub type TraceValHandle<K, V, T, R> = TraceAgent<K, V, T, R, OrdValSpine<K, V, T, R>>;
pub type TraceValHandle<K, V, T, R> = TraceAgent<OrdValSpine<K, V, T, R>>;

/// A handle to an arranged relation.
pub type RelationHandle<T> = TraceKeyHandle<Vec<Value>, T, isize>;
Expand Down Expand Up @@ -307,13 +307,19 @@ where
G::Timestamp: Lattice + Data,
K: Data,
V: Data,
TrCount: TraceReader<K, (), G::Timestamp, isize> + Clone,
TrPropose: TraceReader<K, V, G::Timestamp, isize> + Clone,
TrValidate: TraceReader<(K, V), (), G::Timestamp, isize> + Clone,
TrCount: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=isize> + Clone,
TrCount::Batch: BatchReader<TrCount::Key,TrCount::Val,G::Timestamp,TrCount::R>+'static,
TrCount::Cursor: Cursor<TrCount::Key,TrCount::Val,G::Timestamp,TrCount::R>+'static,
TrPropose: TraceReader<Key=K, Val=V, Time=G::Timestamp, R=isize> + Clone,
TrPropose::Batch: BatchReader<TrPropose::Key,TrPropose::Val,G::Timestamp,TrPropose::R>+'static,
TrPropose::Cursor: Cursor<TrPropose::Key,TrPropose::Val,G::Timestamp,TrPropose::R>+'static,
TrValidate: TraceReader<Key=(K, V), Val=(), Time=G::Timestamp, R=isize> + Clone,
TrValidate::Batch: BatchReader<TrValidate::Key,TrValidate::Val,G::Timestamp,TrValidate::R>+'static,
TrValidate::Cursor: Cursor<TrValidate::Key,TrValidate::Val,G::Timestamp,TrValidate::R>+'static,
{
count: Arranged<G, K, (), isize, TrCount>,
propose: Arranged<G, K, V, isize, TrPropose>,
validate: Arranged<G, (K, V), (), isize, TrValidate>,
count: Arranged<G, TrCount>,
propose: Arranged<G, TrPropose>,
validate: Arranged<G, TrValidate>,
}

impl<G, K, V, TrCount, TrPropose, TrValidate> Clone
Expand All @@ -323,9 +329,15 @@ where
G::Timestamp: Lattice + Data,
K: Data,
V: Data,
TrCount: TraceReader<K, (), G::Timestamp, isize> + Clone,
TrPropose: TraceReader<K, V, G::Timestamp, isize> + Clone,
TrValidate: TraceReader<(K, V), (), G::Timestamp, isize> + Clone,
TrCount: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=isize> + Clone,
TrCount::Batch: BatchReader<TrCount::Key,TrCount::Val,G::Timestamp,TrCount::R>+'static,
TrCount::Cursor: Cursor<TrCount::Key,TrCount::Val,G::Timestamp,TrCount::R>+'static,
TrPropose: TraceReader<Key=K, Val=V, Time=G::Timestamp, R=isize> + Clone,
TrPropose::Batch: BatchReader<TrPropose::Key,TrPropose::Val,G::Timestamp,TrPropose::R>+'static,
TrPropose::Cursor: Cursor<TrPropose::Key,TrPropose::Val,G::Timestamp,TrPropose::R>+'static,
TrValidate: TraceReader<Key=(K, V), Val=(), Time=G::Timestamp, R=isize> + Clone,
TrValidate::Batch: BatchReader<TrValidate::Key,TrValidate::Val,G::Timestamp,TrValidate::R>+'static,
TrValidate::Cursor: Cursor<TrValidate::Key,TrValidate::Val,G::Timestamp,TrValidate::R>+'static,
{
fn clone(&self) -> Self {
LiveIndex {
Expand All @@ -342,9 +354,15 @@ where
G::Timestamp: Lattice + Data,
K: Data,
V: Data,
TrCount: TraceReader<K, (), G::Timestamp, isize> + Clone,
TrPropose: TraceReader<K, V, G::Timestamp, isize> + Clone,
TrValidate: TraceReader<(K, V), (), G::Timestamp, isize> + Clone,
TrCount: TraceReader<Key=K, Val=(), Time=G::Timestamp, R=isize> + Clone,
TrCount::Batch: BatchReader<TrCount::Key,TrCount::Val,G::Timestamp,TrCount::R>+'static,
TrCount::Cursor: Cursor<TrCount::Key,TrCount::Val,G::Timestamp,TrCount::R>+'static,
TrPropose: TraceReader<Key=K, Val=V, Time=G::Timestamp, R=isize> + Clone,
TrPropose::Batch: BatchReader<TrPropose::Key,TrPropose::Val,G::Timestamp,TrPropose::R>+'static,
TrPropose::Cursor: Cursor<TrPropose::Key,TrPropose::Val,G::Timestamp,TrPropose::R>+'static,
TrValidate: TraceReader<Key=(K, V), Val=(), Time=G::Timestamp, R=isize> + Clone,
TrValidate::Batch: BatchReader<TrValidate::Key,TrValidate::Val,G::Timestamp,TrValidate::R>+'static,
TrValidate::Cursor: Cursor<TrValidate::Key,TrValidate::Val,G::Timestamp,TrValidate::R>+'static,
{
/// Brings the index's traces into the specified scope.
pub fn enter<'a, TInner>(
Expand All @@ -354,9 +372,9 @@ where
Child<'a, G, TInner>,
K,
V,
TraceEnter<K, (), G::Timestamp, isize, TrCount, TInner>,
TraceEnter<K, V, G::Timestamp, isize, TrPropose, TInner>,
TraceEnter<(K, V), (), G::Timestamp, isize, TrValidate, TInner>,
TraceEnter<TrCount, TInner>,
TraceEnter<TrPropose, TInner>,
TraceEnter<TrValidate, TInner>,
>
where
TrCount::Batch: Clone,
Expand Down Expand Up @@ -385,9 +403,9 @@ where
Child<'a, G, TInner>,
K,
V,
TraceEnterAt<K, (), G::Timestamp, isize, TrCount, TInner, FCount>,
TraceEnterAt<K, V, G::Timestamp, isize, TrPropose, TInner, FPropose>,
TraceEnterAt<(K, V), (), G::Timestamp, isize, TrValidate, TInner, FValidate>,
TraceEnterAt<TrCount, TInner, FCount>,
TraceEnterAt<TrPropose, TInner, FPropose>,
TraceEnterAt<TrValidate, TInner, FValidate>,
>
where
TrCount::Batch: Clone,
Expand Down Expand Up @@ -457,13 +475,7 @@ where
fn arrange_by_variables(
self,
variables: &[Var],
) -> Arranged<
Iterative<'a, G, u64>,
Vec<Value>,
Vec<Value>,
isize,
TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp, u64>, isize>,
>;
) -> Arranged<Iterative<'a, G, u64>, TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp, u64>, isize>>;
}

/// A collection and variable bindings.
Expand Down Expand Up @@ -542,13 +554,7 @@ where
fn arrange_by_variables(
self,
variables: &[Var],
) -> Arranged<
Iterative<'a, G, u64>,
Vec<Value>,
Vec<Value>,
isize,
TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp, u64>, isize>,
> {
) -> Arranged<Iterative<'a, G, u64>, TraceValHandle<Vec<Value>, Vec<Value>, Product<G::Timestamp, u64>, isize>> {
self.tuples_by_variables(variables).arrange()
}
}
Expand Down
24 changes: 18 additions & 6 deletions src/plan/hector.rs
Expand Up @@ -885,9 +885,15 @@ where
K: Data,
V: Data,
F: Fn(&P) -> K,
TrCount: TraceReader<K, (), S::Timestamp, isize> + Clone + 'static,
TrPropose: TraceReader<K, V, S::Timestamp, isize> + Clone + 'static,
TrValidate: TraceReader<(K, V), (), S::Timestamp, isize> + Clone + 'static,
TrCount: TraceReader<Key=K, Val=(), Time=S::Timestamp, R=isize> + Clone + 'static,
TrCount::Batch: BatchReader<TrCount::Key,TrCount::Val,S::Timestamp,TrCount::R>+'static,
TrCount::Cursor: Cursor<TrCount::Key,TrCount::Val,S::Timestamp,TrCount::R>+'static,
TrPropose: TraceReader<Key=K, Val=V, Time=S::Timestamp, R=isize> + Clone + 'static,
TrPropose::Batch: BatchReader<TrPropose::Key,TrPropose::Val,S::Timestamp,TrPropose::R>+'static,
TrPropose::Cursor: Cursor<TrPropose::Key,TrPropose::Val,S::Timestamp,TrPropose::R>+'static,
TrValidate: TraceReader<Key=(K, V), Val=(), Time=S::Timestamp, R=isize> + Clone + 'static,
TrValidate::Batch: BatchReader<TrValidate::Key,TrValidate::Val,S::Timestamp,TrValidate::R>+'static,
TrValidate::Cursor: Cursor<TrValidate::Key,TrValidate::Val,S::Timestamp,TrValidate::R>+'static,
{
phantom: std::marker::PhantomData<P>,
indices: LiveIndex<S, K, V, TrCount, TrPropose, TrValidate>,
Expand All @@ -904,9 +910,15 @@ where
V: Data + Hash,
P: Data,
F: Fn(&P) -> K + 'static,
TrCount: TraceReader<K, (), S::Timestamp, isize> + Clone + 'static,
TrPropose: TraceReader<K, V, S::Timestamp, isize> + Clone + 'static,
TrValidate: TraceReader<(K, V), (), S::Timestamp, isize> + Clone + 'static,
TrCount: TraceReader<Key=K, Val=(), Time=S::Timestamp, R=isize> + Clone + 'static,
TrCount::Batch: BatchReader<TrCount::Key,TrCount::Val,S::Timestamp,TrCount::R>+'static,
TrCount::Cursor: Cursor<TrCount::Key,TrCount::Val,S::Timestamp,TrCount::R>+'static,
TrPropose: TraceReader<Key=K, Val=V, Time=S::Timestamp, R=isize> + Clone + 'static,
TrPropose::Batch: BatchReader<TrPropose::Key,TrPropose::Val,S::Timestamp,TrPropose::R>+'static,
TrPropose::Cursor: Cursor<TrPropose::Key,TrPropose::Val,S::Timestamp,TrPropose::R>+'static,
TrValidate: TraceReader<Key=(K, V), Val=(), Time=S::Timestamp, R=isize> + Clone + 'static,
TrValidate::Batch: BatchReader<TrValidate::Key,TrValidate::Val,S::Timestamp,TrValidate::R>+'static,
TrValidate::Cursor: Cursor<TrValidate::Key,TrValidate::Val,S::Timestamp,TrValidate::R>+'static,
{
type Prefix = P;
type Extension = V;
Expand Down
13 changes: 2 additions & 11 deletions src/plan/pull.rs
Expand Up @@ -114,17 +114,8 @@ impl<P: Implementable> Implementable for PullLevel<P> {
let paths = input.tuples();
let e_path: Arranged<
Iterative<S, u64>,
Value,
Vec<Value>,
isize,
TraceAgent<
Value,
Vec<Value>,
Product<T, u64>,
isize,
OrdValSpine<Value, Vec<Value>, Product<T, u64>, isize>,
>,
> = paths.map(|t| (t.last().unwrap().clone(), t)).arrange();
TraceAgent<OrdValSpine<Value, Vec<Value>, Product<T, u64>, isize>>>
= paths.map(|t| (t.last().unwrap().clone(), t)).arrange();

let mut shutdown_handle = shutdown_input;
let streams = self.pull_attributes.iter().map(|a| {
Expand Down

0 comments on commit c7d8e8d

Please sign in to comment.