From fffe9a06c08d150a9dddd56a0a7e7f11390ee8cb Mon Sep 17 00:00:00 2001 From: doki Date: Mon, 14 Mar 2022 11:57:44 +0800 Subject: [PATCH 01/10] add Expr::QualifiedWildcard --- datafusion-expr/src/expr.rs | 7 +++++++ datafusion/src/datasource/listing/helpers.rs | 3 ++- datafusion/src/logical_plan/expr_rewriter.rs | 3 +++ datafusion/src/logical_plan/expr_schema.rs | 8 ++++++++ datafusion/src/logical_plan/expr_visitor.rs | 3 ++- datafusion/src/optimizer/common_subexpr_eliminate.rs | 4 ++++ datafusion/src/optimizer/simplify_expressions.rs | 3 ++- datafusion/src/optimizer/utils.rs | 11 ++++++++++- datafusion/src/physical_plan/planner.rs | 3 +++ datafusion/src/sql/utils.rs | 1 + 10 files changed, 42 insertions(+), 4 deletions(-) diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs index e3924d9bdb03..c5ae7ae3e9a4 100644 --- a/datafusion-expr/src/expr.rs +++ b/datafusion-expr/src/expr.rs @@ -228,6 +228,9 @@ pub enum Expr { }, /// Represents a reference to all fields in a schema. Wildcard, + QualifiedWildcard { + qualifier: String, + }, } /// Fixed seed for the hashing so that Ords are consistent across runs @@ -512,6 +515,7 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), + Expr::QualifiedWildcard { qualifier } => write!(f, "{}.*", qualifier), Expr::GetIndexedField { ref expr, key } => { write!(f, "({:?})[{}]", expr, key) } @@ -696,6 +700,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result { Expr::Wildcard => Err(DataFusionError::Internal( "Create name does not support wildcard".to_string(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "Create name does not support qualified wildcard".to_string(), + )), } } diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs index 335d8275fcce..91c7f2357777 100644 --- a/datafusion/src/datasource/listing/helpers.rs +++ b/datafusion/src/datasource/listing/helpers.rs @@ -108,7 +108,8 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> { | Expr::AggregateFunction { .. } | Expr::Sort { .. } | Expr::WindowFunction { .. } - | Expr::Wildcard => { + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } => { *self.is_applicable = false; Recursion::Stop(self) } diff --git a/datafusion/src/logical_plan/expr_rewriter.rs b/datafusion/src/logical_plan/expr_rewriter.rs index 2457939280ee..eb73f9c2360a 100644 --- a/datafusion/src/logical_plan/expr_rewriter.rs +++ b/datafusion/src/logical_plan/expr_rewriter.rs @@ -218,6 +218,9 @@ impl ExprRewritable for Expr { negated, }, Expr::Wildcard => Expr::Wildcard, + Expr::QualifiedWildcard { qualifier } => { + Expr::QualifiedWildcard { qualifier } + } Expr::GetIndexedField { expr, key } => Expr::GetIndexedField { expr: rewrite_boxed(expr, rewriter)?, key, diff --git a/datafusion/src/logical_plan/expr_schema.rs b/datafusion/src/logical_plan/expr_schema.rs index 347dcd3deec4..303bb6bafaae 100644 --- a/datafusion/src/logical_plan/expr_schema.rs +++ b/datafusion/src/logical_plan/expr_schema.rs @@ -116,6 +116,10 @@ impl ExprSchemable for Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), Expr::GetIndexedField { ref expr, key } => { let data_type = expr.get_type(schema)?; @@ -178,6 +182,10 @@ impl ExprSchemable for Expr { Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), Expr::GetIndexedField { ref expr, key } => { let data_type = expr.get_type(input_schema)?; get_indexed_field(&data_type, key).map(|x| x.is_nullable()) diff --git a/datafusion/src/logical_plan/expr_visitor.rs b/datafusion/src/logical_plan/expr_visitor.rs index 077caace40e5..49e910da0abc 100644 --- a/datafusion/src/logical_plan/expr_visitor.rs +++ b/datafusion/src/logical_plan/expr_visitor.rs @@ -106,7 +106,8 @@ impl ExprVisitable for Expr { Expr::Column(_) | Expr::ScalarVariable(_) | Expr::Literal(_) - | Expr::Wildcard => Ok(visitor), + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } => Ok(visitor), Expr::BinaryExpr { left, right, .. } => { let visitor = left.accept(visitor)?; right.accept(visitor) diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 48b6abeae6f0..7cd2596f6bcc 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -459,6 +459,10 @@ impl ExprIdentifierVisitor<'_> { Expr::Wildcard => { desc.push_str("Wildcard-"); } + Expr::QualifiedWildcard { qualifier } => { + desc.push_str("QualifiedWildcard-"); + desc.push_str(qualifier); + } Expr::GetIndexedField { key, .. } => { desc.push_str("GetIndexedField-"); desc.push_str(&key.to_string()); diff --git a/datafusion/src/optimizer/simplify_expressions.rs b/datafusion/src/optimizer/simplify_expressions.rs index 521d976d1513..81dfcf84b90e 100644 --- a/datafusion/src/optimizer/simplify_expressions.rs +++ b/datafusion/src/optimizer/simplify_expressions.rs @@ -377,7 +377,8 @@ impl<'a> ConstEvaluator<'a> { | Expr::Column(_) | Expr::WindowFunction { .. } | Expr::Sort { .. } - | Expr::Wildcard => false, + | Expr::Wildcard + | Expr::QualifiedWildcard { .. } => false, Expr::ScalarFunction { fun, .. } => Self::volatility_ok(fun.volatility()), Expr::ScalarUDF { fun, .. } => Self::volatility_ok(fun.signature.volatility), Expr::Literal(_) diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 41d1e4bca03b..42fd80e5079b 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -84,6 +84,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> { | Expr::AggregateUDF { .. } | Expr::InList { .. } | Expr::Wildcard + | Expr::QualifiedWildcard { .. } | Expr::GetIndexedField { .. } => {} } Ok(Recursion::Continue(self)) @@ -349,6 +350,10 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), } } @@ -505,9 +510,13 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result { Ok(expr) } } - Expr::Wildcard { .. } => Err(DataFusionError::Internal( + Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "QualifiedWildcard expressions are not valid in a logical query plan" + .to_owned(), + )), Expr::GetIndexedField { expr: _, key } => Ok(Expr::GetIndexedField { expr: Box::new(expressions[0].clone()), key: key.clone(), diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 4055b1488422..5f17fd8618e8 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -206,6 +206,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { Expr::Wildcard => Err(DataFusionError::Internal( "Create physical name does not support wildcard".to_string(), )), + Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal( + "Create physical name does not support qualified wildcard".to_string(), + )), } } diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs index 8ec0a49a2fd6..00d04d24e229 100644 --- a/datafusion/src/sql/utils.rs +++ b/datafusion/src/sql/utils.rs @@ -372,6 +372,7 @@ where Ok(expr.clone()) } Expr::Wildcard => Ok(Expr::Wildcard), + Expr::QualifiedWildcard { .. } => Ok(expr.clone()), Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField { expr: Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?), key: key.clone(), From 1b23fce207c88710534c396df1e5dd7951af006b Mon Sep 17 00:00:00 2001 From: doki Date: Mon, 14 Mar 2022 21:27:44 +0800 Subject: [PATCH 02/10] qualified wildcard tests --- datafusion-common/src/dfschema.rs | 10 +++++ datafusion/src/logical_plan/builder.rs | 15 +++++++ datafusion/src/sql/planner.rs | 15 +++++-- datafusion/tests/sql/mod.rs | 1 + datafusion/tests/sql/wildcard.rs | 58 ++++++++++++++++++++++++++ 5 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 datafusion/tests/sql/wildcard.rs diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs index 6a3dcb050e2d..6399c33a7f36 100644 --- a/datafusion-common/src/dfschema.rs +++ b/datafusion-common/src/dfschema.rs @@ -222,6 +222,16 @@ impl DFSchema { } } + pub fn fields_with_qualified(&self, qualifier: &String) -> Vec<&DFField> { + self.fields + .iter() + .filter(|field| { + field.qualifier().is_some() + && field.qualifier.as_ref().unwrap().eq(qualifier) + }) + .collect() + } + /// Find all fields match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { self.fields diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index d0bfb5c1f5e0..ad855a4f305f 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -1090,6 +1090,21 @@ pub(crate) fn expand_wildcard( } } +pub(crate) fn expand_qualified_wildcard( + qualifier: &String, + schema: &DFSchema, + plan: &LogicalPlan, +) -> Result> { + let qualified_fields = schema + .fields_with_qualified(qualifier) + .into_iter() + .map(|f| f.clone()) + .collect(); + let qualifier_schema = + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; + expand_wildcard(&qualifier_schema, plan) +} + #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field}; diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index b528e56112f3..6ca277078d5e 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -28,7 +28,8 @@ use crate::datasource::TableProvider; use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ - and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias, Column, + and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit, + normalize_col, union_with_alias, Column, CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, @@ -1002,6 +1003,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } expand_wildcard(input_schema, plan)? } + Expr::QualifiedWildcard { ref qualifier } => { + expand_qualified_wildcard(qualifier, input_schema, plan)? + } _ => vec![normalize_col(expr, plan)?], }) }) @@ -1202,9 +1206,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { normalize_ident(alias), )), SelectItem::Wildcard => Ok(Expr::Wildcard), - SelectItem::QualifiedWildcard(_) => Err(DataFusionError::NotImplemented( - "Qualified wildcards are not supported".to_string(), - )), + SelectItem::QualifiedWildcard(ref object_name) => { + let table_ref: TableReference = object_name.try_into()?; + Ok(Expr::QualifiedWildcard { + qualifier: table_ref.table().to_string(), + }) + } } } diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index a548d619d635..94b15ea749d9 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -94,6 +94,7 @@ pub mod select; pub mod timestamp; pub mod udf; pub mod union; +pub mod wildcard; pub mod window; mod explain; diff --git a/datafusion/tests/sql/wildcard.rs b/datafusion/tests/sql/wildcard.rs new file mode 100644 index 000000000000..ebf92f928d54 --- /dev/null +++ b/datafusion/tests/sql/wildcard.rs @@ -0,0 +1,58 @@ +use super::*; + +#[tokio::test] +async fn select_qualified_wildcard() -> Result<()> { + let mut ctx = ExecutionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "SELECT agg.* FROM aggregate_simple as agg order by c1"; + let results = execute_to_batches(&mut ctx, sql).await; + + let expected = vec![ + "+---------+----------------+-------+", + "| c1 | c2 | c3 |", + "+---------+----------------+-------+", + "| 0.00001 | 0.000000000001 | true |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "+---------+----------------+-------+", + ]; + + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn select_qualified_wildcard_join() -> Result<()> { + let mut ctx = create_join_context("t1_id", "t2_id")?; + let sql = + "SELECT tb1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; + let expected = vec![ + "+-------+---------+-------+---------+", + "| t1_id | t1_name | t2_id | t2_name |", + "+-------+---------+-------+---------+", + "| 11 | a | 11 | z |", + "| 22 | b | 22 | y |", + "| 44 | d | 44 | x |", + "+-------+---------+-------+---------+", + ]; + + let results = execute_to_batches(&mut ctx, sql).await; + + assert_batches_eq!(expected, &results); + + Ok(()) +} From 1cf9093ff371220fb957cb2c51d9d46f64252f87 Mon Sep 17 00:00:00 2001 From: doki Date: Mon, 14 Mar 2022 21:49:51 +0800 Subject: [PATCH 03/10] add license for wildcard.rs --- datafusion/tests/sql/wildcard.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/tests/sql/wildcard.rs b/datafusion/tests/sql/wildcard.rs index ebf92f928d54..0bae65d2bdfc 100644 --- a/datafusion/tests/sql/wildcard.rs +++ b/datafusion/tests/sql/wildcard.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use super::*; #[tokio::test] From e4daa93ef750d9bf07e20156983b9873b22e8d44 Mon Sep 17 00:00:00 2001 From: doki Date: Mon, 14 Mar 2022 23:22:29 +0800 Subject: [PATCH 04/10] clippy --- datafusion-common/src/dfschema.rs | 2 +- datafusion/src/logical_plan/builder.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs index 6399c33a7f36..8fc211651a88 100644 --- a/datafusion-common/src/dfschema.rs +++ b/datafusion-common/src/dfschema.rs @@ -222,7 +222,7 @@ impl DFSchema { } } - pub fn fields_with_qualified(&self, qualifier: &String) -> Vec<&DFField> { + pub fn fields_with_qualified(&self, qualifier: &str) -> Vec<&DFField> { self.fields .iter() .filter(|field| { diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index ad855a4f305f..8b07895469b3 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -1091,14 +1091,14 @@ pub(crate) fn expand_wildcard( } pub(crate) fn expand_qualified_wildcard( - qualifier: &String, + qualifier: &str, schema: &DFSchema, plan: &LogicalPlan, ) -> Result> { let qualified_fields = schema .fields_with_qualified(qualifier) .into_iter() - .map(|f| f.clone()) + .cloned() .collect(); let qualifier_schema = DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; From 80d0483af5a37af44d1f0067ce8454b2d7c379ee Mon Sep 17 00:00:00 2001 From: doki Date: Tue, 15 Mar 2022 10:52:55 +0800 Subject: [PATCH 05/10] fix bug --- datafusion-expr/src/expr.rs | 17 ++++++- datafusion/src/logical_plan/builder.rs | 23 ++++++---- datafusion/src/logical_plan/expr_rewriter.rs | 12 +++-- .../src/optimizer/common_subexpr_eliminate.rs | 14 +++++- datafusion/src/sql/planner.rs | 44 +++++++++++++++++-- datafusion/tests/sql/wildcard.rs | 22 ++++++++++ 6 files changed, 113 insertions(+), 19 deletions(-) diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs index c5ae7ae3e9a4..1a302920e9bf 100644 --- a/datafusion-expr/src/expr.rs +++ b/datafusion-expr/src/expr.rs @@ -229,7 +229,9 @@ pub enum Expr { /// Represents a reference to all fields in a schema. Wildcard, QualifiedWildcard { - qualifier: String, + catalog: Option, + schema: Option, + table: String, }, } @@ -515,7 +517,18 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), - Expr::QualifiedWildcard { qualifier } => write!(f, "{}.*", qualifier), + Expr::QualifiedWildcard { + catalog, + schema, + table, + } => match (catalog, schema, table) { + (None, None, table) => write!(f, "{}.*", table), + (None, Some(schema), table) => write!(f, "{}.{}.*", schema, table), + (Some(catalog), Some(schema), table) => { + write!(f, "{}.{}.{}.*", catalog, schema, table) + } + _ => panic!("invalid qualifier!"), + }, Expr::GetIndexedField { ref expr, key } => { write!(f, "({:?})[{}]", expr, key) } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 8b07895469b3..3f71911d8ef5 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -1091,18 +1091,23 @@ pub(crate) fn expand_wildcard( } pub(crate) fn expand_qualified_wildcard( - qualifier: &str, + table: &str, + table_provider: Option>, schema: &DFSchema, plan: &LogicalPlan, ) -> Result> { - let qualified_fields = schema - .fields_with_qualified(qualifier) - .into_iter() - .cloned() - .collect(); - let qualifier_schema = - DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; - expand_wildcard(&qualifier_schema, plan) + if let Some(table_provider) = table_provider { + expand_wildcard(&table_provider.schema().to_dfschema()?, plan) + } else { + let qualified_fields = schema + .fields_with_qualified(table) + .into_iter() + .cloned() + .collect(); + let qualifier_schema = + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; + expand_wildcard(&qualifier_schema, plan) + } } #[cfg(test)] diff --git a/datafusion/src/logical_plan/expr_rewriter.rs b/datafusion/src/logical_plan/expr_rewriter.rs index eb73f9c2360a..6504d85f77e0 100644 --- a/datafusion/src/logical_plan/expr_rewriter.rs +++ b/datafusion/src/logical_plan/expr_rewriter.rs @@ -218,9 +218,15 @@ impl ExprRewritable for Expr { negated, }, Expr::Wildcard => Expr::Wildcard, - Expr::QualifiedWildcard { qualifier } => { - Expr::QualifiedWildcard { qualifier } - } + Expr::QualifiedWildcard { + catalog, + schema, + table, + } => Expr::QualifiedWildcard { + catalog, + schema, + table, + }, Expr::GetIndexedField { expr, key } => Expr::GetIndexedField { expr: rewrite_boxed(expr, rewriter)?, key, diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 7cd2596f6bcc..2b289592d752 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -459,9 +459,19 @@ impl ExprIdentifierVisitor<'_> { Expr::Wildcard => { desc.push_str("Wildcard-"); } - Expr::QualifiedWildcard { qualifier } => { + Expr::QualifiedWildcard { + catalog, + schema, + table, + } => { desc.push_str("QualifiedWildcard-"); - desc.push_str(qualifier); + if let Some(catalog) = catalog { + desc.push_str(&format!("{}-", catalog.clone())); + } + if let Some(schema) = schema { + desc.push_str(&format!("{}-", schema.clone())); + } + desc.push_str(table); } Expr::GetIndexedField { key, .. } => { desc.push_str("GetIndexedField-"); diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 6ca277078d5e..fc6d5a462183 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -1003,8 +1003,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } expand_wildcard(input_schema, plan)? } - Expr::QualifiedWildcard { ref qualifier } => { - expand_qualified_wildcard(qualifier, input_schema, plan)? + Expr::QualifiedWildcard { + catalog, + schema, + table, + } => { + let table_ref = match (&catalog, &schema, &table) { + (None, None, table) => TableReference::Bare { table }, + (None, Some(schema), table) => { + TableReference::Partial { schema, table } + } + (Some(catalog), Some(schema), table) => { + TableReference::Full { + catalog, + schema, + table, + } + } + _ => panic!("invalid qualifier!"), + }; + let table_provider = + self.schema_provider.get_table_provider(table_ref); + expand_qualified_wildcard( + &table, + table_provider, + input_schema, + plan, + )? } _ => vec![normalize_col(expr, plan)?], }) @@ -1208,8 +1233,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SelectItem::Wildcard => Ok(Expr::Wildcard), SelectItem::QualifiedWildcard(ref object_name) => { let table_ref: TableReference = object_name.try_into()?; + let (catalog, schema, table) = match table_ref { + TableReference::Bare { table } => (None, None, table), + TableReference::Partial { schema, table } => { + (None, Some(schema.to_string()), table) + } + TableReference::Full { + catalog, + schema, + table, + } => (Some(catalog.to_string()), Some(schema.to_string()), table), + }; Ok(Expr::QualifiedWildcard { - qualifier: table_ref.table().to_string(), + catalog, + schema, + table: table.to_string(), }) } } diff --git a/datafusion/tests/sql/wildcard.rs b/datafusion/tests/sql/wildcard.rs index 0bae65d2bdfc..e62b790c47ee 100644 --- a/datafusion/tests/sql/wildcard.rs +++ b/datafusion/tests/sql/wildcard.rs @@ -73,3 +73,25 @@ async fn select_qualified_wildcard_join() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn select_non_alias_qualified_wildcard_join() -> Result<()> { + let mut ctx = create_join_context("t1_id", "t2_id")?; + let sql = + "SELECT t1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; + let expected = vec![ + "+-------+---------+-------+---------+", + "| t1_id | t1_name | t2_id | t2_name |", + "+-------+---------+-------+---------+", + "| 11 | a | 11 | z |", + "| 22 | b | 22 | y |", + "| 44 | d | 44 | x |", + "+-------+---------+-------+---------+", + ]; + + let results = execute_to_batches(&mut ctx, sql).await; + + assert_batches_eq!(expected, &results); + + Ok(()) +} From 20607ccbbfeb88a0bd369ecc6555591227e1b371 Mon Sep 17 00:00:00 2001 From: doki Date: Tue, 15 Mar 2022 20:38:56 +0800 Subject: [PATCH 06/10] code optimization --- datafusion/src/optimizer/common_subexpr_eliminate.rs | 4 ++-- datafusion/src/sql/planner.rs | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index 2b289592d752..6dd7dbe97cfd 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -466,10 +466,10 @@ impl ExprIdentifierVisitor<'_> { } => { desc.push_str("QualifiedWildcard-"); if let Some(catalog) = catalog { - desc.push_str(&format!("{}-", catalog.clone())); + desc.push_str(&format!("{}.", catalog.clone())); } if let Some(schema) = schema { - desc.push_str(&format!("{}-", schema.clone())); + desc.push_str(&format!("{}.", schema.clone())); } desc.push_str(table); } diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index fc6d5a462183..7fb83eeca0d7 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -1020,7 +1020,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { table, } } - _ => panic!("invalid qualifier!"), + _ => { + return Err(DataFusionError::Plan( + "invalid qualified wildcard".to_string(), + )) + } }; let table_provider = self.schema_provider.get_table_provider(table_ref); From 706d97d3bb27944fb7d57b08ca23423636134ff1 Mon Sep 17 00:00:00 2001 From: doki Date: Thu, 17 Mar 2022 23:01:33 +0800 Subject: [PATCH 07/10] solve conflicts --- datafusion/src/sql/planner.rs | 2 +- datafusion/tests/sql/wildcard.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 6f9aa49443a5..4f6b7eb61143 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -29,7 +29,7 @@ use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits}; use crate::logical_plan::Expr::Alias; use crate::logical_plan::{ and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, lit, - normalize_col, union_with_alias, Column, + normalize_col, union_with_alias, Column, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan, diff --git a/datafusion/tests/sql/wildcard.rs b/datafusion/tests/sql/wildcard.rs index e62b790c47ee..33630c1d1258 100644 --- a/datafusion/tests/sql/wildcard.rs +++ b/datafusion/tests/sql/wildcard.rs @@ -19,11 +19,11 @@ use super::*; #[tokio::test] async fn select_qualified_wildcard() -> Result<()> { - let mut ctx = ExecutionContext::new(); + let mut ctx = SessionContext::new(); register_aggregate_simple_csv(&mut ctx).await?; let sql = "SELECT agg.* FROM aggregate_simple as agg order by c1"; - let results = execute_to_batches(&mut ctx, sql).await; + let results = execute_to_batches(&ctx, sql).await; let expected = vec![ "+---------+----------------+-------+", @@ -54,7 +54,7 @@ async fn select_qualified_wildcard() -> Result<()> { #[tokio::test] async fn select_qualified_wildcard_join() -> Result<()> { - let mut ctx = create_join_context("t1_id", "t2_id")?; + let ctx = create_join_context("t1_id", "t2_id")?; let sql = "SELECT tb1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; let expected = vec![ @@ -67,7 +67,7 @@ async fn select_qualified_wildcard_join() -> Result<()> { "+-------+---------+-------+---------+", ]; - let results = execute_to_batches(&mut ctx, sql).await; + let results = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &results); @@ -76,7 +76,7 @@ async fn select_qualified_wildcard_join() -> Result<()> { #[tokio::test] async fn select_non_alias_qualified_wildcard_join() -> Result<()> { - let mut ctx = create_join_context("t1_id", "t2_id")?; + let ctx = create_join_context("t1_id", "t2_id")?; let sql = "SELECT t1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; let expected = vec![ @@ -89,7 +89,7 @@ async fn select_non_alias_qualified_wildcard_join() -> Result<()> { "+-------+---------+-------+---------+", ]; - let results = execute_to_batches(&mut ctx, sql).await; + let results = execute_to_batches(&ctx, sql).await; assert_batches_eq!(expected, &results); From 26c6b914c3489f56472a1237f8037665d2b8814b Mon Sep 17 00:00:00 2001 From: doki Date: Thu, 17 Mar 2022 23:20:50 +0800 Subject: [PATCH 08/10] change def of QualifiedWildcard --- datafusion-expr/src/expr.rs | 17 +------ datafusion/src/logical_plan/builder.rs | 5 +- datafusion/src/logical_plan/expr_rewriter.rs | 12 ++--- .../src/optimizer/common_subexpr_eliminate.rs | 14 +----- datafusion/src/sql/planner.rs | 46 ++----------------- 5 files changed, 15 insertions(+), 79 deletions(-) diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs index 440c47d9bb9e..3856503680d2 100644 --- a/datafusion-expr/src/expr.rs +++ b/datafusion-expr/src/expr.rs @@ -229,9 +229,7 @@ pub enum Expr { /// Represents a reference to all fields in a schema. Wildcard, QualifiedWildcard { - catalog: Option, - schema: Option, - table: String, + qualifier: String, }, } @@ -517,18 +515,7 @@ impl fmt::Debug for Expr { } } Expr::Wildcard => write!(f, "*"), - Expr::QualifiedWildcard { - catalog, - schema, - table, - } => match (catalog, schema, table) { - (None, None, table) => write!(f, "{}.*", table), - (None, Some(schema), table) => write!(f, "{}.{}.*", schema, table), - (Some(catalog), Some(schema), table) => { - write!(f, "{}.{}.{}.*", catalog, schema, table) - } - _ => panic!("invalid qualifier!"), - }, + Expr::QualifiedWildcard { qualifier } => write!(f, "{}.*", qualifier), Expr::GetIndexedField { ref expr, key } => { write!(f, "({:?})[{}]", expr, key) } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 3f71911d8ef5..4427493b0f3e 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -1091,7 +1091,7 @@ pub(crate) fn expand_wildcard( } pub(crate) fn expand_qualified_wildcard( - table: &str, + qualifier: &str, table_provider: Option>, schema: &DFSchema, plan: &LogicalPlan, @@ -1099,8 +1099,9 @@ pub(crate) fn expand_qualified_wildcard( if let Some(table_provider) = table_provider { expand_wildcard(&table_provider.schema().to_dfschema()?, plan) } else { + // if it doesnt exist in table_provider, it should be an alias let qualified_fields = schema - .fields_with_qualified(table) + .fields_with_qualified(qualifier) .into_iter() .cloned() .collect(); diff --git a/datafusion/src/logical_plan/expr_rewriter.rs b/datafusion/src/logical_plan/expr_rewriter.rs index 317fa668e4ed..f4bfc3914cd2 100644 --- a/datafusion/src/logical_plan/expr_rewriter.rs +++ b/datafusion/src/logical_plan/expr_rewriter.rs @@ -218,15 +218,9 @@ impl ExprRewritable for Expr { negated, }, Expr::Wildcard => Expr::Wildcard, - Expr::QualifiedWildcard { - catalog, - schema, - table, - } => Expr::QualifiedWildcard { - catalog, - schema, - table, - }, + Expr::QualifiedWildcard { qualifier } => { + Expr::QualifiedWildcard { qualifier } + } Expr::GetIndexedField { expr, key } => Expr::GetIndexedField { expr: rewrite_boxed(expr, rewriter)?, key, diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs b/datafusion/src/optimizer/common_subexpr_eliminate.rs index b9cbc50ad732..b2884ea969d5 100644 --- a/datafusion/src/optimizer/common_subexpr_eliminate.rs +++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs @@ -460,19 +460,9 @@ impl ExprIdentifierVisitor<'_> { Expr::Wildcard => { desc.push_str("Wildcard-"); } - Expr::QualifiedWildcard { - catalog, - schema, - table, - } => { + Expr::QualifiedWildcard { qualifier } => { desc.push_str("QualifiedWildcard-"); - if let Some(catalog) = catalog { - desc.push_str(&format!("{}.", catalog.clone())); - } - if let Some(schema) = schema { - desc.push_str(&format!("{}.", schema.clone())); - } - desc.push_str(table); + desc.push_str(qualifier); } Expr::GetIndexedField { key, .. } => { desc.push_str("GetIndexedField-"); diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 4f6b7eb61143..7d29a1adb240 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -1011,33 +1011,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } expand_wildcard(input_schema, plan)? } - Expr::QualifiedWildcard { - catalog, - schema, - table, - } => { - let table_ref = match (&catalog, &schema, &table) { - (None, None, table) => TableReference::Bare { table }, - (None, Some(schema), table) => { - TableReference::Partial { schema, table } - } - (Some(catalog), Some(schema), table) => { - TableReference::Full { - catalog, - schema, - table, - } - } - _ => { - return Err(DataFusionError::Plan( - "invalid qualified wildcard".to_string(), - )) - } - }; + Expr::QualifiedWildcard { ref qualifier } => { + let table_ref = TableReference::from(qualifier.as_ref()); let table_provider = self.schema_provider.get_table_provider(table_ref); expand_qualified_wildcard( - &table, + qualifier, table_provider, input_schema, plan, @@ -1244,23 +1223,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )), SelectItem::Wildcard => Ok(Expr::Wildcard), SelectItem::QualifiedWildcard(ref object_name) => { - let table_ref: TableReference = object_name.try_into()?; - let (catalog, schema, table) = match table_ref { - TableReference::Bare { table } => (None, None, table), - TableReference::Partial { schema, table } => { - (None, Some(schema.to_string()), table) - } - TableReference::Full { - catalog, - schema, - table, - } => (Some(catalog.to_string()), Some(schema.to_string()), table), - }; - Ok(Expr::QualifiedWildcard { - catalog, - schema, - table: table.to_string(), - }) + let qualifier = format!("{}", object_name); + Ok(Expr::QualifiedWildcard { qualifier }) } } } From b9a953897b2e87a09844a038c9681c205f5de140 Mon Sep 17 00:00:00 2001 From: doki Date: Sun, 20 Mar 2022 21:30:03 +0800 Subject: [PATCH 09/10] add annotation --- datafusion-expr/src/expr.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion-expr/src/expr.rs b/datafusion-expr/src/expr.rs index 3856503680d2..4bad6e31f39a 100644 --- a/datafusion-expr/src/expr.rs +++ b/datafusion-expr/src/expr.rs @@ -228,9 +228,8 @@ pub enum Expr { }, /// Represents a reference to all fields in a schema. Wildcard, - QualifiedWildcard { - qualifier: String, - }, + /// Represents a reference to all fields in a specific schema. + QualifiedWildcard { qualifier: String }, } /// Fixed seed for the hashing so that Ords are consistent across runs From a2dc969879c13d8b9b463bd434f06b59fb622d10 Mon Sep 17 00:00:00 2001 From: doki Date: Tue, 22 Mar 2022 21:34:35 +0800 Subject: [PATCH 10/10] optimize expand_qualified_wildcard add comments and tests --- datafusion-common/src/dfschema.rs | 6 +-- datafusion/src/logical_plan/builder.rs | 28 +++++++------ datafusion/src/sql/planner.rs | 10 +---- datafusion/tests/sql/wildcard.rs | 56 +++++++++++++++++++++++++- 4 files changed, 72 insertions(+), 28 deletions(-) diff --git a/datafusion-common/src/dfschema.rs b/datafusion-common/src/dfschema.rs index 8fc211651a88..89a8b8c3cecc 100644 --- a/datafusion-common/src/dfschema.rs +++ b/datafusion-common/src/dfschema.rs @@ -222,13 +222,11 @@ impl DFSchema { } } + /// Find all fields having the given qualifier pub fn fields_with_qualified(&self, qualifier: &str) -> Vec<&DFField> { self.fields .iter() - .filter(|field| { - field.qualifier().is_some() - && field.qualifier.as_ref().unwrap().eq(qualifier) - }) + .filter(|field| field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)) .collect() } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 4427493b0f3e..5400e9682a5b 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -1029,6 +1029,8 @@ pub fn project_with_alias( Expr::Wildcard => { projected_expr.extend(expand_wildcard(input_schema, &plan)?) } + Expr::QualifiedWildcard { ref qualifier } => projected_expr + .extend(expand_qualified_wildcard(qualifier, input_schema, &plan)?), _ => projected_expr .push(columnize_expr(normalize_col(e, &plan)?, input_schema)), } @@ -1092,23 +1094,23 @@ pub(crate) fn expand_wildcard( pub(crate) fn expand_qualified_wildcard( qualifier: &str, - table_provider: Option>, schema: &DFSchema, plan: &LogicalPlan, ) -> Result> { - if let Some(table_provider) = table_provider { - expand_wildcard(&table_provider.schema().to_dfschema()?, plan) - } else { - // if it doesnt exist in table_provider, it should be an alias - let qualified_fields = schema - .fields_with_qualified(qualifier) - .into_iter() - .cloned() - .collect(); - let qualifier_schema = - DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; - expand_wildcard(&qualifier_schema, plan) + let qualified_fields: Vec = schema + .fields_with_qualified(qualifier) + .into_iter() + .cloned() + .collect(); + if qualified_fields.is_empty() { + return Err(DataFusionError::Plan(format!( + "Invalid qualifier {}", + qualifier + ))); } + let qualifier_schema = + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())?; + expand_wildcard(&qualifier_schema, plan) } #[cfg(test)] diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs index 4849eabc965c..886052156305 100644 --- a/datafusion/src/sql/planner.rs +++ b/datafusion/src/sql/planner.rs @@ -1012,15 +1012,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { expand_wildcard(input_schema, plan)? } Expr::QualifiedWildcard { ref qualifier } => { - let table_ref = TableReference::from(qualifier.as_ref()); - let table_provider = - self.schema_provider.get_table_provider(table_ref); - expand_qualified_wildcard( - qualifier, - table_provider, - input_schema, - plan, - )? + expand_qualified_wildcard(qualifier, input_schema, plan)? } _ => vec![normalize_col(expr, plan)?], }) diff --git a/datafusion/tests/sql/wildcard.rs b/datafusion/tests/sql/wildcard.rs index 33630c1d1258..b312fe1212e5 100644 --- a/datafusion/tests/sql/wildcard.rs +++ b/datafusion/tests/sql/wildcard.rs @@ -52,6 +52,41 @@ async fn select_qualified_wildcard() -> Result<()> { Ok(()) } +#[tokio::test] +async fn select_non_alias_qualified_wildcard() -> Result<()> { + let mut ctx = SessionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "SELECT aggregate_simple.* FROM aggregate_simple order by c1"; + let results = execute_to_batches(&ctx, sql).await; + + let expected = vec![ + "+---------+----------------+-------+", + "| c1 | c2 | c3 |", + "+---------+----------------+-------+", + "| 0.00001 | 0.000000000001 | true |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00002 | 0.000000000002 | false |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00003 | 0.000000000003 | true |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00004 | 0.000000000004 | false |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "| 0.00005 | 0.000000000005 | true |", + "+---------+----------------+-------+", + ]; + + assert_batches_eq!(expected, &results); + + Ok(()) +} + #[tokio::test] async fn select_qualified_wildcard_join() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id")?; @@ -77,8 +112,7 @@ async fn select_qualified_wildcard_join() -> Result<()> { #[tokio::test] async fn select_non_alias_qualified_wildcard_join() -> Result<()> { let ctx = create_join_context("t1_id", "t2_id")?; - let sql = - "SELECT t1.*, tb2.* FROM t1 tb1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; + let sql = "SELECT t1.*, tb2.* FROM t1 JOIN t2 tb2 ON t2_id = t1_id ORDER BY t1_id"; let expected = vec![ "+-------+---------+-------+---------+", "| t1_id | t1_name | t2_id | t2_name |", @@ -95,3 +129,21 @@ async fn select_non_alias_qualified_wildcard_join() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn select_wrong_qualified_wildcard() -> Result<()> { + let mut ctx = SessionContext::new(); + register_aggregate_simple_csv(&mut ctx).await?; + + let sql = "SELECT agg.* FROM aggregate_simple order by c1"; + let result = ctx.create_logical_plan(sql); + match result { + Ok(_) => panic!("unexpected OK"), + Err(err) => assert_eq!( + err.to_string(), + "Error during planning: Invalid qualifier agg" + ), + }; + + Ok(()) +}