80 changes: 78 additions & 2 deletions datafusion/expr/src/expr_schema.rs
@@ -111,7 +111,26 @@ impl ExprSchemable for Expr {
_ => expr.get_type(schema),
},
Expr::Negative(expr) => expr.get_type(schema),
Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
Expr::Column(c) => {
// First try to resolve the column as-is
match schema.data_type(c) {
Ok(data_type) => Ok(data_type.clone()),
Err(e) => {
// If the column has a qualifier but wasn't found, try without the qualifier
// This handles cases where aggregations produce unqualified schemas
// but subsequent operations still reference the qualified names
if c.relation.is_some() {
let unqualified = Column::new_unqualified(&c.name);
match schema.data_type(&unqualified) {
Ok(data_type) => Ok(data_type.clone()),
Err(_) => Err(e), // Return the original error
}
} else {
Err(e)
}
}
}
}
Expr::OuterReferenceColumn(ty, _) => Ok(ty.clone()),
Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
Expr::Literal(l, _) => Ok(l.data_type()),
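The comments in the hunk above describe the lookup strategy: resolve the column as written, and only when it carries a qualifier the schema does not know about, retry with the bare name, keeping the original error if the retry also fails. Below is a minimal sketch of that pattern as a standalone helper; the name `lookup_with_fallback` is hypothetical and not part of this PR, which inlines the logic in each match arm.

```rust
use datafusion_common::{Column, Result};

// Hypothetical helper (not in this PR): try the column as written, and only
// if it is qualified and missing, retry with the bare name. The original
// error is surfaced if the unqualified lookup also fails.
fn lookup_with_fallback<T>(
    column: &Column,
    lookup: impl Fn(&Column) -> Result<T>,
) -> Result<T> {
    match lookup(column) {
        Ok(value) => Ok(value),
        Err(e) if column.relation.is_some() => {
            let unqualified = Column::new_unqualified(&column.name);
            lookup(&unqualified).map_err(|_| e)
        }
        Err(e) => Err(e),
    }
}
```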
@@ -275,7 +294,26 @@ impl ExprSchemable for Expr {
|| low.nullable(input_schema)?
|| high.nullable(input_schema)?),

Expr::Column(c) => input_schema.nullable(c),
Expr::Column(c) => {
// First try to resolve the column as-is
match input_schema.nullable(c) {
Ok(nullable) => Ok(nullable),
Err(e) => {
// If the column has a qualifier but wasn't found, try without the qualifier
// This handles cases where aggregations produce unqualified schemas
// but subsequent operations still reference the qualified names
if c.relation.is_some() {
let unqualified = Column::new_unqualified(&c.name);
match input_schema.nullable(&unqualified) {
Ok(nullable) => Ok(nullable),
Err(_) => Err(e), // Return the original error
}
} else {
Err(e)
}
}
}
}
Expr::OuterReferenceColumn(_, _) => Ok(true),
Expr::Literal(value, _) => Ok(value.is_null()),
Expr::Case(case) => {
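Since the `nullable` arm above repeats the same fallback verbatim as the `get_type` arm, both call sites could route through a shared helper like the one sketched earlier. Illustrative only, not how the PR is written; `column_type` and `column_nullable` are hypothetical names, and the earlier `lookup_with_fallback` sketch is assumed to be in scope.

```rust
use arrow::datatypes::DataType;
use datafusion_common::{Column, DFSchema, Result};

// Illustrative only: with the hypothetical `lookup_with_fallback` helper in
// scope, the two match arms reduce to one call each.
fn column_type(c: &Column, schema: &DFSchema) -> Result<DataType> {
    lookup_with_fallback(c, |col| schema.data_type(col).map(|dt| dt.clone()))
}

fn column_nullable(c: &Column, schema: &DFSchema) -> Result<bool> {
    lookup_with_fallback(c, |col| schema.nullable(col))
}
```

Whether to factor the fallback out is a style choice; the inlined form in the diff keeps each match arm self-contained.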
@@ -778,6 +816,7 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery>
mod tests {
use super::*;
use crate::{col, lit};
use arrow::datatypes::Field;

use datafusion_common::{internal_err, DFSchema, HashMap, ScalarValue};

@@ -881,6 +920,43 @@ mod tests {
);
}

#[test]
fn test_qualified_column_after_aggregation() {
// Test for qualified column reference resolution after aggregation
// This test verifies the fix for the issue where binary expressions
// fail when referencing qualified column names after aggregation
// produces unqualified schemas.

// Create a schema that simulates the result of an aggregation
// where the output field is unqualified (just "value")
let unqualified_schema = DFSchema::from_unqualified_fields(
vec![Field::new("value", DataType::Float64, false)].into(),
std::collections::HashMap::new(),
)
.unwrap();

// Create a qualified column reference as would be produced
// in a query like: avg(memory_usage_bytes) / 1024
// where the aggregation produces "value" but the binary expression
// still references the original qualified name
let qualified_col = col("memory_usage_bytes.value");

// Before the fix, this would fail with:
// "No field named memory_usage_bytes.value. Valid fields are value."
// After the fix, it should successfully resolve to the unqualified "value" field
let data_type = qualified_col.get_type(&unqualified_schema).unwrap();
assert_eq!(data_type, DataType::Float64);

// Test nullable resolution as well
let nullable = qualified_col.nullable(&unqualified_schema).unwrap();
assert!(!nullable);

// Test with binary expression
let expr = qualified_col / lit(1024);
let data_type = expr.get_type(&unqualified_schema).unwrap();
assert_eq!(data_type, DataType::Float64);
}

#[test]
fn test_expr_metadata() {
let mut meta = HashMap::new();