Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 1 addition & 74 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::collections::HashSet;
use std::convert::Infallible;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

/// A named reference to a qualified field in a schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
Expand Down Expand Up @@ -156,79 +155,6 @@ impl Column {
}
}

/// Attach a relation to this column by looking it up in `schemas`.
///
/// A column that already carries a [relation](Self::relation) is handed back untouched and neither
/// `schemas` nor `using_columns` is consulted. Otherwise the schemas are visited in order and the
/// first one that yields a usable match decides the result.
///
/// For each schema, the fields whose unqualified name equals this column's name are collected:
///
/// * no such field: the schema is skipped;
/// * exactly one such field: that field, with its qualifier, is returned;
/// * several such fields: the match is only accepted when one entry of `using_columns` contains
///   every matching column, in which case the first matching column is returned. If no entry
///   covers them all, the schema is skipped.
///
/// # Using columns
/// Several fields sharing one unqualified name is expected for joins written with `USING`:
///
/// ```sql
/// SELECT id FROM t1 JOIN t2 USING(id)
/// ```
///
/// Here the unqualified `id` matches both `t1.id` and `t2.id`. Each element of `using_columns` is
/// the set of columns tied together by one `USING` clause, so for this query it would be
/// `[{t1.id, t2.id}]`.
///
/// # Errors
/// Yields `SchemaError::FieldNotFound` (listing the columns of all given schemas as valid fields)
/// when no schema produces a match.
#[deprecated(
    since = "20.0.0",
    note = "use normalize_with_schemas_and_ambiguity_check instead"
)]
pub fn normalize_with_schemas(
    self,
    schemas: &[&Arc<DFSchema>],
    using_columns: &[HashSet<Column>],
) -> Result<Self> {
    // Nothing to resolve when the column is already qualified.
    if self.relation.is_some() {
        return Ok(self);
    }

    for schema in schemas {
        let matches = schema.qualified_fields_with_unqualified_name(&self.name);

        // This schema does not know the name at all; try the next one.
        if matches.is_empty() {
            continue;
        }

        // Unambiguous hit: adopt the qualifier of the single matching field.
        if matches.len() == 1 {
            return Ok(Column::from(matches[0]));
        }

        // Two or more fields in this schema share the unqualified name. That is only expected
        // when a JOIN ... USING(...) query refers to a join column without a qualifier, e.g.
        //
        // ```sql
        // SELECT id FROM t1 JOIN t2 USING(id)
        // ```
        //
        // where `id` matches both `t1.id` and `t2.id`. Check the candidates against one USING
        // clause at a time: if a single clause accounts for all of them they all belong to the
        // same join, and the first candidate supplies the qualifier.
        let candidates = schema.columns_with_unqualified_name(&self.name);
        let covered_by_one_using_clause = using_columns
            .iter()
            .any(|using_set| candidates.iter().all(|col| using_set.contains(col)));
        if covered_by_one_using_clause {
            return Ok(candidates[0].clone());
        }
    }

    // No schema produced a match; report every column that would have been valid.
    _schema_err!(SchemaError::FieldNotFound {
        field: Box::new(Column::new(self.relation.clone(), self.name)),
        valid_fields: schemas.iter().flat_map(|s| s.columns()).collect(),
    })
}

/// Qualify column if not done yet.
///
/// If this column already has a [relation](Self::relation), it will be returned as is and the given parameters are
Expand Down Expand Up @@ -381,6 +307,7 @@ mod tests {
use super::*;
use arrow::datatypes::DataType;
use arrow_schema::SchemaBuilder;
use std::sync::Arc;

fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
let mut schema_builder = SchemaBuilder::new();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub trait FunctionRewrite {
) -> Result<Transformed<Expr>>;
}

/// Recursively call [`Column::normalize_with_schemas`] on all [`Column`] expressions
/// Recursively call `LogicalPlanBuilder::normalize` on all [`Column`] expressions
/// in the `expr` expression tree.
pub fn normalize_col(expr: Expr, plan: &LogicalPlan) -> Result<Expr> {
expr.transform(|expr| {
Expand Down