Skip to content

Commit

Permalink
Merge pull request #27 from frankmcsherry/batch_arrange
Browse files Browse the repository at this point in the history
Batch arrange, fixes #26.
  • Loading branch information
frankmcsherry committed Apr 11, 2017
2 parents b7fb883 + cb55d97 commit 3477954
Show file tree
Hide file tree
Showing 22 changed files with 936 additions and 3,122 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ git="http://github.com/frankmcsherry/timely-dataflow"

[dependencies]
abomonation="0.4.4"
timely_sort="^0.1.1"
timely_communication="^0.1.3"
timely_sort="0.1.5"
timely_communication="0.1.5"
itertools="0.4"
fnv="1.0.2"
linear-map = "0.0.4"
Expand Down
99 changes: 58 additions & 41 deletions examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::*;
use timely::dataflow::operators::*;

use differential_dataflow::Collection;
use differential_dataflow::{Collection, AsCollection};
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;

Expand All @@ -25,15 +25,15 @@ fn main() {
// define a new computational scope, in which to run BFS
timely::execute_from_args(std::env::args().skip(6), move |computation| {

// let timer = ::std::time::Instant::now();
let timer = ::std::time::Instant::now();

// define BFS dataflow; return handles to roots and edges inputs
let (mut graph, probe) = computation.scoped(|scope| {
let (mut roots, mut graph, probe) = computation.scoped(|scope| {

let roots = vec![(1,Default::default(),1)].into_iter().to_stream(scope);
let (root_input, roots) = scope.new_input();
let (edge_input, graph) = scope.new_input();

let mut result = bfs(&Collection::new(graph.clone()), &Collection::new(roots.clone()));
let mut result = reach(&graph.as_collection(), &roots.as_collection());

if !inspect {
result = result.filter(|_| false);
Expand All @@ -44,43 +44,57 @@ fn main() {
.inspect(|x| println!("\t{:?}", x))
.probe();

(edge_input, probe.0)
(root_input, edge_input, probe.0)
});

let seed: &[_] = &[1, 2, 3, 4];
let mut rng1: StdRng = SeedableRng::from_seed(seed); // rng for edge additions
let mut rng2: StdRng = SeedableRng::from_seed(seed); // rng for edge deletions

let mut names = Vec::new();
let mut rng: StdRng = SeedableRng::from_seed(seed);
for _i in 0 .. nodes {
// names.push(_i as u32);
names.push(rng.next_u32());
}

roots.send((names[1], Default::default(), 1));
roots.close();

// println!("performing BFS on {} nodes, {} edges:", nodes, edges);

if computation.index() == 0 {
// trickle edges in to dataflow
for _ in 0..(edges/1000) {
for _ in 0..1000 {
graph.send(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Default::default(), 1));
let src = *rng1.choose(&names[..]).unwrap();
let dst = *rng1.choose(&names[..]).unwrap();
graph.send(((src, dst), Default::default(), 1));
}
computation.step();
}
for _ in 0.. (edges % 1000) {
graph.send(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), Default::default(), 1));
let src = *rng1.choose(&names[..]).unwrap();
let dst = *rng1.choose(&names[..]).unwrap();
graph.send(((src, dst), Default::default(), 1));
}
}

// println!("loaded; elapsed: {:?}", timer.elapsed());
println!("loaded; elapsed: {:?}", timer.elapsed());

graph.advance_to(1);
computation.step_while(|| probe.lt(graph.time()));

// println!("stable; elapsed: {:?}", timer.elapsed());
println!("stable; elapsed: {:?}", timer.elapsed());

let mut session = differential_dataflow::input::InputSession::from(&mut graph);
for round in 0 .. rounds {
for element in 0 .. batch {
if computation.index() == 0 {
session.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
session.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
session.insert((names[rng1.gen_range(0, nodes as usize)], names[rng1.gen_range(0, nodes as usize)]));
session.remove((names[rng2.gen_range(0, nodes as usize)], names[rng2.gen_range(0, nodes as usize)]));
}
session.advance_to(1 + round * batch + element);
session.advance_to(2 + round * batch + element);
}
session.flush();

Expand All @@ -89,34 +103,12 @@ fn main() {

if computation.index() == 0 {
let elapsed = timer.elapsed();
println!("{}", elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
println!();
// println!("{:?}:\t{}", round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
}
}


// if batch > 0 {
// for _wave in 0 .. rounds {
// if computation.index() == 0 {
// let mut session = differential_dataflow::input::InputSession::from(&mut graph);
// for _ in 0 .. batch {
// session.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
// session.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
// let &round = session.epoch();
// session.advance_to(round + 1);
// }
// }

// let timer = ::std::time::Instant::now();
// computation.step_while(|| probe.lt(&graph.time()));

// if computation.index() == 0 {
// let elapsed = timer.elapsed();
// println!("{}", elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
// }
// }
// }

// println!("finished; elapsed: {:?}", timer.elapsed());

println!("finished; elapsed: {:?}", timer.elapsed());

}).unwrap();
}
Expand All @@ -134,8 +126,33 @@ where G::Timestamp: Lattice+Ord {
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());

inner.join_map(&edges, |_k,l,d| (*d, l+1))
inner.join_map_u(&edges, |_k,l,d| (*d, l+1))
.concat(&nodes)
.group(|_, s, t| t.push((s[0].0, 1)))
.group_u(|_, s, t| t.push((s[0].0, 1)))
})
}

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn reach<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {

// repeatedly update minimal distances each node can be reached from each root
roots.iterate(|inner| {

let edges = edges.enter(&inner.scope());
let roots = roots.enter(&inner.scope());

edges//.inspect(|x| println!("edge: {:?}", x))
.semijoin_u(&inner)
.map(|(_, dst)| dst)
.concat(&roots)
.inspect_batch(|t,xs| for x in xs.iter() {
if x.0 == 2002306141 {
println!("props @ {:?}: {:?}", t, x)
}
})
.distinct_u()
// .inspect_batch(|t,xs| println!("label @ {:?}: {:?}", t, xs))
})
.map(|x| (x,0))
}
4 changes: 2 additions & 2 deletions examples/degrees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use differential_dataflow::operators::join::JoinArranged;
use differential_dataflow::operators::group::GroupArranged;
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::implementations::rhh::Spine;
use differential_dataflow::trace::implementations::hash::HashValSpine as Spine;

fn main() {

Expand Down Expand Up @@ -141,7 +141,7 @@ where G::Timestamp: Lattice+::std::hash::Hash+Ord {
// determine active vertices
let active = inner.flat_map(|(src,dst)| Some(src).into_iter().chain(Some(dst).into_iter()))
.arrange_by_self()
.group_arranged(move |_k, s, t| { if s[0].1 > k { t.push(((),1)) } }, Spine::new(Default::default()));
.group_arranged(move |_k, s, t| { if s[0].1 > k { t.push(((),1)) } }, Spine::new());
// .threshold(|k| k.as_u64(), |x| (VecMap::new(), x), move |_,cnt| if cnt >= k { 1 } else { 0 });

// restrict edges active vertices, return result
Expand Down
13 changes: 7 additions & 6 deletions examples/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use differential_dataflow::operators::join::JoinArranged;
use differential_dataflow::operators::group::GroupArranged;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::hashable::UnsignedWrapper;
use differential_dataflow::trace::implementations::rhh::Spine as HashSpine;
use differential_dataflow::trace::implementations::rhh_k::Spine as KeyHashSpine;
use differential_dataflow::trace::implementations::hash::HashValSpine as HashSpine;
use differential_dataflow::trace::implementations::hash::HashKeySpine as KeyHashSpine;


type Node = u32;
Expand Down Expand Up @@ -127,12 +127,13 @@ where G::Timestamp: Lattice+Ord+Hash {
graph.iterate(|edges| {
// keep edges from active edge destinations.

let active = edges.map(|(_,k)| (k,()))
.arrange(|k,v| (UnsignedWrapper::from(k), v), KeyHashSpine::new(Default::default()))
.group_arranged(|_k,_s,t| t.push(((), 1)), KeyHashSpine::new(Default::default()));
let active = edges.map(|(_,k)| (UnsignedWrapper::from(k), ()))
.arrange(KeyHashSpine::new())
.group_arranged(|_k,_s,t| t.push(((), 1)), KeyHashSpine::new());

graph.enter(&edges.scope())
.arrange(|k,v| (UnsignedWrapper::from(k), v), HashSpine::new(Default::default()))
.map(|(k,v)| (UnsignedWrapper::from(k), v))
.arrange(HashSpine::new())
.join_arranged(&active, |k,v,_| (k.item.clone(), v.clone()))

// let active = edges.map(|(_,dst)| dst).distinct_u();
Expand Down
Loading

0 comments on commit 3477954

Please sign in to comment.