Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod sp_repartition_fuzz_tests {
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([ConstExpr::new(col_e.clone())]);
eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
Expand Down
28 changes: 28 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,28 @@ use datafusion_common::JoinType;
/// - `across_partitions`: A boolean flag indicating whether the constant expression is
/// valid across partitions. If set to `true`, the constant expression has same value for all partitions.
/// If set to `false`, the constant expression may have different values for different partitions.
///
/// # Example
///
/// ```rust
/// # use datafusion_physical_expr::ConstExpr;
/// # use datafusion_physical_expr_common::expressions::lit;
/// let col = lit(5);
/// // Create a constant expression from a physical expression ref
/// let const_expr = ConstExpr::from(&col);
/// // create a constant expression from a physical expression
/// let const_expr = ConstExpr::from(col);
/// ```
pub struct ConstExpr {
expr: Arc<dyn PhysicalExpr>,
across_partitions: bool,
}

impl ConstExpr {
/// Create a new constant expression from a physical expression.
///
/// Note you can also use `ConstExpr::from` to create a constant expression
/// from a reference as well
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
Self {
expr,
Expand Down Expand Up @@ -85,6 +101,18 @@ impl ConstExpr {
}
}

impl From<Arc<dyn PhysicalExpr>> for ConstExpr {
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
Self::new(expr)
}
}

impl From<&Arc<dyn PhysicalExpr>> for ConstExpr {
fn from(expr: &Arc<dyn PhysicalExpr>) -> Self {
Self::new(Arc::clone(expr))
}
}

/// Checks whether `expr` is among in the `const_exprs`.
pub fn const_exprs_contains(
const_exprs: &[ConstExpr],
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ mod tests {
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([ConstExpr::new(Arc::clone(col_e))]);
eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,9 @@ mod tests {
let eq_group = EquivalenceGroup::new(eq_group);
eq_properties.add_equivalence_group(eq_group);

let constants = constants.into_iter().map(|expr| {
ConstExpr::new(Arc::clone(expr)).with_across_partitions(true)
});
let constants = constants
.into_iter()
.map(|expr| ConstExpr::from(expr).with_across_partitions(true));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change makes it easier to follow the logic here

eq_properties = eq_properties.add_constants(constants);

let reqs = convert_to_sort_exprs(&reqs);
Expand Down
22 changes: 10 additions & 12 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,13 @@ impl EquivalenceProperties {
// Left expression is constant, add right as constant
if !const_exprs_contains(&self.constants, right) {
self.constants
.push(ConstExpr::new(Arc::clone(right)).with_across_partitions(true));
.push(ConstExpr::from(right).with_across_partitions(true));
}
} else if self.is_expr_constant(right) {
// Right expression is constant, add left as constant
if !const_exprs_contains(&self.constants, left) {
self.constants
.push(ConstExpr::new(Arc::clone(left)).with_across_partitions(true));
.push(ConstExpr::from(left).with_across_partitions(true));
}
}

Expand Down Expand Up @@ -300,7 +300,7 @@ impl EquivalenceProperties {
{
if !const_exprs_contains(&self.constants, &expr) {
let const_expr =
ConstExpr::new(expr).with_across_partitions(across_partitions);
ConstExpr::from(expr).with_across_partitions(across_partitions);
self.constants.push(const_expr);
}
}
Expand Down Expand Up @@ -404,7 +404,7 @@ impl EquivalenceProperties {
// we add column `a` as constant to the algorithm state. This enables us
// to deduce that `(b + c) ASC` is satisfied, given `a` is constant.
eq_properties = eq_properties
.add_constants(std::iter::once(ConstExpr::new(normalized_req.expr)));
.add_constants(std::iter::once(ConstExpr::from(normalized_req.expr)));
}
true
}
Expand Down Expand Up @@ -832,9 +832,8 @@ impl EquivalenceProperties {
&& !const_exprs_contains(&projected_constants, target)
{
// Expression evaluates to single value
projected_constants.push(
ConstExpr::new(Arc::clone(target)).with_across_partitions(true),
);
projected_constants
.push(ConstExpr::from(target).with_across_partitions(true));
}
}
projected_constants
Expand Down Expand Up @@ -927,8 +926,8 @@ impl EquivalenceProperties {
// Note that these expressions are not properly "constants". This is just
// an implementation strategy confined to this function.
for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs {
eq_properties = eq_properties
.add_constants(std::iter::once(ConstExpr::new(Arc::clone(expr))));
eq_properties =
eq_properties.add_constants(std::iter::once(ConstExpr::from(expr)));
search_indices.shift_remove(idx);
}
// Add new ordered section to the state.
Expand Down Expand Up @@ -2147,8 +2146,7 @@ mod tests {
let col_h = &col("h", &test_schema)?;

// Add column h as constant
eq_properties =
eq_properties.add_constants(vec![ConstExpr::new(Arc::clone(col_h))]);
eq_properties = eq_properties.add_constants(vec![ConstExpr::from(col_h)]);

let test_cases = vec![
// TEST CASE 1
Expand Down Expand Up @@ -2458,7 +2456,7 @@ mod tests {
for case in cases {
let mut properties = base_properties
.clone()
.add_constants(case.constants.into_iter().map(ConstExpr::new));
.add_constants(case.constants.into_iter().map(ConstExpr::from));
for [left, right] in &case.equal_conditions {
properties.add_equal_conditions(left, right)?
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,11 @@ impl FilterExec {
// Filter evaluates to single value for all partitions
if input_eqs.is_expr_constant(binary.left()) {
res_constants.push(
ConstExpr::new(binary.right().clone())
.with_across_partitions(true),
ConstExpr::from(binary.right()).with_across_partitions(true),
)
} else if input_eqs.is_expr_constant(binary.right()) {
res_constants.push(
ConstExpr::new(binary.left().clone())
.with_across_partitions(true),
ConstExpr::from(binary.left()).with_across_partitions(true),
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ fn calculate_union_eq_properties(
// TODO: Check whether constant expressions evaluates the same value or not for each partition
let across_partitions = false;
return Some(
ConstExpr::new(meet_constant.owned_expr())
ConstExpr::from(meet_constant.owned_expr())
.with_across_partitions(across_partitions),
);
}
Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,7 @@ pub fn get_window_mode(
options: None,
}));
// Treat partition by exprs as constant. During analysis of requirements are satisfied.
let const_exprs = partitionby_exprs
.iter()
.map(|expr| ConstExpr::new(expr.clone()));
let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
let partition_by_eqs = input_eqs.add_constants(const_exprs);
let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys);
let reverse_order_by_reqs =
Expand Down