Skip to content


Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 7, 2019
2 parents ae7974e + 971915c commit 7a79f6a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 49 deletions.
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/
Expand Up @@ -75,7 +75,6 @@ impl<T: Timestamp> Refines<T> for AltNeu<T> {
use differential_dataflow::lattice::Lattice;
impl<T: Lattice> Lattice for AltNeu<T> {
fn minimum() -> Self { AltNeu::alt(T::minimum()) }
fn maximum() -> Self { AltNeu::neu(T::maximum()) }
fn join(&self, other: &Self) -> Self {
let time = self.time.join(&other.time);
let mut neu = false;
Expand All @@ -98,4 +97,4 @@ impl<T: Lattice> Lattice for AltNeu<T> {
AltNeu { time, neu }
109 changes: 62 additions & 47 deletions dogsdogsdogs/src/
Expand Up @@ -11,6 +11,7 @@ extern crate serde;
use std::rc::Rc;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::Mul;

use timely::PartialOrder;
use timely::dataflow::Scope;
Expand All @@ -23,6 +24,8 @@ use timely::dataflow::operators::Concatenate;
use timely_sort::Unsigned;

use differential_dataflow::{Data, Collection, AsCollection, Hashable};
use differential_dataflow::operators::Threshold;
use differential_dataflow::difference::{Monoid};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey};
Expand All @@ -38,29 +41,29 @@ pub mod altneu;
Implementors of `PrefixExtension` provide types and methods for extending a differential dataflow collection,
via the three methods `count`, `propose`, and `validate`.
pub trait PrefixExtender<G: Scope> {
pub trait PrefixExtender<G: Scope, R: Monoid+Mul<Output = R>> {
/// The required type of prefix to extend.
type Prefix;
/// The type to be produced as extension.
type Extension;
/// Annotates prefixes with the number of extensions the relation would propose.
fn count(&mut self, &Collection<G, (Self::Prefix, usize, usize)>, usize) -> Collection<G, (Self::Prefix, usize, usize)>;
fn count(&mut self, &Collection<G, (Self::Prefix, usize, usize), R>, usize) -> Collection<G, (Self::Prefix, usize, usize), R>;
/// Extends each prefix with corresponding extensions.
fn propose(&mut self, &Collection<G, Self::Prefix>) -> Collection<G, (Self::Prefix, Self::Extension)>;
fn propose(&mut self, &Collection<G, Self::Prefix, R>) -> Collection<G, (Self::Prefix, Self::Extension), R>;
/// Restricts proposed extensions by those the extender would have proposed.
fn validate(&mut self, &Collection<G, (Self::Prefix, Self::Extension)>) -> Collection<G, (Self::Prefix, Self::Extension)>;
fn validate(&mut self, &Collection<G, (Self::Prefix, Self::Extension), R>) -> Collection<G, (Self::Prefix, Self::Extension), R>;

pub trait ProposeExtensionMethod<G: Scope, P: Data+Ord> {
fn propose_using<PE: PrefixExtender<G, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension)>;
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,Prefix=P,Extension=E>]) -> Collection<G, (P, E)>;
pub trait ProposeExtensionMethod<G: Scope, P: Data+Ord, R: Monoid+Mul<Output = R>> {
fn propose_using<PE: PrefixExtender<G, R, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R>;
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,R,Prefix=P,Extension=E>]) -> Collection<G, (P, E), R>;

impl<G: Scope, P: Data+Ord> ProposeExtensionMethod<G, P> for Collection<G, P> {
fn propose_using<PE: PrefixExtender<G, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension)> {
impl<G: Scope, P: Data+Ord, R: Monoid+Mul<Output = R>> ProposeExtensionMethod<G, P, R> for Collection<G, P, R> {
fn propose_using<PE: PrefixExtender<G, R, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R> {
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,Prefix=P,Extension=E>]) -> Collection<G, (P, E)>
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,R,Prefix=P,Extension=E>]) -> Collection<G, (P, E), R>

if extenders.len() == 1 {
Expand Down Expand Up @@ -89,12 +92,12 @@ impl<G: Scope, P: Data+Ord> ProposeExtensionMethod<G, P> for Collection<G, P> {

pub trait ValidateExtensionMethod<G: Scope, P, E> {
fn validate_using<PE: PrefixExtender<G, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E)>;
pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Mul<Output = R>, P, E> {
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R>;

impl<G: Scope, P, E> ValidateExtensionMethod<G, P, E> for Collection<G, (P, E)> {
fn validate_using<PE: PrefixExtender<G, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E)> {
impl<G: Scope, R: Monoid+Mul<Output = R>, P, E> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R> {
Expand All @@ -105,27 +108,29 @@ type TraceValHandle<K,V,T,R> = TraceAgent<K, V, T, R, TraceValSpine<K,V,T,R>>;
type TraceKeySpine<K,T,R> = Spine<K, (), T, R, Rc<OrdKeyBatch<K,T,R>>>;
type TraceKeyHandle<K,T,R> = TraceAgent<K, (), T, R, TraceKeySpine<K,T,R>>;

pub struct CollectionIndex<K, V, T>
pub struct CollectionIndex<K, V, T, R>
K: Data,
V: Data,
T: Lattice+Data,
R: Monoid+Mul<Output = R>,
/// A trace of type (K, ()), used to count extensions for each prefix.
count_trace: TraceKeyHandle<K, T, isize>,

/// A trace of type (K, V), used to propose extensions for each prefix.
propose_trace: TraceValHandle<K, V, T, isize>,
propose_trace: TraceValHandle<K, V, T, R>,

/// A trace of type ((K, V), ()), used to validate proposed extensions.
validate_trace: TraceKeyHandle<(K, V), T, isize>,
validate_trace: TraceKeyHandle<(K, V), T, R>,

impl<K, V, T> Clone for CollectionIndex<K, V, T>
impl<K, V, T, R> Clone for CollectionIndex<K, V, T, R>
K: Data+Hash,
V: Data+Hash,
T: Lattice+Data+Timestamp,
R: Monoid+Mul<Output = R>,
fn clone(&self) -> Self {
CollectionIndex {
Expand All @@ -136,25 +141,33 @@ where

impl<K, V, T> CollectionIndex<K, V, T>
impl<K, V, T, R> CollectionIndex<K, V, T, R>
K: Data+Hash,
V: Data+Hash,
T: Lattice+Data+Timestamp,
R: Monoid+Mul<Output = R>,
pub fn index<G: Scope<Timestamp=T>>(collection: &Collection<G, (K, V), isize>) -> Self {
let counts =|(k,_v)| k).arrange_by_self().trace;

pub fn index<G: Scope<Timestamp = T>>(collection: &Collection<G, (K, V), R>) -> Self {
// We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation.
// counts and validate can share the base arrangement
let arranged = collection.arrange_by_self();
let counts = arranged
.map(|(k, _v)| k)
let propose = collection.arrange_by_key().trace;
let validate = collection.arrange_by_self().trace;
let validate = arranged.trace;

CollectionIndex {
count_trace: counts,
propose_trace: propose,
validate_trace: validate,

pub fn extend_using<P, F: Fn(&P)->K>(&self, logic: F) -> CollectionExtender<K, V, T, P, F> {
pub fn extend_using<P, F: Fn(&P)->K>(&self, logic: F) -> CollectionExtender<K, V, T, R, P, F> {
CollectionExtender {
phantom: std::marker::PhantomData,
indices: self.clone(),
Expand All @@ -163,32 +176,34 @@ where

pub struct CollectionExtender<K, V, T, P, F>
pub struct CollectionExtender<K, V, T, R, P, F>
K: Data,
V: Data,
T: Lattice+Data,
R: Monoid+Mul<Output = R>,
F: Fn(&P)->K,
phantom: std::marker::PhantomData<P>,
indices: CollectionIndex<K, V, T>,
indices: CollectionIndex<K, V, T, R>,
key_selector: Rc<F>,

impl<G, K, V, P, F> PrefixExtender<G> for CollectionExtender<K, V, G::Timestamp, P, F>
impl<G, K, V, R, P, F> PrefixExtender<G, R> for CollectionExtender<K, V, G::Timestamp, R, P, F>
G: Scope,
K: Data+Hash,
V: Data+Hash,
P: Data,
G::Timestamp: Lattice+Data,
R: Monoid+Mul<Output = R>,
F: Fn(&P)->K+'static,

type Prefix = P;
type Extension = V;

fn count(&mut self, prefixes: &Collection<G, (P, usize, usize)>, index: usize) -> Collection<G, (P, usize, usize)> {
fn count(&mut self, prefixes: &Collection<G, (P, usize, usize), R>, index: usize) -> Collection<G, (P, usize, usize), R> {

// This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
// stream of `((prefix, count), time, diff)` changes, just by looking up `count` in `count_trace`. We are
Expand All @@ -202,7 +217,7 @@ where
let logic1 = self.key_selector.clone();
let logic2 = self.key_selector.clone();

let exchange = Exchange::new(move |update: &((P,usize,usize),G::Timestamp,isize)| logic1(&(update.0).0).hashed().as_u64());
let exchange = Exchange::new(move |update: &((P,usize,usize),G::Timestamp,R)| logic1(&(update.0).0).hashed().as_u64());

let mut buffer1 = Vec::new();
let mut buffer2 = Vec::new();
Expand Down Expand Up @@ -261,11 +276,11 @@ where
*diff = 0;
*diff = R::zero();

prefixes.retain(|ptd| ptd.2 != 0);
prefixes.retain(|ptd| !ptd.2.is_zero());
Expand All @@ -283,7 +298,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P>) -> Collection<G, (P, V)> {
fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {

// This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
// stream of `((prefix, count), time, diff)` changes, just by looking up `count` in `count_trace`. We are
Expand All @@ -300,7 +315,7 @@ where
let mut buffer1 = Vec::new();
let mut buffer2 = Vec::new();

let exchange = Exchange::new(move |update: &(P,G::Timestamp,isize)| logic1(&update.0).hashed().as_u64());
let exchange = Exchange::new(move |update: &(P,G::Timestamp,R)| logic1(&update.0).hashed().as_u64());

prefixes.inner.binary_frontier(&, exchange, Pipeline, "Propose", move |_,_| move |input1, input2, output| {

Expand Down Expand Up @@ -343,21 +358,21 @@ where
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = 0;
let mut count = R::zero();
cursor.map_times(&storage, |t, d| if t.less_equal(time) { count += d; });
// assert!(count >= 0);
if count > 0 {
session.give(((prefix.clone(), value.clone()), time.clone(), diff.clone()));
let prod = count * diff.clone();
if !prod.is_zero() {
session.give(((prefix.clone(), value.clone()), time.clone(), prod));
*diff = 0;
*diff = R::zero();

prefixes.retain(|ptd| ptd.2 != 0);
prefixes.retain(|ptd| !ptd.2.is_zero());
Expand All @@ -375,7 +390,7 @@ where

fn validate(&mut self, extensions: &Collection<G, (P, V)>) -> Collection<G, (P, V)> {
fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {

// This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
Expand All @@ -393,7 +408,7 @@ where
let mut buffer1 = Vec::new();
let mut buffer2 = Vec::new();

let exchange = Exchange::new(move |update: &((P,V),G::Timestamp,isize)|
let exchange = Exchange::new(move |update: &((P,V),G::Timestamp,R)|
(logic1(&(update.0).0).clone(), ((update.0).1).clone()).hashed().as_u64()

Expand Down Expand Up @@ -437,18 +452,18 @@ where
let key = (logic2(&prefix.0), (prefix.1).clone());
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
let mut count = 0;
let mut count = R::zero();
cursor.map_times(&storage, |t, d| if t.less_equal(time) { count += d; });
// assert!(count >= 0);
if count > 0 {
session.give((prefix.clone(), time.clone(), diff.clone()));
let prod = count * diff.clone();
if !prod.is_zero(){
session.give((prefix.clone(), time.clone(), prod));
*diff = 0;
*diff = R::zero();

prefixes.retain(|ptd| ptd.2 != 0);
prefixes.retain(|ptd| !ptd.2.is_zero());
Expand Down

0 comments on commit 7a79f6a

Please sign in to comment.