diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 88fd9fa6b7ef..aae0b3a1b018 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -318,15 +318,15 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet) -> Result<()> { /// Find excluded columns in the schema, if any /// SELECT * EXCLUDE(col1, col2), would return `vec![col1, col2]` fn get_excluded_columns( - opt_exclude: Option, - opt_except: Option, + opt_exclude: Option<&ExcludeSelectItem>, + opt_except: Option<&ExceptSelectItem>, schema: &DFSchema, qualifier: &Option, ) -> Result> { let mut idents = vec![]; if let Some(excepts) = opt_except { - idents.push(excepts.first_element); - idents.extend(excepts.additional_elements); + idents.push(&excepts.first_element); + idents.extend(&excepts.additional_elements); } if let Some(exclude) = opt_exclude { match exclude { @@ -387,7 +387,7 @@ fn get_exprs_except_skipped( pub fn expand_wildcard( schema: &DFSchema, plan: &LogicalPlan, - wildcard_options: Option, + wildcard_options: Option<&WildcardAdditionalOptions>, ) -> Result> { let using_columns = plan.using_columns()?; let mut columns_to_skip = using_columns @@ -417,7 +417,7 @@ pub fn expand_wildcard( .. }) = wildcard_options { - get_excluded_columns(opt_exclude, opt_except, schema, &None)? + get_excluded_columns(opt_exclude.as_ref(), opt_except.as_ref(), schema, &None)? } else { vec![] }; @@ -430,7 +430,7 @@ pub fn expand_wildcard( pub fn expand_qualified_wildcard( qualifier: &str, schema: &DFSchema, - wildcard_options: Option, + wildcard_options: Option<&WildcardAdditionalOptions>, ) -> Result> { let qualifier = TableReference::from(qualifier); let qualified_fields: Vec = schema @@ -451,7 +451,12 @@ pub fn expand_qualified_wildcard( .. }) = wildcard_options { - get_excluded_columns(opt_exclude, opt_except, schema, &Some(qualifier))? + get_excluded_columns( + opt_exclude.as_ref(), + opt_except.as_ref(), + schema, + &Some(qualifier), + )? } else { vec![] }; diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index e25fdc863b18..23605d037735 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -24,6 +24,7 @@ use crate::utils::{ resolve_columns, resolve_positions_to_exprs, }; +use datafusion_common::Column; use datafusion_common::{ get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef, DataFusionError, Result, @@ -40,7 +41,9 @@ use datafusion_expr::utils::{ use datafusion_expr::{ Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, }; -use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, WindowType}; +use sqlparser::ast::{ + Distinct, Expr as SQLExpr, ReplaceSelectItem, WildcardAdditionalOptions, WindowType, +}; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins}; impl<'a, S: ContextProvider> SqlToRel<'a, S> { @@ -359,17 +362,44 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { return plan_err!("SELECT * with no tables specified is not valid"); } // do not expand from outer schema - expand_wildcard(plan.schema().as_ref(), plan, Some(options)) + let expanded_exprs = + expand_wildcard(plan.schema().as_ref(), plan, Some(&options))?; + // If there is a REPLACE statement, replace that column with the given + // replace expression. Column name remains the same. + if let Some(replace) = options.opt_replace { + self.replace_columns( + plan, + empty_from, + planner_context, + expanded_exprs, + replace, + ) + } else { + Ok(expanded_exprs) + } } SelectItem::QualifiedWildcard(ref object_name, options) => { Self::check_wildcard_options(&options)?; let qualifier = format!("{object_name}"); // do not expand from outer schema - expand_qualified_wildcard( + let expanded_exprs = expand_qualified_wildcard( &qualifier, plan.schema().as_ref(), - Some(options), - ) + Some(&options), + )?; + // If there is a REPLACE statement, replace that column with the given + // replace expression. Column name remains the same. + if let Some(replace) = options.opt_replace { + self.replace_columns( + plan, + empty_from, + planner_context, + expanded_exprs, + replace, + ) + } else { + Ok(expanded_exprs) + } } } } @@ -380,16 +410,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { opt_exclude: _opt_exclude, opt_except: _opt_except, opt_rename, - opt_replace, + opt_replace: _opt_replace, } = options; - if opt_rename.is_some() || opt_replace.is_some() { - not_impl_err!("wildcard * with RENAME or REPLACE not supported ") + if opt_rename.is_some() { + Err(DataFusionError::NotImplemented( + "wildcard * with RENAME not supported ".to_string(), + )) } else { Ok(()) } } + /// If there is a REPLACE statement in the projected expression in the form of + /// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces + /// that column with the given replace expression. Column name remains the same. + /// Multiple REPLACEs are also possible with comma separations. + fn replace_columns( + &self, + plan: &LogicalPlan, + empty_from: bool, + planner_context: &mut PlannerContext, + mut exprs: Vec, + replace: ReplaceSelectItem, + ) -> Result> { + for expr in exprs.iter_mut() { + if let Expr::Column(Column { name, .. }) = expr { + if let Some(item) = replace + .items + .iter() + .find(|item| item.column_name.value == *name) + { + let new_expr = self.sql_select_to_rex( + SelectItem::UnnamedExpr(item.expr.clone()), + plan, + empty_from, + planner_context, + )?[0] + .clone(); + *expr = Expr::Alias(Alias { + expr: Box::new(new_expr), + name: name.clone(), + }); + } + } + } + Ok(exprs) + } + /// Wrap a plan in a projection fn project(&self, input: LogicalPlan, expr: Vec) -> Result { self.validate_schema_satisfies_exprs(input.schema(), &expr)?; diff --git a/datafusion/sqllogictest/test_files/functions.slt b/datafusion/sqllogictest/test_files/functions.slt index fd3a28dfe06d..ac9fc942b4a4 100644 --- a/datafusion/sqllogictest/test_files/functions.slt +++ b/datafusion/sqllogictest/test_files/functions.slt @@ -772,3 +772,42 @@ query I SELECT strpos(arrow_cast('helloworld', 'Dictionary(Int32, Utf8)'), 'world') ---- 6 + +statement ok +CREATE TABLE products ( +product_id INT PRIMARY KEY, +product_name VARCHAR(100), +price DECIMAL(10, 2)) + +statement ok +INSERT INTO products (product_id, product_name, price) VALUES +(1, 'OldBrand Product 1', 19.99), +(2, 'OldBrand Product 2', 29.99), +(3, 'OldBrand Product 3', 39.99), +(4, 'OldBrand Product 4', 49.99) + +query ITR +SELECT * REPLACE (price*2 AS price) FROM products +---- +1 OldBrand Product 1 39.98 +2 OldBrand Product 2 59.98 +3 OldBrand Product 3 79.98 +4 OldBrand Product 4 99.98 + +# types are conserved +query ITR +SELECT * REPLACE (product_id/2 AS product_id) FROM products +---- +0 OldBrand Product 1 19.99 +1 OldBrand Product 2 29.99 +1 OldBrand Product 3 39.99 +2 OldBrand Product 4 49.99 + +# multiple replace statements with qualified wildcard +query ITR +SELECT products.* REPLACE (price*2 AS price, product_id+1000 AS product_id) FROM products +---- +1001 OldBrand Product 1 39.98 +1002 OldBrand Product 2 59.98 +1003 OldBrand Product 3 79.98 +1004 OldBrand Product 4 99.98 \ No newline at end of file