Monoid for worst case optimal joins #147

Merged: 9 commits (Mar 4, 2019)
Changes from 8 commits
3 changes: 1 addition & 2 deletions dogsdogsdogs/src/altneu.rs
@@ -75,7 +75,6 @@ impl<T: Timestamp> Refines<T> for AltNeu<T> {
use differential_dataflow::lattice::Lattice;
impl<T: Lattice> Lattice for AltNeu<T> {
fn minimum() -> Self { AltNeu::alt(T::minimum()) }
fn maximum() -> Self { AltNeu::neu(T::maximum()) }
fn join(&self, other: &Self) -> Self {
let time = self.time.join(&other.time);
let mut neu = false;
@@ -98,4 +97,4 @@ impl<T: Lattice> Lattice for AltNeu<T> {
}
AltNeu { time, neu }
}
}
}
105 changes: 60 additions & 45 deletions dogsdogsdogs/src/lib.rs
@@ -11,6 +11,7 @@ extern crate serde;
use std::rc::Rc;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::Mul;

use timely::PartialOrder;
use timely::dataflow::Scope;
@@ -23,6 +24,8 @@ use timely::dataflow::operators::Concatenate;
use timely_sort::Unsigned;

use differential_dataflow::{Data, Collection, AsCollection, Hashable};
use differential_dataflow::operators::Threshold;
use differential_dataflow::difference::{Monoid};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::{ArrangeBySelf, ArrangeByKey};
@@ -38,29 +41,29 @@ pub mod altneu;
Implementors of `PrefixExtender` provide types and methods for extending a differential dataflow collection,
via the three methods `count`, `propose`, and `validate`.
**/
pub trait PrefixExtender<G: Scope> {
pub trait PrefixExtender<G: Scope, R: Monoid+Mul<Output = R>> {
/// The required type of prefix to extend.
type Prefix;
/// The type to be produced as extension.
type Extension;
/// Annotates prefixes with the number of extensions the relation would propose.
fn count(&mut self, &Collection<G, (Self::Prefix, usize, usize)>, usize) -> Collection<G, (Self::Prefix, usize, usize)>;
fn count(&mut self, &Collection<G, (Self::Prefix, usize, usize), R>, usize) -> Collection<G, (Self::Prefix, usize, usize), R>;
/// Extends each prefix with corresponding extensions.
fn propose(&mut self, &Collection<G, Self::Prefix>) -> Collection<G, (Self::Prefix, Self::Extension)>;
fn propose(&mut self, &Collection<G, Self::Prefix, R>) -> Collection<G, (Self::Prefix, Self::Extension), R>;
/// Restricts proposed extensions by those the extender would have proposed.
fn validate(&mut self, &Collection<G, (Self::Prefix, Self::Extension)>) -> Collection<G, (Self::Prefix, Self::Extension)>;
fn validate(&mut self, &Collection<G, (Self::Prefix, Self::Extension), R>) -> Collection<G, (Self::Prefix, Self::Extension), R>;
}

pub trait ProposeExtensionMethod<G: Scope, P: Data+Ord> {
fn propose_using<PE: PrefixExtender<G, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension)>;
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,Prefix=P,Extension=E>]) -> Collection<G, (P, E)>;
pub trait ProposeExtensionMethod<G: Scope, P: Data+Ord, R: Monoid+Mul<Output = R>> {
fn propose_using<PE: PrefixExtender<G, R, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R>;
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,R,Prefix=P,Extension=E>]) -> Collection<G, (P, E), R>;
}

impl<G: Scope, P: Data+Ord> ProposeExtensionMethod<G, P> for Collection<G, P> {
fn propose_using<PE: PrefixExtender<G, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension)> {
impl<G: Scope, P: Data+Ord, R: Monoid+Mul<Output = R>> ProposeExtensionMethod<G, P, R> for Collection<G, P, R> {
fn propose_using<PE: PrefixExtender<G, R, Prefix=P>>(&self, extender: &mut PE) -> Collection<G, (P, PE::Extension), R> {
extender.propose(self)
}
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,Prefix=P,Extension=E>]) -> Collection<G, (P, E)>
fn extend<E: Data+Ord>(&self, extenders: &mut [&mut PrefixExtender<G,R,Prefix=P,Extension=E>]) -> Collection<G, (P, E), R>
{

if extenders.len() == 1 {
@@ -89,12 +92,12 @@ impl<G: Scope, P: Data+Ord> ProposeExtensionMethod<G, P> for Collection<G, P> {
}
}

pub trait ValidateExtensionMethod<G: Scope, P, E> {
fn validate_using<PE: PrefixExtender<G, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E)>;
pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Mul<Output = R>, P, E> {
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R>;
}

impl<G: Scope, P, E> ValidateExtensionMethod<G, P, E> for Collection<G, (P, E)> {
fn validate_using<PE: PrefixExtender<G, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E)> {
impl<G: Scope, R: Monoid+Mul<Output = R>, P, E> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R> {
extender.validate(self)
}
}
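
Taken together, `count`, `propose`, and `validate` are the three ingredients of the worst-case optimal join: each prefix is routed to the extender claiming the fewest extensions, that extender proposes concrete extensions, and every other extender validates them. A rough usage sketch for the triangle query, which extends each edge (a, b) by every c with (a, c) and (b, c) also edges; the relation and variable names are illustrative only and not taken from this PR:

// Sketch only: `edges` is assumed to be a Collection<G, (u32, u32), isize>.
let forward = CollectionIndex::index(&edges);
// Two extenders over the same index: one keys prefixes (a, b) by a, the other by b.
let mut ext_a = forward.extend_using(|&(a, _b): &(u32, u32)| a);
let mut ext_b = forward.extend_using(|&(_a, b): &(u32, u32)| b);
// `extend` counts with each extender, proposes with the cheapest, validates with the rest.
let triangles = edges.extend(&mut [&mut ext_a, &mut ext_b]);

With the generic difference type introduced in this PR, the same composition works for any `R: Monoid + Mul<Output = R>`, not only `isize`.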
@@ -105,27 +108,29 @@ type TraceValHandle<K,V,T,R> = TraceAgent<K, V, T, R, TraceValSpine<K,V,T,R>>;
type TraceKeySpine<K,T,R> = Spine<K, (), T, R, Rc<OrdKeyBatch<K,T,R>>>;
type TraceKeyHandle<K,T,R> = TraceAgent<K, (), T, R, TraceKeySpine<K,T,R>>;

pub struct CollectionIndex<K, V, T>
pub struct CollectionIndex<K, V, T, R>
where
K: Data,
V: Data,
T: Lattice+Data,
R: Monoid+Mul<Output = R>,
{
/// A trace of type (K, ()), used to count extensions for each prefix.
count_trace: TraceKeyHandle<K, T, isize>,

/// A trace of type (K, V), used to propose extensions for each prefix.
propose_trace: TraceValHandle<K, V, T, isize>,
propose_trace: TraceValHandle<K, V, T, R>,

/// A trace of type ((K, V), ()), used to validate proposed extensions.
validate_trace: TraceKeyHandle<(K, V), T, isize>,
validate_trace: TraceKeyHandle<(K, V), T, R>,
}

impl<K, V, T> Clone for CollectionIndex<K, V, T>
impl<K, V, T, R> Clone for CollectionIndex<K, V, T, R>
where
K: Data+Hash,
V: Data+Hash,
T: Lattice+Data+Timestamp,
R: Monoid+Mul<Output = R>,
{
fn clone(&self) -> Self {
CollectionIndex {
@@ -136,25 +141,33 @@ where
}
}

impl<K, V, T> CollectionIndex<K, V, T>
impl<K, V, T, R> CollectionIndex<K, V, T, R>
where
K: Data+Hash,
V: Data+Hash,
T: Lattice+Data+Timestamp,
R: Monoid+Mul<Output = R>,
{
pub fn index<G: Scope<Timestamp=T>>(collection: &Collection<G, (K, V), isize>) -> Self {
let counts = collection.map(|(k,_v)| k).arrange_by_self().trace;

pub fn index<G: Scope<Timestamp = T>>(collection: &Collection<G, (K, V), R>) -> Self {
// We need to count the number of (k, v) pairs and not rely on the given Monoid R and its binary addition operation.
// The counts and validate traces can share the base arrangement.
let arranged = collection.arrange_by_self();
let counts = arranged
.distinct()
.map(|(k, _v)| k)
.arrange_by_self()
.trace;
let propose = collection.arrange_by_key().trace;
let validate = collection.arrange_by_self().trace;
let validate = arranged.trace;

CollectionIndex {
count_trace: counts,
propose_trace: propose,
validate_trace: validate,
}
}

pub fn extend_using<P, F: Fn(&P)->K>(&self, logic: F) -> CollectionExtender<K, V, T, P, F> {
pub fn extend_using<P, F: Fn(&P)->K>(&self, logic: F) -> CollectionExtender<K, V, T, R, P, F> {
CollectionExtender {
phantom: std::marker::PhantomData,
indices: self.clone(),
@@ -163,32 +176,34 @@ where
}
}

pub struct CollectionExtender<K, V, T, P, F>
pub struct CollectionExtender<K, V, T, R, P, F>
where
K: Data,
V: Data,
T: Lattice+Data,
R: Monoid+Mul<Output = R>,
F: Fn(&P)->K,
{
phantom: std::marker::PhantomData<P>,
indices: CollectionIndex<K, V, T>,
indices: CollectionIndex<K, V, T, R>,
key_selector: Rc<F>,
}

impl<G, K, V, P, F> PrefixExtender<G> for CollectionExtender<K, V, G::Timestamp, P, F>
impl<G, K, V, R, P, F> PrefixExtender<G, R> for CollectionExtender<K, V, G::Timestamp, R, P, F>
where
G: Scope,
K: Data+Hash,
V: Data+Hash,
P: Data,
G::Timestamp: Lattice+Data,
R: Monoid+Mul<Output = R>,
F: Fn(&P)->K+'static,
{

type Prefix = P;
type Extension = V;

fn count(&mut self, prefixes: &Collection<G, (P, usize, usize)>, index: usize) -> Collection<G, (P, usize, usize)> {
fn count(&mut self, prefixes: &Collection<G, (P, usize, usize), R>, index: usize) -> Collection<G, (P, usize, usize), R> {

// This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
// stream of `((prefix, count), time, diff)` changes, just by looking up `count` in `count_trace`. We are
@@ -202,7 +217,7 @@ where
let logic1 = self.key_selector.clone();
let logic2 = self.key_selector.clone();

let exchange = Exchange::new(move |update: &((P,usize,usize),G::Timestamp,isize)| logic1(&(update.0).0).hashed().as_u64());
let exchange = Exchange::new(move |update: &((P,usize,usize),G::Timestamp,R)| logic1(&(update.0).0).hashed().as_u64());

let mut buffer1 = Vec::new();
let mut buffer2 = Vec::new();
@@ -261,11 +276,11 @@ where
}
}
}
*diff = 0;
*diff = R::zero();
}
}

prefixes.retain(|ptd| ptd.2 != 0);
prefixes.retain(|ptd| ptd.2 != R::zero());
}
}
}
@@ -283,7 +298,7 @@ where
}).as_collection()
}

fn propose(&mut self, prefixes: &Collection<G, P>) -> Collection<G, (P, V)> {
fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {

// This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
// stream of `((prefix, count), time, diff)` changes, just by looking up `count` in `count_trace`. We are
@@ -300,7 +315,7 @@ where
let mut buffer1 = Vec::new();
let mut buffer2 = Vec::new();

let exchange = Exchange::new(move |update: &(P,G::Timestamp,isize)| logic1(&update.0).hashed().as_u64());
let exchange = Exchange::new(move |update: &(P,G::Timestamp,R)| logic1(&update.0).hashed().as_u64());

prefixes.inner.binary_frontier(&propose.stream, exchange, Pipeline, "Propose", move |_,_| move |input1, input2, output| {

@@ -343,21 +358,21 @@ where
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
while let Some(value) = cursor.get_val(&storage) {
let mut count = 0;
let mut count = R::zero();
cursor.map_times(&storage, |t, d| if t.less_equal(time) { count += d; });
// assert!(count >= 0);
if count > 0 {
session.give(((prefix.clone(), value.clone()), time.clone(), diff.clone()));
if count > R::zero() {
Member:
Can you talk me through this logic? I would have thought that Monoid did not imply Ord, and that our goal here is probably to multiply count and diff. Is that wrong?

Member:
Same thing over in validate(); I think our goal (correct me if wrong) is to end up multiplying all of the constraints together. Back in relational-land this would implement "intersect", and in general monoid land it implements something else, I suppose...

Contributor Author:
Ah, I must have missed pushing the multiplication part.
As far as I understand it, the line

cursor.map_times(&storage, |t, d| if t.less_equal(time) { count += d; });

both in validate and propose starts with the zero element of the monoid and then applies its addition for every time in the cursor, in effect consolidating one specific (k, v) pair in the propose_trace/validate_trace.
If it's non-zero we create an output like so:

if count > R::zero() {
    session.give(((prefix.clone(), value.clone()), time.clone(), diff.clone() * count));
}

using the Mul that the semi-ring supplies.
Same for validate.

Member:
I suspect rather than > R::zero() you'll probably want !count.is_zero(), and you may even want to perform the multiplication with diff first (to see if they annihilate).

I think the > you have access to is not through Monoid, but rather just the Ord implementation that differential requires to sort things. I don't think it is semantically meaningful here (e.g., the numbers could accumulate negatively, and you would want to multiply anyhow).

Member:
And, if you have the free cycles, there are several other spots in the code where you have != R::zero() that we should probably make into !blah.is_zero(), in potential anticipation of going from monoids to semigroups (where a zero element may not inhabit the type). Not as urgent, but it was something that came to mind as I read the edits.

Contributor Author:
Mh, yes, good point. Didn't think much about the >.
Let me think a bit about semi-rings, but multiplying first wouldn't cost us anything, so it seems like a sensible choice.
I am not sure whether two non-zero elements of our semiring, combined with Mul, could turn into the identity (i.e. zero) element of Add.

Member:
Sorry, replying to the github email apparently put the response one level up. I'll avoid using that mechanism in the future.

Contributor Author:
Oh yeah, I didn't think about more complex diffs than a single one ^^
True that, true that.
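
For reference, the shape the reviewers converge on; this is a sketch of where the discussion points, not necessarily the final committed code. It multiplies before testing for zero and avoids the Ord comparison that Monoid does not guarantee:

let mut count = R::zero();
cursor.map_times(&storage, |t, d| if t.less_equal(time) { count += d; });
// Multiply first, so a count and diff that annihilate produce no update,
// then test with is_zero() instead of comparing against R::zero() with Ord.
let prod = diff.clone() * count;
if !prod.is_zero() {
    session.give(((prefix.clone(), value.clone()), time.clone(), prod));
}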

session.give(((prefix.clone(), value.clone()), time.clone(), diff.clone() * count));
}
cursor.step_val(&storage);
}
cursor.rewind_vals(&storage);
}
*diff = 0;
*diff = R::zero();
}
}

prefixes.retain(|ptd| ptd.2 != 0);
prefixes.retain(|ptd| ptd.2 != R::zero());
}
}
}
@@ -375,7 +390,7 @@ where
}).as_collection()
}

fn validate(&mut self, extensions: &Collection<G, (P, V)>) -> Collection<G, (P, V)> {
fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {


// This method takes a stream of `(prefix, time, diff)` changes, and we want to produce the corresponding
Expand All @@ -393,7 +408,7 @@ where
let mut buffer1 = Vec::new();
let mut buffer2 = Vec::new();

let exchange = Exchange::new(move |update: &((P,V),G::Timestamp,isize)|
let exchange = Exchange::new(move |update: &((P,V),G::Timestamp,R)|
(logic1(&(update.0).0).clone(), ((update.0).1).clone()).hashed().as_u64()
);

@@ -437,18 +452,18 @@ where
let key = (logic2(&prefix.0), (prefix.1).clone());
cursor.seek_key(&storage, &key);
if cursor.get_key(&storage) == Some(&key) {
let mut count = 0;
let mut count = R::zero();
cursor.map_times(&storage, |t, d| if t.less_equal(time) { count += d; });
// assert!(count >= 0);
if count > 0 {
session.give((prefix.clone(), time.clone(), diff.clone()));
if count > R::zero() {
session.give((prefix.clone(), time.clone(), diff.clone() * count));
}
}
*diff = 0;
*diff = R::zero();
}
}

prefixes.retain(|ptd| ptd.2 != 0);
prefixes.retain(|ptd| ptd.2 != R::zero());
}
}
}
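
On the annihilation point raised in the review: a hypothetical difference type (illustrative only, nothing this PR defines) shows how two non-zero diffs can multiply to zero under componentwise arithmetic, which is why testing the product with is_zero() is the safer order of operations.

// Hypothetical componentwise pairs of signed counts; a real difference type
// would also implement the crate's Monoid trait (zero / is_zero / addition).
#[derive(Clone, Debug, PartialEq, Eq)]
struct Pair(isize, isize);

impl std::ops::Mul for Pair {
    type Output = Pair;
    fn mul(self, rhs: Pair) -> Pair { Pair(self.0 * rhs.0, self.1 * rhs.1) }
}

fn main() {
    let count = Pair(1, 0);                // non-zero
    let diff = Pair(0, 1);                 // non-zero
    assert_eq!(diff * count, Pair(0, 0));  // their product is the zero element
}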