diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs index 6a3dcb050e2d..89a8b8c3cecc 100644 --- a/datafusion-common/src/dfschema.rs +++ b/datafusion-common/src/dfschema.rs @@ -222,6 +222,14 @@ impl DFSchema { } } + /// Find all fields having the given qualifier + pub fn fields_with_qualified(&self, qualifier: &str) -> Vec<&DFField> { + self.fields + .iter() + .filter(|field| field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)) + .collect() + } + /// Find all fields match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { self.fields diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs index 081e9f8c1f37..4bad6e31f39a 100644 --- a/datafusion-expr/src/expr.rs +++ b/datafusion-expr/src/expr.rs @@ -228,6 +228,8 @@ pub enum Expr { }, /// Represents a reference to all fields in a schema. Wildcard, + /// Represents a reference to all fields in a specific schema. + QualifiedWildcard { qualifier: String }, } /// Fixed seed for the hashing so that Ords are consistent across runs @@ -512,6 +514,7 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), + Expr::QualifiedWildcard { qualifier } => write!(f, "{}.*", qualifier), Expr::GetIndexedField { ref expr, key } => { write!(f, "({:?})[{}]", expr, key) } @@ -696,6 +699,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { Expr::Wildcard => Err(DataFusionError::Internal( "Create name does not support wildcard".to_string(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "Create name does not support qualified wildcard".to_string(), + )), } } diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs index 92f4511845ad..1c62f579ac7c 100644 --- a/datafusion/src/datasource/listing/helpers.rs +++ b/datafusion/src/datasource/listing/helpers.rs @@ -108,7 +108,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> { | Expr::AggregateFunction { .. } | Expr::Sort { .. } | Expr::WindowFunction { .. } - | Expr::Wildcard => { + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } => { *self.is_applicable = false; Recursion::Stop(self) } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index d0bfb5c1f5e0..5400e9682a5b 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -1029,6 +1029,8 @@ pub fn project_with_alias( Expr::Wildcard => { projected_expr.extend(expand_wildcard(input_schema, &plan)?) } + Expr::QualifiedWildcard { ref qualifier } => projected_expr + .extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?), _ => projected_expr .push(columnize_expr(normalize_col(e, &plan)?, input_schema)), } @@ -1090,6 +1092,27 @@ pub(crate) fn expand_wildcard( } } +pub(crate) fn expand_qualified_wildcard( + qualifier: &str, + schema: &DFSchema, + plan: &LogicalPlan, +) -> Result> { + let qualified_fields: Vec = schema + .fields_with_qualified(qualifier) + .into_iter() + .cloned() + .collect(); + if qualified_fields.is_empty() { + return Err(DataFusionError::Plan(format!( + "Invalid qualifier {}", + qualifier + ))); + } + let qualifier_schema = + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; + expand_wildcard(&qualifier_schema, plan) +} + #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field}; diff --git a/datafusion/src/logical_plan/expr_rewriter.rs b/datafusion/src/logical_plan/expr_rewriter.rs index d20b7633059d..b8afa8a367bd 100644 --- a/datafusion/src/logical_plan/expr_rewriter.rs +++ b/datafusion/src/logical_plan/expr_rewriter.rs @@ -218,6 +218,9 @@ impl ExprRewritable for Expr { negated, }, Expr::Wildcard => Expr::Wildcard, + Expr::QualifiedWildcard { qualifier } => { + Expr::QualifiedWildcard { qualifier } + } Expr::GetIndexedField { expr, key } => Expr::GetIndexedField { expr: rewrite_boxed(expr, rewriter)?, key, diff --git a/datafusion/src/logical_plan/expr_schema.rs b/datafusion/src/logical_plan/expr_schema.rs index 49025cea8db6..59341adbd057 100644 --- a/datafusion/src/logical_plan/expr_schema.rs +++ b/datafusion/src/logical_plan/expr_schema.rs @@ -116,6 +116,10 @@ impl ExprSchemable for Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), Expr::GetIndexedField { ref expr, key } => { let data_type = expr.get_type(schema)?; @@ -178,6 +182,10 @@ impl ExprSchemable for Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), Expr::GetIndexedField { ref expr, key } => { let data_type = expr.get_type(input_schema)?; get_indexed_field(&data_type, key).map(|x| x.is_nullable()) diff --git a/datafusion/src/logical_plan/expr_visitor.rs b/datafusion/src/logical_plan/expr_visitor.rs index 27c0a5a5b4fd..27fa9d3c79fa 100644 --- a/datafusion/src/logical_plan/expr_visitor.rs +++ b/datafusion/src/logical_plan/expr_visitor.rs @@ -106,7 +106,8 @@ impl ExprVisitable for Expr { Expr::Column(_) | Expr::ScalarVariable(_, _) | Expr::Literal(_) - | Expr::Wildcard => Ok(visitor), + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } => Ok(visitor), Expr::BinaryExpr { left, right, .. } => { let visitor = left.accept(visitor)?; right.accept(visitor) diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 1c0cb7310306..b2884ea969d5 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -460,6 +460,10 @@ impl ExprIdentifierVisitor<'_> { Expr::Wildcard => { desc.push_str("Wildcard-"); } + Expr::QualifiedWildcard { qualifier } => { + desc.push_str("QualifiedWildcard-"); + desc.push_str(qualifier); + } Expr::GetIndexedField { key, .. } => { desc.push_str("GetIndexedField-"); desc.push_str(&key.to_string()); diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index f46b11e6bbb8..e95d2e8589d8 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -377,7 +377,8 @@ impl<'a> ConstEvaluator<'a> { | Expr::Column(_) | Expr::WindowFunction { .. } | Expr::Sort { .. } - | Expr::Wildcard => false, + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } => false, Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()), Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility), Expr::Literal(_) diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 121f8767441a..14e5ffa8c181 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -84,6 +84,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> { | Expr::AggregateUDF { .. } | Expr::InList { .. } | Expr::Wildcard + | Expr::QualifiedWildcard { .. } | Expr::GetIndexedField { .. } => {} } Ok(Recursion::Continue(self)) @@ -350,6 +351,10 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), } } @@ -506,9 +511,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { Ok(expr) } } - Expr::Wildcard { .. } => Err(DataFusionError::Internal( + Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), Expr::GetIndexedField { expr: _, key } => Ok(Expr::GetIndexedField { expr: Box::new(expressions[0].clone()), key: key.clone(), diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index ee4494cf58bf..50a54fe65aa5 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -206,6 +206,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { Expr::Wildcard => Err(DataFusionError::Internal( "Create physical name does not support wildcard".to_string(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "Create physical name does not support qualified wildcard".to_string(), + )), } } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 8f0b87329322..886052156305 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -28,10 +28,11 @@ use crate::datasource::TableProvider; use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ - and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column, - CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, - CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, LogicalPlan, - LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, + and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit, + normalize_col, union_with_alias, Column, CreateCatalogSchema, + CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema, + DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, + ToDFSchema, ToStringifiedPlan, }; use crate::optimizer::utils::exprlist_to_columns; use crate::prelude::JoinType; @@ -1010,6 +1011,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } expand_wildcard(input_schema, plan)? } + Expr::QualifiedWildcard { ref qualifier } => { + expand_qualified_wildcard(qualifier, input_schema, plan)? + } _ => vec![normalize_col(expr, plan)?], }) }) @@ -1210,9 +1214,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { normalize_ident(alias), )), SelectItem::Wildcard => Ok(Expr::Wildcard), - SelectItem::QualifiedWildcard(_) => Err(DataFusionError::NotImplemented( - "Qualified wildcards are not supported".to_string(), - )), + SelectItem::QualifiedWildcard(ref object_name) => { + let qualifier = format!("{}", object_name); + Ok(Expr::QualifiedWildcard { qualifier }) + } } } diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs index 33c94dded65f..c970e9c67a12 100644 --- a/datafusion/src/sql/utils.rs +++ b/datafusion/src/sql/utils.rs @@ -372,6 +372,7 @@ where Ok(expr.clone()) } Expr::Wildcard => Ok(Expr::Wildcard), + Expr::QualifiedWildcard { .. } => Ok(expr.clone()), Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField { expr: Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?), key: key.clone(), diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index 05c5abd2d388..fa8f250ecec0 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -94,6 +94,7 @@ pub mod select; pub mod timestamp; pub mod udf; pub mod union; +pub mod wildcard; pub mod window; mod explain; diff --git a/datafusion/tests/sql/wildcard.rs b/datafusion/tests/sql/wildcard.rs new file mode 100644 index 000000000000..b312fe1212e5 --- /dev/null +++ b/datafusion/tests/sql/wildcard.rs @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; + +#[tokio::test] +async fn select_qualified_wildcard() -> Result<()> { + let mut ctx = SessionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "SELECT agg.* FROM aggregate_simple as agg order by c1"; + let results = execute_to_batches(&ctx, sql).await; + + let expected = vec![ + "+---------+----------------+-------+", + "| c1 | c2 | c3 |", + "+---------+----------------+-------+", + "| 0.00001 | 0.000000000001 | true |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "+---------+----------------+-------+", + ]; + + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn select_non_alias_qualified_wildcard() -> Result<()> { + let mut ctx = SessionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "SELECT aggregate_simple.* FROM aggregate_simple order by c1"; + let results = execute_to_batches(&ctx, sql).await; + + let expected = vec![ + "+---------+----------------+-------+", + "| c1 | c2 | c3 |", + "+---------+----------------+-------+", + "| 0.00001 | 0.000000000001 | true |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "+---------+----------------+-------+", + ]; + + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn select_qualified_wildcard_join() -> Result<()> { + let ctx = create_join_context("t1_id", "t2_id")?; + let sql = + "SELECT tb1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; + let expected = vec![ + "+-------+---------+-------+---------+", + "| t1_id | t1_name | t2_id | t2_name |", + "+-------+---------+-------+---------+", + "| 11 | a | 11 | z |", + "| 22 | b | 22 | y |", + "| 44 | d | 44 | x |", + "+-------+---------+-------+---------+", + ]; + + let results = execute_to_batches(&ctx, sql).await; + + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn select_non_alias_qualified_wildcard_join() -> Result<()> { + let ctx = create_join_context("t1_id", "t2_id")?; + let sql = "SELECT t1.*, tb2.* FROM t1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; + let expected = vec![ + "+-------+---------+-------+---------+", + "| t1_id | t1_name | t2_id | t2_name |", + "+-------+---------+-------+---------+", + "| 11 | a | 11 | z |", + "| 22 | b | 22 | y |", + "| 44 | d | 44 | x |", + "+-------+---------+-------+---------+", + ]; + + let results = execute_to_batches(&ctx, sql).await; + + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn select_wrong_qualified_wildcard() -> Result<()> { + let mut ctx = SessionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "SELECT agg.* FROM aggregate_simple order by c1"; + let result = ctx.create_logical_plan(sql); + match result { + Ok(_) => panic!("unexpected OK"), + Err(err) => assert_eq!( + err.to_string(), + "Error during planning: Invalid qualifier agg" + ), + }; + + Ok(()) +}