Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 63 additions & 120 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::TableReference;
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
Expand All @@ -83,7 +82,7 @@ use datafusion_expr::{
WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::{
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
};
Expand All @@ -93,7 +92,6 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::TableReference;
use sqlparser::ast::NullTreatment;

use async_trait::async_trait;
Expand Down Expand Up @@ -2185,11 +2183,7 @@ impl DefaultPhysicalPlanner {
let physical_expr =
self.create_physical_expr(e, input_logical_schema, session_state);

// Check for possible column name mismatches
let final_physical_expr =
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);

tuple_err((final_physical_expr, physical_name))
tuple_err((physical_expr, physical_name))
})
.collect::<Result<Vec<_>>>()?;

Expand Down Expand Up @@ -2295,47 +2289,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
}
}

// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
//
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
fn maybe_fix_physical_column_name(
expr: Result<Arc<dyn PhysicalExpr>>,
input_physical_schema: &SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>> {
let Ok(expr) = expr else { return expr };
expr.transform_down(|node| {
if let Some(column) = node.as_any().downcast_ref::<Column>() {
let idx = column.index();
let physical_field = input_physical_schema.field(idx);
let expr_col_name = column.name();
let physical_name = physical_field.name();

if expr_col_name != physical_name {
// handle edge cases where the physical_name contains ':'.
let colon_count = physical_name.matches(':').count();
let mut splits = expr_col_name.match_indices(':');
let split_pos = splits.nth(colon_count);

if let Some((i, _)) = split_pos {
let base_name = &expr_col_name[..i];
if base_name == physical_name {
let updated_column = Column::new(physical_name, idx);
return Ok(Transformed::yes(Arc::new(updated_column)));
}
}
}

// If names already match or fix is not possible, just leave it as it is
Ok(Transformed::no(node))
} else {
Ok(Transformed::no(node))
}
})
.data()
}

struct OptimizationInvariantChecker<'a> {
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
}
Expand Down Expand Up @@ -2439,12 +2392,10 @@ mod tests {
};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_expr::{
col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore,
};
use datafusion_expr::builder::subquery_alias;
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
use datafusion_functions_aggregate::count::count_all;
use datafusion_functions_aggregate::expr_fn::sum;
use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

Expand Down Expand Up @@ -3005,71 +2956,6 @@ mod tests {
}
}

#[tokio::test]
async fn test_maybe_fix_colon_in_physical_name() {
// The physical schema has a field name with a colon
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
let schema_ref: SchemaRef = Arc::new(schema);

// What might happen after deduplication
let logical_col_name = "metric:avg:1";
let expr_with_suffix =
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
let expr_result = Ok(expr_with_suffix);

// Call function under test
let fixed_expr =
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();

// Downcast back to Column so we can check the name
let col = fixed_expr
.as_any()
.downcast_ref::<Column>()
.expect("Column");

assert_eq!(col.name(), "metric:avg");
}

#[tokio::test]
async fn test_maybe_fix_nested_column_name_with_colon() {
let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]);
let schema_ref: SchemaRef = Arc::new(schema);

// Construct the nested expr
let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>;
let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone()));

// Create a binary expression and put the column inside
let binary_expr = Arc::new(BinaryExpr::new(
is_not_null_expr.clone(),
Operator::Or,
is_not_null_expr.clone(),
)) as Arc<dyn PhysicalExpr>;

let fixed_expr =
maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap();

let bin = fixed_expr
.as_any()
.downcast_ref::<BinaryExpr>()
.expect("Expected BinaryExpr");

// Check that both sides where renamed
for expr in &[bin.left(), bin.right()] {
let is_not_null = expr
.as_any()
.downcast_ref::<IsNotNullExpr>()
.expect("Expected IsNotNull");

let col = is_not_null
.arg()
.as_any()
.downcast_ref::<Column>()
.expect("Expected Column");

assert_eq!(col.name(), "column");
}
}
struct ErrorExtensionPlanner {}

#[async_trait]
Expand Down Expand Up @@ -3566,4 +3452,61 @@ digraph {

Ok(())
}

// Reproducer for DataFusion issue #17405:
//
// The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
// clause is missing from the explicit logical plan:
//
// SELECT a FROM (
// -- SELECT left_table.a, right_table.a
// FROM left_table
// FULL JOIN right_table ON left_table.a = right_table.a
// ) AS alias
// GROUP BY a;
//
// As a result, the variables within `alias` subquery are not properly distinguished, which
// leads to a bug for logical and physical planning.
//
// The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
// ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
#[tokio::test]
async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
let state = make_session_state();

let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
let schema = Arc::new(schema);

let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
let table = Arc::new(table);

let source = DefaultTableSource::new(table);
let source = Arc::new(source);

let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;

let join_keys = (
vec![datafusion_common::Column::new(Some("left"), "a")],
vec![datafusion_common::Column::new(Some("right"), "a")],
);

let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;

let alias = subquery_alias(join, "alias")?;

let planner = DefaultPhysicalPlanner::default();

let logical_plan = LogicalPlanBuilder::new(alias)
.aggregate(vec![col("a:1")], Vec::<Expr>::new())?
.build()?;
let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;

let optimized_logical_plan = state.optimize(&logical_plan)?;
let _optimized_physical_plan = planner
.create_physical_plan(&optimized_logical_plan, &state)
.await?;

Ok(())
}
}
122 changes: 71 additions & 51 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! This module provides a builder for creating LogicalPlans

use std::any::Any;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::iter::once;
Expand Down Expand Up @@ -1517,37 +1518,49 @@ impl ValuesFields {
}
}

// `name_map` tracks a mapping between a field name and the number of appearances of that field.
//
// Some field names might already come to this function with the count (number of times it appeared)
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
// if these three fields passed to this function: "col:1", "col" and "col", the function
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
// that's why we need the `seen` set, so the fields are always unique.
//
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
let mut name_map = HashMap::new();
let mut seen: HashSet<String> = HashSet::new();
/// Returns aliases to make field names unique.
///
/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
/// `Some(alias)` means rename to the alias to ensure uniqueness.
///
/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
/// to maintain unique column names.
///
/// # Example
/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
// Some field names might already come to this function with the count (number of times it appeared)
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
// if these three fields passed to this function: "col:1", "col" and "col", the function
// would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
// That's why we need the `seen` set, so the fields are always unique.

// Tracks a mapping between a field name and the number of appearances of that field.
let mut name_map = HashMap::<&str, usize>::new();
// Tracks all the fields and aliases that were previously seen.
let mut seen = HashSet::<Cow<String>>::new();

fields
.into_iter()
.iter()
.map(|field| {
let base_name = field.name();
let count = name_map.entry(base_name.clone()).or_insert(0);
let mut new_name = base_name.clone();
let original_name = field.name();
let mut name = Cow::Borrowed(original_name);

let count = name_map.entry(original_name).or_insert(0);

// Loop until we find a name that hasn't been used
while seen.contains(&new_name) {
// Loop until we find a name that hasn't been used.
while seen.contains(&name) {
*count += 1;
new_name = format!("{base_name}:{count}");
name = Cow::Owned(format!("{original_name}:{count}"));
}

seen.insert(new_name.clone());
seen.insert(name.clone());

let mut modified_field =
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
modified_field.set_metadata(field.metadata().clone());
modified_field
match name {
Cow::Borrowed(_) => None,
Cow::Owned(alias) => Some(alias),
}
})
.collect()
}
Expand Down Expand Up @@ -2675,34 +2688,6 @@ mod tests {
Ok(())
}

#[test]
fn test_change_redundant_column() -> Result<()> {
let t1_field_1 = Field::new("a", DataType::Int32, false);
let t2_field_1 = Field::new("a", DataType::Int32, false);
let t2_field_3 = Field::new("a", DataType::Int32, false);
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
let t1_field_2 = Field::new("b", DataType::Int32, false);
let t2_field_2 = Field::new("b", DataType::Int32, false);

let field_vec = vec![
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
];
let remove_redundant = change_redundant_column(&Fields::from(field_vec));

assert_eq!(
remove_redundant,
vec![
Field::new("a", DataType::Int32, false),
Field::new("a:1", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("b:1", DataType::Int32, false),
Field::new("a:2", DataType::Int32, false),
Field::new("a:1:1", DataType::Int32, false),
]
);
Ok(())
}

#[test]
fn plan_builder_from_logical_plan() -> Result<()> {
let plan =
Expand Down Expand Up @@ -2787,4 +2772,39 @@ mod tests {

Ok(())
}

#[test]
fn test_unique_field_aliases() {
let t1_field_1 = Field::new("a", DataType::Int32, false);
let t2_field_1 = Field::new("a", DataType::Int32, false);
let t2_field_3 = Field::new("a", DataType::Int32, false);
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
let t1_field_2 = Field::new("b", DataType::Int32, false);
let t2_field_2 = Field::new("b", DataType::Int32, false);

let fields = vec![
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
];
let fields = Fields::from(fields);

let remove_redundant = unique_field_aliases(&fields);

// Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
// First occurrence of each field name keeps original name (None), duplicates get
// incremental suffixes (:1, :2, etc.).
// Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
// conflicts with the last column which is _actually_ called `a:1` so we need to rename it
// as well to `a:1:1`.
assert_eq!(
remove_redundant,
vec![
None,
Some("a:1".to_string()),
None,
Some("b:1".to_string()),
Some("a:2".to_string()),
Some("a:1:1".to_string()),
]
);
}
}
Loading
Loading