Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e2dcf3a
ready to review
berkaysynnada Apr 25, 2023
80825e7
license added
berkaysynnada Apr 25, 2023
d8cb8a3
simplifications
mustafasrepo Apr 27, 2023
7c92368
simplifications
mustafasrepo Apr 27, 2023
5d48806
sort expr's are taken separately for each table
berkaysynnada Apr 28, 2023
2c7ef16
we can return the sort info of the expression now
berkaysynnada May 5, 2023
12bd12e
check filter conditions
berkaysynnada May 5, 2023
37d302b
simplifications
mustafasrepo May 8, 2023
a5620ad
simplifications
mustafasrepo May 8, 2023
da47a4f
Merge branch 'main' into feature/determine-prunability
berkaysynnada May 8, 2023
8c22653
functions are implemented for SortInfo calculations
berkaysynnada May 9, 2023
51c63c5
node specialized tableSide functions
berkaysynnada May 9, 2023
5d8ef5a
NotImplemented errors are added, test comments are added
berkaysynnada May 9, 2023
01692bb
Comment change
berkaysynnada May 9, 2023
ff058ef
Simplify comparison node calculations
berkaysynnada May 10, 2023
e7c9479
Simplfications and better commenting
ozankabak May 11, 2023
78fbdd2
Merge branch 'main' into feature/determine-prunability
berkaysynnada May 11, 2023
9043e39
is_prunable function is updated with new Prunability function
berkaysynnada May 11, 2023
d0d1e99
Merge branch 'apache:main' into feature/determine-prunability
berkaysynnada May 11, 2023
665db27
Indices of sort expressions are updated with intermediate schema colu…
berkaysynnada May 12, 2023
b258966
Unused function is removed
berkaysynnada May 12, 2023
8d58f88
Future-proof index updating
berkaysynnada May 12, 2023
2bd3acb
An if let check is removed
berkaysynnada May 12, 2023
1071f80
Merge branch 'apache:main' into feature/determine-prunability
berkaysynnada May 16, 2023
15eb8ad
simplifications
mustafasrepo May 16, 2023
09f48b7
Simplifications
mustafasrepo May 16, 2023
8e7ad40
simplifications
mustafasrepo May 16, 2023
30d0e67
Change if condition
mustafasrepo May 16, 2023
b75ab93
Determine prunability of tables for join operations (#90)
berkaysynnada Jul 19, 2023
edf99cd
Merge branch 'main' into feature/determine-prunability
berkaysynnada Jul 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 105 additions & 3 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::joins::utils::{JoinFilter, JoinSide};
use crate::physical_plan::joins::SymmetricHashJoinExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::intervals::prunability::{ExprPrunabilityGraph, TableSide};
use datafusion_physical_expr::intervals::{check_support, is_datatype_supported};
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

/// The PipelineChecker rule rejects non-runnable query plans that use
Expand Down Expand Up @@ -138,7 +142,8 @@ pub fn check_finiteness_requirements(
) -> Result<Transformed<PipelineStatePropagator>> {
if let Some(exec) = input.plan.as_any().downcast_ref::<SymmetricHashJoinExec>() {
if !(optimizer_options.allow_symmetric_joins_without_pruning
|| (exec.check_if_order_information_available()? && is_prunable(exec)))
|| (exec.check_if_order_information_available()?
&& is_prunable(exec, &input.children_unbounded)))
{
const MSG: &str = "Join operation cannot operate on a non-prunable stream without enabling \
the 'allow_symmetric_joins_without_pruning' configuration flag";
Expand All @@ -161,17 +166,114 @@ pub fn check_finiteness_requirements(
///
/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
/// [`Operator`]: datafusion_expr::Operator
fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
join.filter().map_or(false, |filter| {
fn is_prunable(join: &SymmetricHashJoinExec, children_unbounded: &[bool]) -> bool {
    // Returns `true` only when the join has a filter that the interval
    // machinery supports AND the prunability analysis of that filter reports
    // that every unbounded side can be pruned. Any missing piece (no filter,
    // unsupported expression/data type, analysis error, malformed
    // `children_unbounded`) conservatively yields `false`.

    // Without a filter there is nothing to analyze.
    let filter = match join.filter() {
        Some(filter) => filter,
        None => return false,
    };

    // Both the filter expression and every field of its intermediate schema
    // must be supported before the analysis is attempted.
    let supported = check_support(filter.expression())
        && filter
            .schema()
            .fields()
            .iter()
            .all(|f| is_datatype_supported(f.data_type()));
    if !supported {
        return false;
    }

    // Read the unboundedness flags of the first two children (left, right)
    // without indexing, so a short slice results in `false` instead of a panic.
    let (left_unbounded, right_unbounded) = match children_unbounded {
        [left, right, ..] => (*left, *right),
        _ => return false,
    };

    // Failing to build the graph is treated as "not prunable" on purpose:
    // this function answers a yes/no question and has no error channel.
    let mut graph = match ExprPrunabilityGraph::try_new((*filter.expression()).clone())
    {
        Ok(graph) => graph,
        Err(_) => return false,
    };

    // Only the leading sort expression of each child is considered. `first()`
    // is used instead of `[0]` so that an empty ordering is handled like a
    // missing one rather than panicking.
    let left_sort_expr = join
        .left()
        .output_ordering()
        .and_then(|ordering| ordering.first().cloned());
    let right_sort_expr = join
        .right()
        .output_ordering()
        .and_then(|ordering| ordering.first().cloned());

    // Re-express the sort columns in terms of the filter's intermediate schema.
    let new_left_sort =
        get_sort_expr_in_filter_schema(&left_sort_expr, filter, JoinSide::Left);
    let new_right_sort =
        get_sort_expr_in_filter_schema(&right_sort_expr, filter, JoinSide::Right);

    match graph.analyze_prunability(&new_left_sort, &new_right_sort) {
        Ok((table_side, _)) => prunability_for_unbounded_tables(
            left_unbounded,
            right_unbounded,
            &table_side,
        ),
        // An analysis error is conservatively reported as "not prunable".
        Err(_) => false,
    }
}

/// Builds a copy of `sort_expr` whose column points at `updated_idx`.
///
/// The column name and the sort options are carried over unchanged. Returns
/// `None` when `sort_expr` is `None` or when its expression is not a plain
/// [`Column`].
fn update_column_index(
    sort_expr: &Option<PhysicalSortExpr>,
    updated_idx: usize,
) -> Option<PhysicalSortExpr> {
    let original = sort_expr.as_ref()?;
    // Only bare column references can be re-indexed.
    let column = original.expr.as_any().downcast_ref::<Column>()?;
    Some(PhysicalSortExpr {
        expr: Arc::new(Column::new(column.name(), updated_idx)),
        options: original.options,
    })
}

/// Translates a child's sort expression into the filter's schema.
///
/// Looks up the position of the sorted column (for the given `side`) among
/// the filter's fields and returns the sort expression re-indexed to that
/// position. Returns `None` if the column does not appear in the filter or
/// the sort expression cannot be re-indexed.
fn get_sort_expr_in_filter_schema(
    sort_expr: &Option<PhysicalSortExpr>,
    filter: &JoinFilter,
    side: JoinSide,
) -> Option<PhysicalSortExpr> {
    let filter_idx = find_index_in_filter(filter, sort_expr, side)?;
    update_column_index(sort_expr, filter_idx)
}

/// Finds the position of the sorted column within the filter's fields.
///
/// A filter field matches when its name equals the sorted column's name and
/// the corresponding entry of `column_indices()` belongs to `join_side`.
/// Despite its name, `left_sort_expr` is the sort expression of whichever
/// side `join_side` designates. Returns `None` when there is no sort
/// expression, when it is not a plain [`Column`], or when nothing matches.
fn find_index_in_filter(
    join_filter: &JoinFilter,
    left_sort_expr: &Option<PhysicalSortExpr>,
    join_side: JoinSide,
) -> Option<usize> {
    // Resolve the sorted column once, up front; without it no field can match.
    let sorted_column = left_sort_expr
        .as_ref()?
        .expr
        .as_any()
        .downcast_ref::<Column>()?;
    join_filter
        .schema()
        .fields()
        .iter()
        .zip(join_filter.column_indices())
        .position(|(field, column_index)| {
            sorted_column.name() == field.name() && column_index.side == join_side
        })
}

/// Tells whether the join can run given which inputs are unbounded and which
/// sides the analysis reported as prunable.
///
/// Each side must be either bounded or prunable; if an unbounded side is not
/// prunable the function returns `false`.
fn prunability_for_unbounded_tables(
    left_unbounded: bool,
    right_unbounded: bool,
    table_side: &TableSide,
) -> bool {
    let left_is_prunable = match table_side {
        TableSide::Left | TableSide::Both => true,
        TableSide::Right | TableSide::None => false,
    };
    let right_is_prunable = match table_side {
        TableSide::Right | TableSide::Both => true,
        TableSide::Left | TableSide::None => false,
    };
    // An unbounded side that cannot be pruned rules the join out.
    if left_unbounded && !left_is_prunable {
        return false;
    }
    if right_unbounded && !right_is_prunable {
        return false;
    }
    true
}

#[cfg(test)]
mod sql_tests {
use super::*;
Expand Down
53 changes: 53 additions & 0 deletions datafusion/core/tests/sqllogictests/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,56 @@ drop table IF EXISTS full_join_test;
# batch size
statement ok
set datafusion.execution.batch_size = 8192;

# Test that joins over unbounded tables are accepted or rejected according to
# the prunability analysis of their filters.
# Both tables are declared as infinite sources ordered by (a2 ASC, a3 ASC).
statement ok
CREATE external table t1(a1 integer, a2 integer, a3 integer)
STORED as CSV
WITH HEADER ROW
WITH ORDER (a2 ASC, a3 ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';

statement ok
CREATE external table t2(a1 integer, a2 integer, a3 integer)
STORED as CSV
WITH HEADER ROW
WITH ORDER (a2 ASC, a3 ASC)
OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';

# Require pruning for symmetric joins so that the checks below are enforced.
statement ok
set datafusion.optimizer.allow_symmetric_joins_without_pruning = false

# The filter on a3 leaves the tables non-prunable, so the query is expected to
# be rejected with a PipelineChecker error.
query error DataFusion error: PipelineChecker
SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3
FROM t1
JOIN t2
ON t1.a1 = t2.a1
WHERE t1.a3 < t2.a3 AND t1.a3 >= t2.a3

# The filter on a2 makes the tables prunable, so the query is expected to be
# accepted.
statement ok
SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3
FROM t1
JOIN t2
ON t1.a1 = t2.a1
WHERE t2.a2 >= t1.a2 AND t1.a2 > t2.a2

# Allow symmetric joins even when pruning is not possible.
statement ok
set datafusion.optimizer.allow_symmetric_joins_without_pruning = true

# The filter leaves the tables non-prunable, but the join is accepted because
# the configuration flag above allows it.
statement ok
SELECT t1.a1, t1.a2, t1.a3, t2.a1, t2.a2, t2.a3
FROM t1
JOIN t2
ON t1.a1 = t2.a1
WHERE t1.a2 < t2.a2 AND t1.a2 < t2.a2

# Clean up the tables created for this test.
statement ok
drop table t1;

statement ok
drop table t2;
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/intervals/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

pub mod cp_solver;
pub mod interval_aritmetic;
pub mod prunability;
pub mod rounding;

pub mod test_utils;
Expand Down
Loading