Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement semi/anti join output statistics estimation #9800

Merged
merged 2 commits into from
Mar 30, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 282 additions & 50 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,27 +825,27 @@ fn estimate_join_cardinality(
right_stats: Statistics,
on: &JoinOn,
) -> Option<PartialJoinStatistics> {
let (left_col_stats, right_col_stats) = on
.iter()
.map(|(left, right)| {
match (
left.as_any().downcast_ref::<Column>(),
right.as_any().downcast_ref::<Column>(),
) {
(Some(left), Some(right)) => (
left_stats.column_statistics[left.index()].clone(),
right_stats.column_statistics[right.index()].clone(),
),
_ => (
ColumnStatistics::new_unknown(),
ColumnStatistics::new_unknown(),
),
}
})
.unzip::<_, _, Vec<_>, Vec<_>>();

match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
let (left_col_stats, right_col_stats) = on
.iter()
.map(|(left, right)| {
match (
left.as_any().downcast_ref::<Column>(),
right.as_any().downcast_ref::<Column>(),
) {
(Some(left), Some(right)) => (
left_stats.column_statistics[left.index()].clone(),
right_stats.column_statistics[right.index()].clone(),
),
_ => (
ColumnStatistics::new_unknown(),
ColumnStatistics::new_unknown(),
),
}
})
.unzip::<_, _, Vec<_>, Vec<_>>();

let ij_cardinality = estimate_inner_join_cardinality(
Statistics {
num_rows: left_stats.num_rows.clone(),
Expand Down Expand Up @@ -888,10 +888,45 @@ fn estimate_join_cardinality(
})
}

JoinType::LeftSemi
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti => None,
JoinType::LeftSemi | JoinType::LeftAnti => {
let cardinality = estimate_semi_join_cardinality(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't seem correct to me that the same calculation is used for both Semi and Anti joins (shouldn't they be the inverse of each other?)

Copy link
Contributor Author

@korowa korowa Mar 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, they were not correct. I've changed the estimations a bit -- now disjoint statistics affect only semi-joins (filtering the outer table should produce zero rows). For anti-joins, disjoint inputs don't seem to make much sense -- if the statistics are non-overlapping, the result will be equal to the outer side's num_rows; otherwise (having no info or overlapping statistics) it will still be estimated as the outer side, since we know nothing about the actual distribution besides min/max, and assuming that all rows will be filtered out is too much (it may significantly affect further planning).

Statistics {
num_rows: left_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: left_col_stats,
},
Statistics {
num_rows: right_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: right_col_stats,
},
)?;

Some(PartialJoinStatistics {
num_rows: *cardinality.get_value()?,
column_statistics: left_stats.column_statistics,
})
}

JoinType::RightSemi | JoinType::RightAnti => {
let cardinality = estimate_semi_join_cardinality(
Statistics {
num_rows: right_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: right_col_stats,
},
Statistics {
num_rows: left_stats.num_rows.clone(),
total_byte_size: Precision::Absent,
column_statistics: left_col_stats,
},
)?;

Some(PartialJoinStatistics {
num_rows: *cardinality.get_value()?,
column_statistics: right_stats.column_statistics,
})
}
}
}

Expand All @@ -903,6 +938,11 @@ fn estimate_inner_join_cardinality(
left_stats: Statistics,
right_stats: Statistics,
) -> Option<Precision<usize>> {
// Immediately return if the inputs are considered non-overlapping
if let Some(estimation) = estimate_disjoint_inputs(&left_stats, &right_stats) {
return Some(estimation);
};

// The algorithm here is partly based on the non-histogram selectivity estimation
// from Spark's Catalyst optimizer.
let mut join_selectivity = Precision::Absent;
Expand All @@ -911,30 +951,13 @@ fn estimate_inner_join_cardinality(
.iter()
.zip(right_stats.column_statistics.iter())
{
// If there is no overlap in any of the join columns, this means the join
// itself is disjoint and the cardinality is 0. Though we can only assume
// this when the statistics are exact (since it is a very strong assumption).
if left_stat.min_value.get_value()? > right_stat.max_value.get_value()? {
return Some(
if left_stat.min_value.is_exact().unwrap_or(false)
&& right_stat.max_value.is_exact().unwrap_or(false)
{
Precision::Exact(0)
} else {
Precision::Inexact(0)
},
);
}
if left_stat.max_value.get_value()? < right_stat.min_value.get_value()? {
return Some(
if left_stat.max_value.is_exact().unwrap_or(false)
&& right_stat.min_value.is_exact().unwrap_or(false)
{
Precision::Exact(0)
} else {
Precision::Inexact(0)
},
);
// Break if any of statistics bounds are undefined
if left_stat.min_value.get_value().is_none()
|| left_stat.max_value.get_value().is_none()
|| right_stat.min_value.get_value().is_none()
|| right_stat.max_value.get_value().is_none()
{
return None;
}

let left_max_distinct = max_distinct_count(&left_stats.num_rows, left_stat);
Expand Down Expand Up @@ -968,6 +991,78 @@ fn estimate_inner_join_cardinality(
}
}

/// Estimates semi join cardinality based on statistics.
///
/// The estimation result is either zero, in cases where the input statistics are
/// non-overlapping, or equal to the number of rows of the outer input.
fn estimate_semi_join_cardinality(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long term it would be really nice to pull these types of calculations into some trait (aka an extensibility API)

outer_stats: Statistics,
inner_stats: Statistics,
) -> Option<Precision<usize>> {
// Immediately return if the inputs are considered non-overlapping
if let Some(estimation) = estimate_disjoint_inputs(&outer_stats, &inner_stats) {
return Some(estimation);
};

// Otherwise estimate SemiJoin output as whole outer side
outer_stats
.num_rows
.get_value()
.map(|val| Precision::Inexact(*val))
}

/// Checks, from column min/max statistics alone, whether the two inputs
/// cannot share any value in some pair of columns.
///
/// Columns of `left_stats` and `right_stats` are paired positionally. For the
/// first pair whose value ranges do not overlap, a zero-row estimation is
/// returned: `Precision::Exact(0)` when both bounds used in the comparison are
/// exact, `Precision::Inexact(0)` otherwise. Returns `None` when no pair is
/// found to be disjoint, including when the relevant bounds are absent.
fn estimate_disjoint_inputs(
    left_stats: &Statistics,
    right_stats: &Statistics,
) -> Option<Precision<usize>> {
    // A zero-row result is only as trustworthy as the bounds it was derived from.
    let zero_rows = |both_exact: bool| -> Precision<usize> {
        if both_exact {
            Precision::Exact(0)
        } else {
            Precision::Inexact(0)
        }
    };

    left_stats
        .column_statistics
        .iter()
        .zip(right_stats.column_statistics.iter())
        .find_map(|(left_col, right_col)| {
            // Left range starts after the right range ends.
            if let (Some(left_min), Some(right_max)) = (
                left_col.min_value.get_value(),
                right_col.max_value.get_value(),
            ) {
                if left_min > right_max {
                    return Some(zero_rows(
                        left_col.min_value.is_exact().unwrap_or(false)
                            && right_col.max_value.is_exact().unwrap_or(false),
                    ));
                }
            }

            // Left range ends before the right range starts.
            if let (Some(left_max), Some(right_min)) = (
                left_col.max_value.get_value(),
                right_col.min_value.get_value(),
            ) {
                if left_max < right_min {
                    return Some(zero_rows(
                        left_col.max_value.is_exact().unwrap_or(false)
                            && right_col.min_value.is_exact().unwrap_or(false),
                    ));
                }
            }

            // This pair of columns gives no evidence of disjoint inputs.
            None
        })
}

/// Estimate the number of maximum distinct values that can be present in the
/// given column from its statistics. If distinct_count is available, uses it
/// directly. Otherwise, if the column is numeric and has min/max values, it
Expand Down Expand Up @@ -1716,9 +1811,11 @@ mod tests {
#[test]
fn test_inner_join_cardinality_single_column() -> Result<()> {
let cases: Vec<(PartialStats, PartialStats, Option<Precision<usize>>)> = vec![
// -----------------------------------------------------------------------------
// | left(rows, min, max, distinct), right(rows, min, max, distinct), expected |
// -----------------------------------------------------------------------------
// ------------------------------------------------
// | left(rows, min, max, distinct, null_count), |
// | right(rows, min, max, distinct, null_count), |
// | expected, |
// ------------------------------------------------

// Cardinality computation
// =======================
Expand Down Expand Up @@ -1824,6 +1921,11 @@ mod tests {
None,
),
// Non overlapping min/max (when exact=False).
(
(10, Absent, Inexact(4), Absent, Absent),
(10, Inexact(5), Absent, Absent, Absent),
Some(Inexact(0)),
),
(
(10, Inexact(0), Inexact(10), Absent, Absent),
(10, Inexact(11), Inexact(20), Absent, Absent),
Expand Down Expand Up @@ -2106,6 +2208,136 @@ mod tests {
Ok(())
}

#[test]
fn estimate_semi_join_cardinality_absent_rows() -> Result<()> {
    // Every case is a triple:
    //   outer (rows, min, max, distinct, null_count),
    //   inner (rows, min, max, distinct, null_count),
    //   expected estimation.
    // Distinct and null counts are absent in all cases.
    let cases: Vec<(PartialStats, PartialStats, Option<Precision<usize>>)> = vec![
        // Overlapping bounds: estimated as the outer row count
        (
            (50, Inexact(10), Inexact(20), Absent, Absent),
            (10, Inexact(15), Inexact(25), Absent, Absent),
            Some(Inexact(50)),
        ),
        // No bounds on either side: estimated as the outer row count
        (
            (10, Absent, Absent, Absent, Absent),
            (50, Absent, Absent, Absent, Absent),
            Some(Inexact(10)),
        ),
        // Outer range lies entirely below the inner range
        (
            (50, Inexact(10), Inexact(20), Absent, Absent),
            (10, Inexact(30), Inexact(40), Absent, Absent),
            Some(Inexact(0)),
        ),
        // Only outer min and inner max are known, and they do not overlap
        (
            (50, Inexact(10), Absent, Absent, Absent),
            (10, Absent, Inexact(5), Absent, Absent),
            Some(Inexact(0)),
        ),
        // Only outer max and inner min are known, and they do not overlap
        (
            (50, Absent, Inexact(20), Absent, Absent),
            (10, Inexact(30), Absent, Absent, Absent),
            Some(Inexact(0)),
        ),
    ];

    // Wraps one column's partial stats into `Statistics` with an inexact row count.
    let to_statistics = |(rows, min, max, distinct, null_count): PartialStats| Statistics {
        num_rows: Inexact(rows),
        total_byte_size: Absent,
        column_statistics: vec![create_column_stats(min, max, distinct, null_count)],
    };

    for (outer_info, inner_info, expected_cardinality) in cases {
        let estimated = estimate_semi_join_cardinality(
            to_statistics(outer_info),
            to_statistics(inner_info),
        );
        assert_eq!(estimated, expected_cardinality);
    }

    Ok(())
}

#[test]
fn test_semi_join_cardinality() -> Result<()> {
    // Column statistics carry no information here; only `num_rows` varies.
    let dummy_column_stats = vec![create_column_stats(Absent, Absent, Absent, Absent)];

    // Runs the estimation for the given outer/inner row counts.
    let estimate_for = |outer_rows: Precision<usize>, inner_rows: Precision<usize>| {
        estimate_semi_join_cardinality(
            Statistics {
                num_rows: outer_rows,
                total_byte_size: Absent,
                column_statistics: dummy_column_stats.clone(),
            },
            Statistics {
                num_rows: inner_rows,
                total_byte_size: Absent,
                column_statistics: dummy_column_stats.clone(),
            },
        )
    };

    // Unknown outer row count: nothing can be estimated.
    assert_eq!(
        estimate_for(Absent, Exact(10)),
        None,
        "Expected \"None\" esimated SemiJoin cardinality for absent outer num_rows"
    );

    // Unknown inner row count: the outer row count is still used.
    assert_eq!(
        estimate_for(Inexact(500), Absent),
        Some(Inexact(500)),
        "Expected outer.num_rows esimated SemiJoin cardinality for absent inner num_rows"
    );

    // Both row counts unknown: nothing can be estimated.
    assert_eq!(
        estimate_for(Absent, Absent),
        None,
        "Expected \"None\" esimated SemiJoin cardinality for absent outer and inner num_rows"
    );

    Ok(())
}

#[test]
fn test_calculate_join_output_ordering() -> Result<()> {
let options = SortOptions::default();
Expand Down
Loading