Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 221 additions & 28 deletions datafusion/core/src/physical_plan/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ use datafusion_common::{
plan_err, DataFusionError, JoinType, Result, ScalarValue, SharedResult,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
normalize_ordering_equivalence_classes, normalize_sort_exprs,
};
use datafusion_physical_expr::{
EquivalentClass, LexOrdering, LexOrderingRef, OrderingEquivalenceProperties,
OrderingEquivalentClass, PhysicalExpr, PhysicalSortExpr,
};

use datafusion_physical_expr::utils::normalize_sort_exprs;

use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
use itertools::Itertools;
Expand Down Expand Up @@ -186,24 +187,13 @@ pub fn calculate_join_output_ordering(
assert_eq!(maintains_input_order.len(), 2);
let left_maintains = maintains_input_order[0];
let right_maintains = maintains_input_order[1];
let (mut right_ordering, on_columns) = match join_type {
let mut right_ordering = match join_type {
// In the case below, right ordering should be offset by the left
// side length, since we append the right table to the left table.
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
let updated_on_columns = on_columns
.iter()
.map(|(left, right)| {
(
left.clone(),
Column::new(right.name(), right.index() + left_columns_len),
)
})
.collect::<Vec<_>>();
let updated_right_ordering =
add_offset_to_lex_ordering(right_ordering, left_columns_len)?;
(updated_right_ordering, updated_on_columns)
add_offset_to_lex_ordering(right_ordering, left_columns_len)?
}
_ => (right_ordering.to_vec(), on_columns.to_vec()),
_ => right_ordering.to_vec(),
};
let output_ordering = match (left_maintains, right_maintains) {
(true, true) => {
Expand All @@ -215,7 +205,7 @@ pub fn calculate_join_output_ordering(
// Special case, we can prefix ordering of right side with the ordering of left side.
if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) {
replace_on_columns_of_right_ordering(
&on_columns,
on_columns,
&mut right_ordering,
left_columns_len,
);
Expand All @@ -227,6 +217,11 @@ pub fn calculate_join_output_ordering(
(false, true) => {
// Special case, we can prefix ordering of left side with the ordering of right side.
if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) {
replace_on_columns_of_right_ordering(
on_columns,
&mut right_ordering,
left_columns_len,
);
merge_vectors(&right_ordering, left_ordering)
} else {
right_ordering
Expand Down Expand Up @@ -317,25 +312,39 @@ pub fn cross_join_equivalence_properties(
new_properties
}

/// Update right table ordering equivalences so that they point to valid indices
/// at the output of the join schema. To do so, we increment column indices by left table size
/// when join schema consist of combination of left and right schema (Inner, Left, Full, Right joins).
/// Update right table ordering equivalences so that: (1) They point to valid
/// indices at the output of the join schema, and (2) they are normalized w.r.t.
/// given equivalence properties. To do so, we first increment column indices by
/// the left table size when join schema consists of a combination of left and
/// right schemas (Inner, Left, Full, Right joins).
/// Then, we normalize the sort expressions of ordering equivalences one by one.
/// We make sure that each expression in the ordering equivalence either:
/// - is the head of an equivalent class, or
/// - doesn't have an equivalent column.
/// This way, once we normalize an expression according to equivalence properties,
/// then it can be safely used for ordering equivalence normalization.
fn get_updated_right_ordering_equivalence_properties(
join_type: &JoinType,
right_oeq_classes: &[OrderingEquivalentClass],
left_columns_len: usize,
join_eq_properties: &EquivalenceProperties,
) -> Result<Vec<OrderingEquivalentClass>> {
match join_type {
let updated_oeqs = match join_type {
// In these modes, indices of the right schema should be offset by
// the left table size.
JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => {
add_offset_to_ordering_equivalence_classes(
right_oeq_classes,
left_columns_len,
)
)?
}
_ => Ok(right_oeq_classes.to_vec()),
}
_ => right_oeq_classes.to_vec(),
};

Ok(normalize_ordering_equivalence_classes(
&updated_oeqs,
join_eq_properties,
))
}

/// Merge left and right sort expressions, checking for duplicates.
Expand All @@ -356,18 +365,19 @@ fn prefix_ordering_equivalence_with_existing_ordering(
oeq_classes: &[OrderingEquivalentClass],
eq_classes: &[EquivalentClass],
) -> Vec<OrderingEquivalentClass> {
let existing_ordering = normalize_sort_exprs(existing_ordering, eq_classes, &[]);
oeq_classes
.iter()
.map(|oeq_class| {
let normalized_head = normalize_sort_exprs(oeq_class.head(), eq_classes, &[]);
let updated_head = merge_vectors(existing_ordering, &normalized_head);
let updated_head = merge_vectors(&existing_ordering, &normalized_head);
let updated_others = oeq_class
.others()
.iter()
.map(|ordering| {
let normalized_ordering =
normalize_sort_exprs(ordering, eq_classes, &[]);
merge_vectors(existing_ordering, &normalized_ordering)
merge_vectors(&existing_ordering, &normalized_ordering)
})
.collect();
OrderingEquivalentClass::new(updated_head, updated_others)
Expand Down Expand Up @@ -411,6 +421,7 @@ pub fn combine_join_ordering_equivalence_properties(
join_type,
right_oeq_properties.classes(),
left_columns_len,
&join_eq_properties,
)?;
let left_output_ordering = left.output_ordering().unwrap_or(&[]);
// Right side ordering equivalence properties should be prepended with
Expand All @@ -435,15 +446,18 @@ pub fn combine_join_ordering_equivalence_properties(
join_type,
right_oeq_properties.classes(),
left_columns_len,
&join_eq_properties,
)?;
new_properties.extend(right_oeq_classes);
// In this special case, left side ordering can be prefixed with right side ordering.
if probe_side == Some(JoinSide::Right)
&& left.output_ordering().is_some()
&& *join_type == JoinType::Inner
{
let left_oeq_classes = right_oeq_properties.classes();
let left_oeq_classes = left_oeq_properties.classes();
let right_output_ordering = right.output_ordering().unwrap_or(&[]);
let right_output_ordering =
add_offset_to_lex_ordering(right_output_ordering, left_columns_len)?;
// Left side ordering equivalence properties should be prepended with
// those of the right side while constructing output ordering equivalence
// properties since stream side is the right side.
Expand All @@ -454,7 +468,7 @@ pub fn combine_join_ordering_equivalence_properties(
// to the ordering equivalences of the join.
let updated_left_oeq_classes =
prefix_ordering_equivalence_with_existing_ordering(
right_output_ordering,
&right_output_ordering,
left_oeq_classes,
join_eq_properties.classes(),
);
Expand Down Expand Up @@ -1368,6 +1382,7 @@ mod tests {
use arrow::datatypes::Fields;
use arrow::error::Result as ArrowResult;
use arrow::{datatypes::DataType, error::ArrowError};
use arrow_schema::SortOptions;
use datafusion_common::ScalarValue;
use std::pin::Pin;

Expand Down Expand Up @@ -1898,4 +1913,182 @@ mod tests {

Ok(())
}

#[test]
fn test_get_updated_right_ordering_equivalence_properties() -> Result<()> {
    // Shorthand: a sort expression with default options on column `name`
    // located at position `index`.
    let options = SortOptions::default();
    let sort_on = |name: &str, index: usize| PhysicalSortExpr {
        expr: Arc::new(Column::new(name, index)),
        options,
    };

    // Ordering equivalence of the right input, indexed against the right
    // schema alone: [x, y] is equivalent to [z, w].
    let right_oeq_class = OrderingEquivalentClass::new(
        vec![sort_on("x", 0), sort_on("y", 1)],
        vec![vec![sort_on("z", 2), sort_on("w", 3)]],
    );

    // Join schema: four left columns followed by the four right columns.
    let left_columns_len = 4;
    let join_schema = Schema::new(
        ["a", "b", "c", "d", "x", "y", "z", "w"]
            .into_iter()
            .map(|name| Field::new(name, DataType::Int32, true))
            .collect::<Fields>(),
    );

    // Register the equalities a = x and d = w on the join output.
    let mut join_eq_properties = EquivalenceProperties::new(Arc::new(join_schema));
    for (left_col, right_col) in [
        (Column::new("a", 0), Column::new("x", 4)),
        (Column::new("d", 3), Column::new("w", 7)),
    ] {
        join_eq_properties.add_equal_conditions((&left_col, &right_col));
    }

    let updated = get_updated_right_ordering_equivalence_properties(
        &JoinType::Inner,
        &[right_oeq_class],
        left_columns_len,
        &join_eq_properties,
    )?;

    // Right-side indices are shifted by `left_columns_len`; `x` and `w` are
    // then replaced by their equal columns `a` and `d`, while `y` and `z`
    // only receive the offset.
    let expected = OrderingEquivalentClass::new(
        vec![sort_on("a", 0), sort_on("y", 5)],
        vec![vec![sort_on("z", 6), sort_on("d", 3)]],
    );

    assert_eq!(updated[0].head(), expected.head());
    assert_eq!(updated[0].others(), expected.others());

    Ok(())
}

#[test]
fn test_calculate_join_output_ordering() -> Result<()> {
    // Shorthand: a sort expression with default options on column `name`
    // located at position `index`.
    let options = SortOptions::default();
    let sort_on = |name: &str, index: usize| PhysicalSortExpr {
        expr: Arc::new(Column::new(name, index)),
        options,
    };

    // Input orderings, each indexed against its own side's schema.
    let left_ordering = vec![sort_on("a", 0), sort_on("c", 2), sort_on("d", 3)];
    let right_ordering = vec![sort_on("z", 2), sort_on("y", 1)];

    let join_type = JoinType::Inner;
    let on_columns = [(Column::new("b", 1), Column::new("x", 0))];
    let left_columns_len = 5;

    // Two scenarios, paired position by position: only the left order is
    // maintained with a left probe side, then only the right order is
    // maintained with a right probe side.
    let maintains_input_orders = [[true, false], [false, true]];
    let probe_sides = [Some(JoinSide::Left), Some(JoinSide::Right)];

    // In the output, right-side columns appear shifted by `left_columns_len`.
    let expected = [
        // Left ordering first, then the shifted right ordering.
        Some(vec![
            sort_on("a", 0),
            sort_on("c", 2),
            sort_on("d", 3),
            sort_on("z", 7),
            sort_on("y", 6),
        ]),
        // Shifted right ordering first, then the left ordering.
        Some(vec![
            sort_on("z", 7),
            sort_on("y", 6),
            sort_on("a", 0),
            sort_on("c", 2),
            sort_on("d", 3),
        ]),
    ];

    let scenarios = maintains_input_orders
        .iter()
        .zip(probe_sides)
        .zip(&expected);
    for ((maintains_input_order, probe_side), expected_ordering) in scenarios {
        let actual_ordering = calculate_join_output_ordering(
            &left_ordering,
            &right_ordering,
            join_type,
            &on_columns,
            left_columns_len,
            maintains_input_order,
            probe_side,
        )?;
        assert_eq!(actual_ordering, *expected_ordering);
    }

    Ok(())
}
}
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ pub use sort_expr::{
};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties, normalize_out_expr_with_columns_map,
reverse_order_bys, split_conjunction,
normalize_expr_with_equivalence_properties, normalize_ordering_equivalence_classes,
normalize_out_expr_with_columns_map, reverse_order_bys, split_conjunction,
};
Loading