Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 48 additions & 49 deletions src/transform/src/fold_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,10 @@ impl FoldConstants {
rows: &[(Row, Diff)],
limit: Option<usize>,
) -> Option<Result<Vec<(Row, Diff)>, EvalError>> {
// Build a map from `group_key` to `Vec<Vec<an, ..., a1>>`,
// where `an` is the input to the nth aggregate function in
// `aggregates`.
let mut groups = BTreeMap::new();
// A list of all keys, independent of the aggregates.
let mut keys = BTreeSet::new();
// For each aggregate, a map from keys to a list of (val, diff) pairs.
let mut groups = vec![BTreeMap::new(); aggregates.len()];
let temp_storage2 = RowArena::new();
let mut row_buf = Row::default();
let mut limit_remaining = limit.unwrap_or(usize::MAX);
Expand All @@ -451,10 +451,10 @@ impl FoldConstants {
))));
}

if limit_remaining < *diff as usize {
if limit_remaining < 1 as usize {
return None;
}
limit_remaining -= *diff as usize;
limit_remaining -= 1 as usize;
Comment on lines +454 to +457
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this doesn't need a cast.


let datums = row.unpack();
let temp_storage = RowArena::new();
Expand All @@ -463,25 +463,23 @@ impl FoldConstants {
.map(|e| e.eval(&datums, &temp_storage2))
.collect::<Result<Vec<_>, _>>()
{
Ok(key) => key,
Ok(key) => std::rc::Rc::new(key),
Err(e) => return Some(Err(e)),
};
let val = match aggregates
.iter()
.map(|agg| {
row_buf
.packer()
.extend([agg.expr.eval(&datums, &temp_storage)?]);
Ok::<_, EvalError>(row_buf.clone())
})
.collect::<Result<Vec<_>, _>>()
{
Ok(val) => val,
Err(e) => return Some(Err(e)),
};
let entry = groups.entry(key).or_insert_with(Vec::new);
for _ in 0..*diff {
entry.push(val.clone());
keys.insert((*key).clone());
for (aggregate, group) in aggregates.iter().zip(groups.iter_mut()) {
let entry = group.entry(key.clone()).or_insert_with(Vec::new);
row_buf
.packer()
.extend([aggregate.expr.eval(&datums, &temp_storage).ok()?]);
entry.push((row_buf.clone(), *diff));
}
}

// Consolidate each group, in the event that there are like records.
for group in groups.iter_mut() {
for vals in group.values_mut() {
differential_dataflow::consolidation::consolidate(vals);
}
}

Expand All @@ -490,33 +488,34 @@ impl FoldConstants {
// `Vec<Vec<k1, ..., kn, r1, ..., rn>>`
// where kn is the nth column of the key and rn is the
// result of the nth aggregate function for that group.
let new_rows = groups
.into_iter()
.map({
let mut row_buf = Row::default();
move |(key, vals)| {
let temp_storage = RowArena::new();
row_buf.packer().extend(key.into_iter().chain(
aggregates.iter().enumerate().map(|(i, agg)| {
if agg.distinct {
agg.func.eval(
vals.iter()
.map(|val| val[i].unpack_first())
.collect::<BTreeSet<_>>(),
&temp_storage,
)
} else {
agg.func.eval(
vals.iter().map(|val| val[i].unpack_first()),
&temp_storage,
)
}
}),
));
(row_buf.clone(), 1)
}
let mut row_buf = Row::default();
let new_rows = keys
.iter()
.map(|key| {
let temp_storage = RowArena::new();
row_buf.packer().extend(key.into_iter().cloned().chain(
groups.iter_mut().zip(aggregates).map(|(group, aggr)| {
let values = &group[key];
if aggr.distinct {
aggr.func.eval(
values.iter().map(|(val, _count)| val.unpack_first()),
&temp_storage,
)
} else {
aggr.func.eval(
values.iter().flat_map(|(val, count)| {
std::iter::repeat(val.unpack_first())
.take((*count).try_into().unwrap())
}),
&temp_storage,
)
}
}),
));
(row_buf.clone(), 1)
})
.collect();
.collect::<Vec<_>>();

Some(Ok(new_rows))
}

Expand Down
86 changes: 86 additions & 0 deletions src/transform/src/movement/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,92 @@ impl ProjectionPushdown {
for expr in exprs.iter() {
columns_to_pushdown.extend(expr.support());
}

// Check if any output columns of the flatmap are used, before removing them.
let output_columns_used = columns_to_pushdown.iter().any(|c| *c >= inner_arity);

// If none of the flatmap's output columns are used, only the number of rows it
// produces matters, so we can potentially simplify the flatmap.
// For example, `generate_series` could plausibly be replaced by `repeat` with a computed count.
// There may be other table functions with known output counts, but none are handled here.
// The timestamp-based `generate_series` functions are left alone; they seemingly lack a division method.
if !output_columns_used {
use mz_expr::{BinaryFunc, TableFunc, UnaryFunc, VariadicFunc};

match func {
TableFunc::GenerateSeriesInt32 => {
// The arguments in `exprs` are `[start, stop, step]`.
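// Per Brennan, the number of elements seems likely to be `max(0, (stop - start + 1) / step)`.
// NOTE(review): if `DivInt32` truncates, this can undercount when `step` is not 1,
// e.g. start = 1, stop = 5, step = 2 yields 1, 3, 5 (three rows) but (5 - 1 + 1) / 2 = 2.
// `(stop - start) / step + 1` matches that case, but empty ranges need separate care — TODO confirm.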
let expr = MirScalarExpr::CallUnary {
func: UnaryFunc::CastInt32ToInt64(mz_expr::func::CastInt32ToInt64),
expr: Box::new(MirScalarExpr::CallVariadic {
func: VariadicFunc::Greatest,
exprs: vec![
MirScalarExpr::literal_ok(
mz_repr::Datum::Int32(0),
mz_repr::ScalarType::Int32,
),
MirScalarExpr::CallBinary {
func: BinaryFunc::DivInt32,
expr1: Box::new(MirScalarExpr::CallBinary {
func: BinaryFunc::SubInt32,
expr1: Box::new(MirScalarExpr::CallBinary {
func: BinaryFunc::AddInt32,
expr1: Box::new(exprs[1].clone()),
expr2: Box::new(MirScalarExpr::literal_ok(
mz_repr::Datum::Int32(1),
mz_repr::ScalarType::Int32,
)),
}),
expr2: Box::new(exprs[0].clone()),
}),
expr2: Box::new(exprs[2].clone()),
},
],
}),
};

*func = TableFunc::Repeat;
*exprs = vec![expr];
}
TableFunc::GenerateSeriesInt64 => {
// The arguments in `exprs` are `[start, stop, step]`.
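// Per Brennan, the number of elements seems likely to be `max(0, (stop - start + 1) / step)`.
// NOTE(review): if `DivInt64` truncates, this can undercount when `step` is not 1,
// e.g. start = 1, stop = 5, step = 2 yields 1, 3, 5 (three rows) but (5 - 1 + 1) / 2 = 2.
// `(stop - start) / step + 1` matches that case, but empty ranges need separate care — TODO confirm.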
let expr = MirScalarExpr::CallVariadic {
func: VariadicFunc::Greatest,
exprs: vec![
MirScalarExpr::literal_ok(
mz_repr::Datum::Int64(0),
mz_repr::ScalarType::Int64,
),
MirScalarExpr::CallBinary {
func: BinaryFunc::DivInt64,
expr1: Box::new(MirScalarExpr::CallBinary {
func: BinaryFunc::SubInt64,
expr1: Box::new(MirScalarExpr::CallBinary {
func: BinaryFunc::AddInt64,
expr1: Box::new(exprs[1].clone()),
expr2: Box::new(MirScalarExpr::literal_ok(
mz_repr::Datum::Int64(1),
mz_repr::ScalarType::Int64,
)),
}),
expr2: Box::new(exprs[0].clone()),
}),
expr2: Box::new(exprs[2].clone()),
},
],
};

*func = TableFunc::Repeat;
*exprs = vec![expr];
}
_ => {
// Do nothing for table functions we do not recognize.
// Doing nothing is a safe default.
}
}
}

columns_to_pushdown.retain(|c| *c < inner_arity);

reverse_permute(exprs.iter_mut(), columns_to_pushdown.iter());
Expand Down