diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 338a994aa..a2920c163 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -186,6 +186,9 @@ where let timer = std::time::Instant::now(); let mut work = 0; + // New entries to introduce to the stash after processing. + let mut stash_additions = HashMap::new(); + if let Some(ref mut trace) = arrangement_trace { for (capability, proposals) in stash.iter_mut() { @@ -231,6 +234,35 @@ where } proposals.retain(|ptd| !ptd.2.is_zero()); + + // Determine the lower bound of remaining update times. + let mut antichain = Antichain::new(); + for (_, initial, _) in proposals.iter() { + antichain.insert(initial.clone()); + } + // Fast path: there is only one element in the antichain. + // All times in `proposals` must be greater or equal to it. + if antichain.len() == 1 && !antichain.less_equal(capability.time()) { + stash_additions + .entry(capability.delayed(&antichain[0])) + .or_insert(Vec::new()) + .extend(proposals.drain(..)); + } + else if antichain.len() > 1 { + // Any remaining times should peel off elements from `proposals`. + let mut additions = vec![Vec::new(); antichain.len()]; + for (data, initial, diff) in proposals.drain(..) { + use timely::PartialOrder; + let position = antichain.iter().position(|t| t.less_equal(&initial)).unwrap(); + additions[position].push((data, initial, diff)); + } + for (time, addition) in antichain.into_iter().zip(additions) { + stash_additions + .entry(capability.delayed(&time)) + .or_insert(Vec::new()) + .extend(addition); + } + } } } } @@ -242,6 +274,10 @@ where // drop fully processed capabilities. stash.retain(|_,proposals| !proposals.is_empty()); + + for (capability, proposals) in stash_additions.into_iter() { + stash.entry(capability).or_insert(Vec::new()).extend(proposals); + } // The logical merging frontier depends on both input1 and stash. let mut frontier = timely::progress::frontier::Antichain::new();