Skip to content

Commit

Permalink
Change JoinImplementation::Semijoin -> subcase of JoinImplementation::Differential.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andi Wang committed Aug 31, 2020
1 parent dbdf6bc commit 1b2e6e7
Show file tree
Hide file tree
Showing 10 changed files with 1,572 additions and 1,145 deletions.
55 changes: 47 additions & 8 deletions src/dataflow/src/render/delta_join.rs
Expand Up @@ -9,6 +9,8 @@

#![allow(clippy::op_ref)]

use std::collections::HashMap;

use differential_dataflow::lattice::Lattice;
use dogsdogsdogs::altneu::AltNeu;
use timely::dataflow::Scope;
Expand Down Expand Up @@ -73,16 +75,54 @@ where
offset += arities[input];
}

// filter arrangements using semijoins if possible
let mut equivalences = equivalences.clone();
for equivalence in equivalences.iter_mut() {
equivalence.sort();
equivalence.dedup();
}

// if possible, shorten input arrangements by joining them with a
// constant row. Create a map to look up the shortened
// arrangements to use
let mut filter_constants = vec![];
let mut filtered_input_lookup = HashMap::new();

for (index, input) in inputs.iter().enumerate() {
if let RelationExpr::ArrangeBy {keys, ..} = input {
for key in keys.iter() {
let result = self.filter_on_index_if_able(
&input,
prior_arities[index],
key,
&equivalences,
scope,
worker_index
);
if let Some((filtered_input, constants)) = result {
filter_constants.extend(constants);
filtered_input_lookup.insert((index, key.clone()), filtered_input);
} else {
filtered_input_lookup.insert((index, key.clone()), input.clone());
}
}
}
}

filter_constants.sort();
filter_constants.dedup();
// eliminate filters that have already been applied from
// `equivalences`
for equivalence in equivalences.iter_mut() {
equivalence.retain(|expr| !filter_constants.contains(expr));
}
equivalences.retain(|e| e.len() > 1);

let mut err_streams = Vec::with_capacity(inputs.len());
for relation in 0..inputs.len() {

// We maintain a private copy of `equivalences`, which we will digest
// as we produce the join.
let mut equivalences = equivalences.clone();
for equivalence in equivalences.iter_mut() {
equivalence.sort();
equivalence.dedup();
}

// This collection determines changes that result from updates inbound
// from `inputs[relation]` and reflects all strictly prior updates and
Expand Down Expand Up @@ -116,7 +156,6 @@ where
// Repeatedly update `update_stream` to reflect joins with more and more
// other relations, in the specified order.
for (other, next_key) in order.iter() {

let mut next_key_rebased = next_key.clone();
for expr in next_key_rebased.iter_mut() {
expr.visit_mut(&mut |e| if let ScalarExpr::Column(c) = e {
Expand Down Expand Up @@ -170,11 +209,11 @@ where
// arrangement, rather than re-wrap each time we use a thing.
let subtract = subtract.clone();
let (oks, errs) = match self
.arrangement(&inputs[*other], &next_key[..])
.arrangement(&filtered_input_lookup[&(*other, next_key.clone())], &next_key[..])
.unwrap_or_else(|| {
panic!(
"Arrangement alarmingly absent!: {}, {:?}",
inputs[*other].pretty(),
filtered_input_lookup[&(*other, next_key.clone())].pretty(),
&next_key[..]
)
}) {
Expand Down
10 changes: 5 additions & 5 deletions src/dataflow/src/render/filter.rs
Expand Up @@ -11,7 +11,6 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::Collection;

use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use dataflow_types::DataflowError;
use expr::{RelationExpr, ScalarExpr};
Expand All @@ -20,19 +19,20 @@ use repr::{Datum, Row};
use crate::operator::CollectionExt;
use crate::render::context::Context;

impl<G, T> Context<G, RelationExpr, Row, T>
impl<G> Context<G, RelationExpr, Row, dataflow_types::Timestamp>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
G: Scope<Timestamp = dataflow_types::Timestamp>,
{
/// Finds collection corresponding to input RelationExpr and then applies
/// predicates to the input collection.
pub fn render_filter(
&mut self,
input: &RelationExpr,
predicates: Vec<ScalarExpr>,
scope: &mut G,
worker_index: usize,
) -> (Collection<G, Row>, Collection<G, DataflowError>) {
self.ensure_rendered(input, scope, worker_index);
let (ok_collection, err_collection) = self.collection(input).unwrap();
let (ok_collection, new_err_collection) = render_filter_inner(ok_collection, predicates);
let err_collection = err_collection.concat(&new_err_collection);
Expand Down

0 comments on commit 1b2e6e7

Please sign in to comment.