Skip to content

Commit

Permalink
Do a semijoin when there is a filter on an index.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andi Wang committed Jul 23, 2020
1 parent 9f9cf34 commit 482d0fc
Show file tree
Hide file tree
Showing 13 changed files with 1,108 additions and 229 deletions.
55 changes: 55 additions & 0 deletions src/dataflow/src/render/constant.rs
@@ -0,0 +1,55 @@
// Copyright Materialize, Inc. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Collection};

use timely::dataflow::operators::map::Map;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use expr::RelationExpr;
use repr::Row;

use crate::operator::CollectionExt;
use crate::render::context::Context;

impl<G, T> Context<G, RelationExpr, Row, T>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
{
pub fn render_constant(
&mut self,
relation_expr: &RelationExpr,
scope: &mut G,
worker_index: usize,
) {
// The constant collection is instantiated only on worker zero.
if let RelationExpr::Constant { rows, .. } = relation_expr {
let rows = if worker_index == 0 {
rows.clone()
} else {
vec![]
};

let collection = rows
.to_stream(scope)
.map(|(x, diff)| (x, timely::progress::Timestamp::minimum(), diff))
.as_collection();

let err_collection = Collection::empty(scope);

self.collections
.insert(relation_expr.clone(), (collection, err_collection));
}
}
}
59 changes: 59 additions & 0 deletions src/dataflow/src/render/filter.rs
@@ -0,0 +1,59 @@
// Copyright Materialize, Inc. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::lattice::Lattice;
use differential_dataflow::Collection;

use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use dataflow_types::DataflowError;
use expr::{RelationExpr, ScalarExpr};
use repr::{Datum, Row};

use crate::operator::CollectionExt;
use crate::render::context::Context;

impl<G, T> Context<G, RelationExpr, Row, T>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
{
pub fn render_filter(
&mut self,
input: &RelationExpr,
predicates: Vec<ScalarExpr>,
) -> (Collection<G, Row>, Collection<G, DataflowError>) {
let (ok_collection, err_collection) = self.collection(input).unwrap();
let (ok_collection, new_err_collection) = render_filter_inner(ok_collection, predicates);
let err_collection = err_collection.concat(&new_err_collection);
(ok_collection, err_collection)
}
}

pub fn render_filter_inner<G>(
collection: Collection<G, Row>,
predicates: Vec<ScalarExpr>,
) -> (Collection<G, Row>, Collection<G, DataflowError>)
where
G: Scope,
G::Timestamp: Lattice,
{
let temp_storage = repr::RowArena::new();
collection.filter_fallible(move |input_row| {
let datums = input_row.unpack();
for p in &predicates {
if p.eval(&datums, &temp_storage)? != Datum::True {
return Ok(false);
}
}
Ok::<_, DataflowError>(true)
})
}
73 changes: 71 additions & 2 deletions src/dataflow/src/render/join.rs
Expand Up @@ -9,15 +9,15 @@

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::arrangement::Arrange;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::join::{Join, JoinCore};
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::Collection;
use timely::dataflow::Scope;
use timely::progress::{timestamp::Refines, Timestamp};

use dataflow_types::*;
use expr::{RelationExpr, ScalarExpr};
use repr::{Datum, Row, RowArena};
use repr::{Datum, RelationType, Row, RowArena};

use crate::operator::CollectionExt;
use crate::render::context::{ArrangementFlavor, Context};
Expand Down Expand Up @@ -277,4 +277,73 @@ where
panic!("render_join called on invalid expression.")
}
}

pub fn render_semi_join(
&mut self,
input: &RelationExpr,
keys: &[ScalarExpr],
scope: &mut G,
worker_index: usize,
) {
if let RelationExpr::Join {
inputs,
equivalences,
implementation: expr::JoinImplementation::Semijoin,
..
} = input
{
// Extract the constant to semijoin against from the equivalence classes
let temp_storage = RowArena::new();
let constant_row = keys
.iter()
.map(|k| {
let eval_result = equivalences.iter().find_map(|e| {
if e.contains(k) {
return e.iter().find_map(|s| match s {
ScalarExpr::Literal(_, _) => Some(s.eval(&[], &temp_storage)),
_ => None,
});
}
None
});
match eval_result {
Some(Ok(datum)) => datum,
Some(Err(_)) => {
unreachable!("Error occurred when evaluating semijoin constant")
}
None => unreachable!("No constant for semijoin key"),
}
})
.collect::<Vec<_>>();

// Create a RelationExpr representing the constant
// Having the RelationType is technically not an accurate
// representation of the expression, but relation type doesn't
// matter when it comes to rendering.
let mut row_packer = repr::RowPacker::new();
let constant_row_expr = RelationExpr::Constant {
rows: vec![(row_packer.pack(constant_row), 1)],
typ: RelationType::empty(),
};

self.render_constant(&constant_row_expr, scope, worker_index);

let (constant_collection, errs) = self.collection(&constant_row_expr).unwrap();
match self.arrangement(&inputs[0], keys) {
Some(ArrangementFlavor::Local(oks, es)) => {
let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(input, keys, (result, es));
}
Some(ArrangementFlavor::Trace(_gid, oks, es)) => {
let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(input, keys, (result, es));
}
None => {
panic!("Arrangement alarmingly absent!");
}
};
}
}
}
47 changes: 14 additions & 33 deletions src/dataflow/src/render/mod.rs
Expand Up @@ -110,7 +110,6 @@ use differential_dataflow::operators::arrange::upsert::arrange_from_upsert;
use differential_dataflow::{AsCollection, Collection};
use timely::communication::Allocate;
use timely::dataflow::operators::generic::operator;
use timely::dataflow::operators::to_stream::ToStream;
use timely::dataflow::operators::unordered_input::UnorderedInput;
use timely::dataflow::operators::Map;
use timely::dataflow::Scope;
Expand Down Expand Up @@ -139,8 +138,10 @@ use crate::server::{TimestampDataUpdates, TimestampMetadataUpdates};
use crate::source::{FileSourceInfo, KafkaSourceInfo};

mod arrange_by;
mod constant;
mod context;
mod delta_join;
mod filter;
mod join;
mod reduce;
mod threshold;
Expand Down Expand Up @@ -737,23 +738,8 @@ where
// a collection or an arrangement. In either case, we associate the result with
// the `relation_expr` argument in the context.
match relation_expr {
// The constant collection is instantiated only on worker zero.
RelationExpr::Constant { rows, .. } => {
let rows = if worker_index == 0 {
rows.clone()
} else {
vec![]
};

let collection = rows
.to_stream(scope)
.map(|(x, diff)| (x, timely::progress::Timestamp::minimum(), diff))
.as_collection();

let err_collection = Collection::empty(scope);

self.collections
.insert(relation_expr.clone(), (collection, err_collection));
RelationExpr::Constant { .. } => {
self.render_constant(relation_expr, scope, worker_index);
}

// A get should have been loaded into the context, and it is surprising to
Expand Down Expand Up @@ -908,28 +894,18 @@ where
.render_delta_join(input, predicates, scope, worker_index, |t| {
t.saturating_sub(1)
}),
expr::JoinImplementation::Semijoin => {
self.ensure_rendered(input, scope, worker_index);
self.render_filter(input, predicates.clone())
}
expr::JoinImplementation::Unimplemented => {
panic!("Attempt to render unimplemented join");
}
};
(ok_collection, err_collection.map(Into::into))
} else {
self.ensure_rendered(input, scope, worker_index);
let temp_storage = RowArena::new();
let predicates = predicates.clone();
let (ok_collection, err_collection) = self.collection(input).unwrap();
let (ok_collection, new_err_collection) =
ok_collection.filter_fallible(move |input_row| {
let datums = input_row.unpack();
for p in &predicates {
if p.eval(&datums, &temp_storage)? != Datum::True {
return Ok(false);
}
}
Ok::<_, DataflowError>(true)
});
let err_collection = err_collection.concat(&new_err_collection);
(ok_collection, err_collection)
self.render_filter(input, predicates.clone())
};
self.collections.insert(relation_expr.clone(), collections);
}
Expand Down Expand Up @@ -957,6 +933,11 @@ where
);
self.collections.insert(relation_expr.clone(), collection);
}
expr::JoinImplementation::Semijoin => {
if let RelationExpr::ArrangeBy { keys, .. } = &inputs[0] {
self.render_semi_join(relation_expr, &keys[0], scope, worker_index);
}
}
expr::JoinImplementation::Unimplemented => {
panic!("Attempt to render unimplemented join");
}
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/explain.rs
Expand Up @@ -451,6 +451,7 @@ impl JoinImplementation {
.collect()
)
),
Semijoin => "Semijoin".to_owned(),
Unimplemented => "Unimplemented".to_owned(),
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/expr/src/relation/mod.rs
Expand Up @@ -1152,6 +1152,14 @@ pub enum JoinImplementation {
/// Each plan starts from the corresponding index, and then in sequence joins
/// against collections identified by index and with the specified arrangement key.
DeltaQuery(Vec<Vec<(usize, Vec<ScalarExpr>)>>),
/// Perform a semi join of a single input to a constant literal
///
/// Technically, both differential and delta query implementations will
/// also perform semijoins if the semijoin would shrink the size of the
/// arranged inputs. A join is designated as having this implementation
/// in the single input case where neither `Differential` nor `DeltaQuery`
/// make sense.
Semijoin,
/// No implementation yet selected.
Unimplemented,
}
Expand Down

0 comments on commit 482d0fc

Please sign in to comment.