Skip to content

Commit

Permalink
initial checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 6, 2020
1 parent a54e7cb commit 935c936
Show file tree
Hide file tree
Showing 14 changed files with 672 additions and 569 deletions.
185 changes: 112 additions & 73 deletions src/dataflow/render/delta_join.rs

Large diffs are not rendered by default.

214 changes: 114 additions & 100 deletions src/dataflow/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,31 +772,21 @@ where
) -> Collection<G, Row> {
if let RelationExpr::Join {
inputs,
variables,
equivalences,
demand,
implementation: expr::JoinImplementation::Differential(start, order),
} = relation_expr
{
// For the moment, assert that each relation participates at most
// once in each equivalence class. If not, we should be able to
// push a filter upwards, and if we can't do that it means a bit
// more filter logic in this operator which doesn't exist yet.
assert!(variables.iter().all(|h| {
let len = h.len();
let mut list = h.iter().map(|(i, _)| i).collect::<Vec<_>>();
list.sort();
list.dedup();
len == list.len()
}));

let variables = variables
.iter()
.map(|v| {
let mut result = v.clone();
result.sort();
result
})
.collect::<Vec<_>>();
let column_types = relation_expr.typ().column_types;
let arity = column_types.len();

// We maintain a private copy of `equivalences`, which we will digest
// as we produce the join.
let mut equivalences = equivalences.clone();
for equivalence in equivalences.iter_mut() {
equivalence.sort();
equivalence.dedup();
}

for input in inputs.iter() {
self.ensure_rendered(input, env, scope, worker_index);
Expand All @@ -813,114 +803,142 @@ where
prior_arities.push(offset);
offset += arities[input];
}
let input_relation = arities
.iter()
.enumerate()
.flat_map(|(r, a)| std::iter::repeat(r).take(*a))
.collect::<Vec<_>>();

// Unwrap demand
// TODO: If we pushed predicates into the operator, we could have a
// more accurate view of demand that does not include the support of
// all predicates.
let demand = if let Some(demand) = demand {
demand.clone()
} else {
// Assume demand encompasses all columns
arities.iter().map(|arity| (0..*arity).collect()).collect()
};
let demand = demand.clone().unwrap_or_else(|| (0..arity).collect());

// This collection will evolve as we join in more inputs.
let mut joined = self.collection(&inputs[*start]).unwrap();

// Maintain sources of each in-progress column.
let mut columns = (0..arities[*start])
.map(|c| (*start, c))
let mut source_columns = (prior_arities[*start]
..prior_arities[*start] + arities[*start])
.collect::<Vec<_>>();

let mut predicates = predicates.to_vec();
joined = crate::render::delta_join::build_filter(
joined,
&columns,
&source_columns,
&mut predicates,
&prior_arities,
&mut equivalences,
env,
);

// The intent is to maintain `joined` as the full cross
// product of all input relations so far, subject to all
// of the equality constraints in `variables`. This means
let mut inputs_joined = std::collections::HashSet::new();
inputs_joined.insert(start);

for (_index, (input, next_keys)) in order.iter().enumerate() {
// Keys for the incoming updates are determined by locating
// the elements of `next_keys` among the existing `columns`.
for (input, next_keys) in order.iter() {
// Keys for the next input to be joined must be produced from
// ScalarExprs found in `equivalences`, re-written to bind the
// appropriate columns (as `joined` has permuted columns).
let prev_keys = next_keys
.iter()
.map(|k| {
if let ScalarExpr::Column(c) = k {
variables
.iter()
.find(|v| v.contains(&(*input, *c)))
.expect("Column in key not bound!")
.iter()
.flat_map(|rel_col1| {
// Find the first (rel,col) pair in `columns`.
// One *should* exist, but it is not the case that all must.
columns.iter().position(|rel_col2| rel_col1 == rel_col2)
})
.next()
.expect("Column in key not bound by prior column")
} else {
panic!("Non-column keys are not currently supported");
}
.map(|expr| {
// We expect to find `expr` in some `equivalence` which
// has a bound expression. Otherwise, the join plan is
// defective and we should panic.
let equivalence = equivalences
.iter()
.find(|equivs| equivs.contains(expr))
.expect("Expression in join plan is not in an equivalence relation");

// We expect to find exactly one bound expression, as
// multiple bound expressions should result in a filter
// and be removed once they have.
let mut bound_expr = equivalence
.iter()
.find(|expr| {
expr.support()
.into_iter()
.all(|c| source_columns.contains(&c))
})
.expect("Expression in join plan is not bound at time of use")
.clone();

bound_expr.visit_mut(&mut |e| {
if let ScalarExpr::Column(c) = e {
*c = source_columns
.iter()
.position(|x| x == c)
.expect("Did not find bound column in source_columns");
}
});
bound_expr
})
.collect::<Vec<_>>();

// Determine which columns from `joined` and `input` will be kept
inputs_joined.insert(input);
let prev_outputs = columns
// We should extract each element of `next_keys` from `equivalences`,
// as each *should* now be a redundant constraint. We do this so that
// the demand analysis does not require these columns be produced.
for equivalence in equivalences.iter_mut() {
equivalence.retain(|expr| !next_keys.contains(expr));
}
equivalences.retain(|e| e.len() > 1);

// Determine which columns from `joined` and `input` should be
// retained. Columns should be retained if they are required by
// `demand`, or are in the support of an equivalence class.
let mut column_demand = std::collections::HashSet::new();
for equivalence in equivalences.iter() {
for expr in equivalence.iter() {
column_demand.extend(expr.support());
}
}
column_demand.extend(demand.iter().cloned());

// Identify the *indexes* of columns that are demanded by any
// remaining predicates and equivalence classes.
let prev_vals = source_columns
.iter()
.enumerate()
.flat_map(|(i, (r, c))| {
// TODO: Check if this discards key repetitions.
let output_demand = demand[*r].contains(c);
let future_demand = variables.iter().any(|variable| {
variable.contains(&(*r, *c))
&& variable.iter().any(|(r2, _)| !inputs_joined.contains(r2))
});
if output_demand || future_demand {
.filter_map(|(i, c)| {
if column_demand.contains(c) {
Some(i)
} else {
None
}
})
.collect::<Vec<_>>();
let next_outputs = (0..arities[*input])
.flat_map(|i| {
let output_demand = demand[*input].contains(&i);
let future_demand = variables.iter().any(|variable| {
variable.contains(&(*input, i))
&& variable.iter().any(|(r2, _)| !inputs_joined.contains(r2))
});
if output_demand || future_demand {
Some(i)
let next_vals = (0..arities[*input])
.filter_map(|c| {
if column_demand.contains(&c) {
Some(c)
} else {
None
}
})
.collect::<Vec<_>>();

// List the new locations the columns will be in
columns = prev_outputs
// Identify the columns we intend to retain.
source_columns = prev_vals
.iter()
.map(|i| columns[*i])
.chain(next_outputs.iter().map(|i| (*input, *i)))
.map(|i| source_columns[*i])
.chain(
next_vals
.iter()
.map(|i| prior_arities[input_relation[*i]] + *i),
)
.collect();

// We exploit the demand information to restrict `prev` to its demanded columns.
let env_clone = env.clone();
let prev_keyed = joined
.map({
move |row| {
let datums = row.unpack();
let key_row = Row::pack(prev_keys.iter().map(|i| datums[*i]));
(key_row, Row::pack(prev_outputs.iter().map(|i| datums[*i])))
let temp_storage = RowArena::new();
let key = Row::pack(
prev_keys
.iter()
.map(|e| e.eval(&datums, &env_clone, &temp_storage)),
);
let row = Row::pack(prev_vals.iter().map(|i| datums[*i]));
(key, row)
}
})
.arrange_named::<OrdValSpine<_, _, _, _>>(&format!("JoinStage: {}", input));
Expand All @@ -935,7 +953,7 @@ where
Some(Row::pack(
prev_datums
.iter()
.chain(next_outputs.iter().map(|i| &next_datums[*i])),
.chain(next_vals.iter().map(|i| &next_datums[*i])),
))
})
}
Expand All @@ -948,7 +966,7 @@ where
Some(Row::pack(
prev_datums
.iter()
.chain(next_outputs.iter().map(|i| &next_datums[*i])),
.chain(next_vals.iter().map(|i| &next_datums[*i])),
))
})
}
Expand All @@ -959,37 +977,33 @@ where

joined = crate::render::delta_join::build_filter(
joined,
&columns,
&source_columns,
&mut predicates,
&prior_arities,
&mut equivalences,
env,
);
}

// We are obliged to produce demanded columns in order, with dummy data allowed
// in non-demanded locations. They must all be in order, in any case. All demanded
// columns should be present in `columns` (and probably not much else).
// columns should be present in `source_columns` (and probably not much else).

let mut position_or = Vec::new();
for rel in 0..inputs.len() {
for col in 0..arities[rel] {
position_or.push(if demand[rel].contains(&col) {
Ok(columns
.iter()
.position(|rel_col| rel_col == &(rel, col))
.expect("Demanded column not found"))
let position_or = (0..arity)
.map(|col| {
if let Some(pos) = source_columns.iter().position(|c| c == &col) {
Ok(pos)
} else {
Err({
let typ = &types[rel].column_types[col];
let typ = &column_types[col];
if typ.nullable {
Datum::Null
} else {
typ.scalar_type.dummy_datum()
}
})
});
}
}
}
})
.collect::<Vec<_>>();

joined.map(move |row| {
let datums = row.unpack();
Expand Down
Loading

0 comments on commit 935c936

Please sign in to comment.