Skip to content

Commit

Permalink
generalize threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Sep 4, 2019
1 parent f21f5ed commit 84043a9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 43 deletions.
9 changes: 5 additions & 4 deletions experiments/src/bin/graspan1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::fs::File;
use timely::dataflow::Scope;
use timely::order::Product;

use differential_dataflow::difference::Present;
use differential_dataflow::input::Input;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::operators::*;
Expand All @@ -17,7 +18,6 @@ use differential_dataflow::operators::iterate::SemigroupVariable;
type Node = u32;
type Time = ();
type Iter = u32;
type Diff = i32;
type Offs = u32;

fn main() {
Expand Down Expand Up @@ -52,7 +52,8 @@ fn main() {
labels.join_core(&edges, |_b, a, c| Some((*c, *a)))
.concat(&nodes)
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.distinct_total_core::<Diff>();
// .distinct_total_core::<Diff>();
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });

labels.set(&next);
next.leave()
Expand Down Expand Up @@ -81,8 +82,8 @@ fn main() {
let dst: Node = elts.next().unwrap().parse().ok().expect("malformed dst");
let typ: &str = elts.next().unwrap();
match typ {
"n" => { nodes.update((src, dst), 1 as Diff); },
"e" => { edges.update((src, dst), 1 as Diff); },
"n" => { nodes.update((src, dst), Present); },
"e" => { edges.update((src, dst), Present); },
unk => { panic!("unknown type: {}", unk)},
}
}
Expand Down
50 changes: 28 additions & 22 deletions experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,18 @@ use std::fs::File;
use timely::dataflow::Scope;
use timely::order::Product;

use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::iterate::SemigroupVariable;

use differential_dataflow::Collection;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};

use differential_dataflow::difference::Present;

type Node = u32;
type Time = ();
type Iter = u32;
type Diff = i32;
type Offs = u32;

fn main() {
Expand All @@ -45,8 +44,8 @@ fn unoptimized() {

// let timer = timer.clone();

let (a_handle, assignment) = scope.new_collection::<_,Diff>();
let (d_handle, dereference) = scope.new_collection::<_,Diff>();
let (a_handle, assignment) = scope.new_collection::<_,Present>();
let (d_handle, dereference) = scope.new_collection::<_,Present>();

let nodes =
assignment
Expand All @@ -63,9 +62,8 @@ fn unoptimized() {
let assignment = assignment.enter(scope);
let dereference = dereference.enter(scope);

let value_flow = Variable::new(scope, Product::new(Default::default(), 1));
let memory_alias = Variable::new(scope, Product::new(Default::default(), 1));
// let value_alias = Variable::from(nodes.filter(|_| false).map(|n| (n,n)));
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<OrdValSpine<_,_,_,_,Offs>>();
Expand Down Expand Up @@ -94,19 +92,23 @@ fn unoptimized() {
let value_flow_next =
value_flow_next
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.distinct_total_core::<Diff>();
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });
;

// MA(a,b) <- D(x,a),VA(x,y),D(y,b)
let memory_alias_next: Collection<_,_,Diff> =
let memory_alias_next: Collection<_,_,Present> =
value_alias_next
.join_core(&dereference, |_x,&y,&a| Some((y,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.join_core(&dereference, |_y,&a,&b| Some((a,b)));

let memory_alias_next: Collection<_,_,Diff> =
let memory_alias_next: Collection<_,_,Present> =
memory_alias_next
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.distinct_total_core::<Diff>();
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });
;

value_flow.set(&value_flow_next);
memory_alias.set(&memory_alias_next);
Expand Down Expand Up @@ -135,11 +137,10 @@ fn unoptimized() {
let dst: Node = elts.next().unwrap().parse().ok().expect("malformed dst");
let typ: &str = elts.next().unwrap();
match typ {
// "a" => { a.insert((src, dst)); },
"a" => a.update((src,dst), 1),
// "d" => { d.insert((src, dst)); },
"d" => d.update((src,dst), 1),
"a" => a.update((src,dst), Present),
"d" => d.update((src,dst), Present),
_ => { },
// x => panic!("Unexpected type: {:?}", x),
}
}
}
Expand Down Expand Up @@ -186,8 +187,8 @@ fn optimized() {
let assignment = assignment.enter(scope);
let dereference = dereference.enter(scope);

let value_flow = Variable::new(scope, Product::new(Default::default(), 1));
let memory_alias = Variable::new(scope, Product::new(Default::default(), 1));
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<OrdValSpine<_,_,_,_,u32>>();
let memory_alias_arranged = memory_alias.arrange::<OrdValSpine<_,_,_,_,u32>>();
Expand All @@ -205,7 +206,9 @@ fn optimized() {
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)))
.arrange::<OrdKeySpine<_,_,_,u32>>()
.distinct_total_core::<Diff>();
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });
;

// VFD(a,b) <- VF(a,x),D(x,b)
let value_flow_deref =
Expand All @@ -228,7 +231,9 @@ fn optimized() {
.join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
.concat(&memory_alias_next)
.arrange::<OrdKeySpine<_,_,_,u32>>()
.distinct_total_core::<Diff>();
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });
;

value_flow.set(&value_flow_next);
memory_alias.set(&memory_alias_next);
Expand Down Expand Up @@ -256,9 +261,10 @@ fn optimized() {
let dst: Node = elts.next().unwrap().parse().ok().expect("malformed dst");
let typ: &str = elts.next().unwrap();
match typ {
"a" => { a.update((src, dst), 1 as Diff); },
"d" => { d.update((src, dst), 1 as Diff); },
"a" => { a.update((src, dst), Present); },
"d" => { d.update((src, dst), Present); },
_ => { },
// x => panic!("Unexpected type: {:?}", x),
}
}
}
Expand Down
48 changes: 31 additions & 17 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ use trace::{BatchReader, Cursor, TraceReader};
/// Extension trait for the `distinct` differential dataflow method.
pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
/// Reduces the collection by applying a user-supplied thresholding function to accumulated weights.
///
/// `thresh` is invoked with a key, the weight accumulated for that key after an
/// update is applied, and the weight accumulated before the update (`None` when
/// no weight had been accumulated for the key yet). It returns the difference to
/// produce for the key in the output, or `None` to produce nothing.
///
/// Unlike `threshold_total`, the output difference type `R2` only needs to be a
/// `Semigroup`, so the callback reports the output change directly rather than
/// having it derived by negation.
fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
where
R2: Semigroup,
F: Fn(&K,&R,Option<&R>)->Option<R2>+'static,
;
/// Reduces the collection to one occurrence of each distinct element.
///
/// # Examples
///
Expand All @@ -38,7 +44,13 @@ pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> w
/// });
/// }
/// ```
fn threshold_total<R2: Abelian, F: Fn(&K,&R)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2>;
fn threshold_total<R2: Abelian, F: Fn(&K,&R)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
    // Expressed through `threshold_semigroup`: the output change for a key is the
    // thresholded new weight minus the thresholded old weight (when one exists).
    self.threshold_semigroup(move |key, new, old| {
        let mut delta = thresh(key, new);
        if let Some(prior) = old {
            // Retract whatever was previously reported for this key.
            delta += &-thresh(key, prior);
        }
        // A zero change carries no information, so report nothing.
        if delta.is_zero() {
            None
        } else {
            Some(delta)
        }
    })
}
/// Reduces the collection to one occurrence of each distinct element.
///
/// This reduction only tests whether the weight associated with a record is non-zero, and otherwise
Expand Down Expand Up @@ -80,9 +92,9 @@ pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> w

impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
where G::Timestamp: TotalOrder+Lattice+Ord {
fn threshold_total<R2: Abelian, F: Fn(&K,&R)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
fn threshold_semigroup<R2: Semigroup, F: Fn(&K,&R,Option<&R>)->Option<R2>+'static>(&self, thresh: F) -> Collection<G, K, R2> {
self.arrange_by_self()
.threshold_total(thresh)
.threshold_semigroup(thresh)
}
}

Expand All @@ -95,8 +107,7 @@ where
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
{

fn threshold_total<R2: Abelian, F:Fn(&T1::Key,&T1::R)->R2+'static>(&self, thresh: F) -> Collection<G, T1::Key, R2> {
fn threshold_semigroup<R2: Semigroup, F:Fn(&T1::Key,&T1::R,Option<&T1::R>)->Option<R2>+'static>(&self, thresh: F) -> Collection<G, T1::Key, R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();
Expand Down Expand Up @@ -138,21 +149,24 @@ where
// If the result is non-zero, send it along.
batch_cursor.map_times(&batch, |time, diff| {

// Determine old and new weights.
// If a count is zero, the weight must be zero.
let old_weight = count.as_ref().map(|c| thresh(key, c));
count.as_mut().map(|c| *c += diff);
if count.is_none() { count = Some(diff.clone()); }
let new_weight = count.as_ref().map(|c| thresh(key, c));

let difference =
match (old_weight, new_weight) {
(Some(old), Some(new)) => { let mut diff = -old; diff += &new; Some(diff) },
(Some(old), None) => { Some(-old) },
(None, Some(new)) => { Some(new) },
(None, None) => None,
match &count {
Some(old) => {
let mut temp = old.clone();
temp += diff;
thresh(key, &temp, Some(old))
},
None => { thresh(key, diff, None) },
};

// Either add or assign `diff` to `count`.
if let Some(count) = &mut count {
*count += diff;
}
else {
count = Some(diff.clone());
}

if let Some(difference) = difference {
if !difference.is_zero() {
session.give((key.clone(), time.clone(), difference));
Expand Down

0 comments on commit 84043a9

Please sign in to comment.