Skip to content

Commit

Permalink
Add comments. Use as_literal
Browse files Browse the repository at this point in the history
  • Loading branch information
Andi Wang committed Jul 28, 2020
1 parent bb5b3f3 commit 3361062
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 60 deletions.
5 changes: 4 additions & 1 deletion src/dataflow/src/render/filter.rs
Expand Up @@ -26,6 +26,8 @@ where
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
{
/// Finds collection corresponding to input RelationExpr and then applies
/// predicates to the input collection.
pub fn render_filter(
&mut self,
input: &RelationExpr,
Expand All @@ -38,6 +40,7 @@ where
}
}

/// Applies predicates to the given collection.
pub fn render_filter_inner<G>(
collection: Collection<G, Row>,
predicates: Vec<ScalarExpr>,
Expand All @@ -46,8 +49,8 @@ where
G: Scope,
G::Timestamp: Lattice,
{
let temp_storage = repr::RowArena::new();
collection.filter_fallible(move |input_row| {
let temp_storage = repr::RowArena::new();
let datums = input_row.unpack();
for p in &predicates {
if p.eval(&datums, &temp_storage)? != Datum::True {
Expand Down
120 changes: 64 additions & 56 deletions src/dataflow/src/render/join.rs
Expand Up @@ -295,72 +295,80 @@ where
}
}

pub fn render_semi_join(
&mut self,
input: &RelationExpr,
keys: &[ScalarExpr],
scope: &mut G,
worker_index: usize,
) {
/// Renders a join of an arrangement to a constant collection of size 1.
/// `input` should be of the form
/// `Join { inputs: [ArrangeBy{...}], implementation: Semijoin, ...}`
pub fn render_semi_join(&mut self, input: &RelationExpr, scope: &mut G, worker_index: usize) {
if let RelationExpr::Join {
inputs,
equivalences,
implementation: expr::JoinImplementation::Semijoin,
..
} = input
{
// Extract the constant to semijoin against from the equivalence classes
let temp_storage = RowArena::new();
let constant_row = keys
.iter()
.map(|k| {
let eval_result = equivalences.iter().find_map(|e| {
if e.contains(k) {
return e.iter().find_map(|s| match s {
ScalarExpr::Literal(_, _) => Some(s.eval(&[], &temp_storage)),
_ => None,
});
}
None
});
match eval_result {
Some(Ok(datum)) => datum,
Some(Err(_)) => {
unreachable!("Error occurred when evaluating semijoin constant")
}
None => unreachable!("No constant for semijoin key"),
}
})
.collect::<Vec<_>>();
if let RelationExpr::ArrangeBy {
input: inner_input,
keys,
} = &inputs[0]
{
let keys = &keys[0];
// For each key ...
let constant_row = keys
.iter()
.map(|k| {
equivalences
.iter()
.find_map(|e| {
// ... look for the equivalence class containing the key ...
if e.contains(k) {
// ... and find the constant within that equivalence
// class.
return e.iter().find_map(|s| match s.as_literal() {
Some(Ok(datum)) => Some(datum),
Some(Err(_)) => unreachable!(
"Error occurred when evaluating semijoin constant"
),
None => None,
});
}
None
})
.unwrap()
})
.collect::<Vec<_>>();

// TODO: convert join to filter if the corresponding equivalent
// constant for each key cannot be found?

// Create a RelationExpr representing the constant
// Having the RelationType is technically not an accurate
// representation of the expression, but relation type doesn't
// matter when it comes to rendering.
let mut row_packer = repr::RowPacker::new();
let constant_row_expr = RelationExpr::Constant {
rows: vec![(row_packer.pack(constant_row), 1)],
typ: RelationType::empty(),
};
// Create a RelationExpr representing the constant
// Having the RelationType is technically not an accurate
// representation of the expression, but relation type doesn't
// matter when it comes to rendering.
let mut row_packer = repr::RowPacker::new();
let constant_row_expr = RelationExpr::Constant {
rows: vec![(row_packer.pack(constant_row), 1)],
typ: RelationType::empty(),
};

self.render_constant(&constant_row_expr, scope, worker_index);
self.render_constant(&constant_row_expr, scope, worker_index);

let (constant_collection, errs) = self.collection(&constant_row_expr).unwrap();
match self.arrangement(&inputs[0], keys) {
Some(ArrangementFlavor::Local(oks, es)) => {
let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(input, keys, (result, es));
}
Some(ArrangementFlavor::Trace(_gid, oks, es)) => {
let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(input, keys, (result, es));
}
None => {
panic!("Arrangement alarmingly absent!");
}
};
let (constant_collection, errs) = self.collection(&constant_row_expr).unwrap();
match self.arrangement(&inner_input, keys) {
Some(ArrangementFlavor::Local(oks, es)) => {
let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(input, keys, (result, es));
}
Some(ArrangementFlavor::Trace(_gid, oks, es)) => {
let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(input, keys, (result, es));
}
None => {
panic!("Arrangement alarmingly absent!");
}
};
}
}
}
}
4 changes: 1 addition & 3 deletions src/dataflow/src/render/mod.rs
Expand Up @@ -937,9 +937,7 @@ where
self.collections.insert(relation_expr.clone(), collection);
}
expr::JoinImplementation::Semijoin => {
if let RelationExpr::ArrangeBy { keys, .. } = &inputs[0] {
self.render_semi_join(relation_expr, &keys[0], scope, worker_index);
}
self.render_semi_join(relation_expr, scope, worker_index);
}
expr::JoinImplementation::Unimplemented => {
panic!("Attempt to render unimplemented join");
Expand Down

0 comments on commit 3361062

Please sign in to comment.