Skip to content

Commit

Permalink
Improve the precision of half_join (#386)
Browse files Browse the repository at this point in the history
Co-authored-by: Frank McSherry <mcsherry@gallustrate.myfiosgateway.com>
  • Loading branch information
frankmcsherry and Frank McSherry committed Apr 18, 2023
1 parent 73f5a21 commit 438804d
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions dogsdogsdogs/src/operators/half_join.rs
Expand Up @@ -186,6 +186,9 @@ where
let timer = std::time::Instant::now();
let mut work = 0;

// New entries to introduce to the stash after processing.
let mut stash_additions = HashMap::new();

if let Some(ref mut trace) = arrangement_trace {

for (capability, proposals) in stash.iter_mut() {
Expand Down Expand Up @@ -231,6 +234,35 @@ where
}

proposals.retain(|ptd| !ptd.2.is_zero());

// Determine the lower bound of remaining update times.
let mut antichain = Antichain::new();
for (_, initial, _) in proposals.iter() {
antichain.insert(initial.clone());
}
// Fast path: there is only one element in the antichain.
// All times in `proposals` must be greater or equal to it.
if antichain.len() == 1 && !antichain.less_equal(capability.time()) {
stash_additions
.entry(capability.delayed(&antichain[0]))
.or_insert(Vec::new())
.extend(proposals.drain(..));
}
else if antichain.len() > 1 {
// Any remaining times should peel off elements from `proposals`.
let mut additions = vec![Vec::new(); antichain.len()];
for (data, initial, diff) in proposals.drain(..) {
use timely::PartialOrder;
let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap();
additions[position].push((data, initial, diff));
}
for (time, addition) in antichain.into_iter().zip(additions) {
stash_additions
.entry(capability.delayed(&time))
.or_insert(Vec::new())
.extend(addition);
}
}
}
}
}
Expand All @@ -242,6 +274,10 @@ where

// drop fully processed capabilities.
stash.retain(|_,proposals| !proposals.is_empty());

for (capability, proposals) in stash_additions.into_iter() {
stash.entry(capability).or_insert(Vec::new()).extend(proposals);
}

// The logical merging frontier depends on both input1 and stash.
let mut frontier = timely::progress::frontier::Antichain::new();
Expand Down

0 comments on commit 438804d

Please sign in to comment.