Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
/// Find excluded columns in the schema, if any
/// SELECT * EXCLUDE(col1, col2), would return `vec![col1, col2]`
fn get_excluded_columns(
opt_exclude: Option<ExcludeSelectItem>,
opt_except: Option<ExceptSelectItem>,
opt_exclude: Option<&ExcludeSelectItem>,
opt_except: Option<&ExceptSelectItem>,
schema: &DFSchema,
qualifier: &Option<TableReference>,
) -> Result<Vec<Column>> {
let mut idents = vec![];
if let Some(excepts) = opt_except {
idents.push(excepts.first_element);
idents.extend(excepts.additional_elements);
idents.push(&excepts.first_element);
idents.extend(&excepts.additional_elements);
}
if let Some(exclude) = opt_exclude {
match exclude {
Expand Down Expand Up @@ -387,7 +387,7 @@ fn get_exprs_except_skipped(
pub fn expand_wildcard(
schema: &DFSchema,
plan: &LogicalPlan,
wildcard_options: Option<WildcardAdditionalOptions>,
wildcard_options: Option<&WildcardAdditionalOptions>,
) -> Result<Vec<Expr>> {
let using_columns = plan.using_columns()?;
let mut columns_to_skip = using_columns
Expand Down Expand Up @@ -417,7 +417,7 @@ pub fn expand_wildcard(
..
}) = wildcard_options
{
get_excluded_columns(opt_exclude, opt_except, schema, &None)?
get_excluded_columns(opt_exclude.as_ref(), opt_except.as_ref(), schema, &None)?
} else {
vec![]
};
Expand All @@ -430,7 +430,7 @@ pub fn expand_wildcard(
pub fn expand_qualified_wildcard(
qualifier: &str,
schema: &DFSchema,
wildcard_options: Option<WildcardAdditionalOptions>,
wildcard_options: Option<&WildcardAdditionalOptions>,
) -> Result<Vec<Expr>> {
let qualifier = TableReference::from(qualifier);
let qualified_fields: Vec<DFField> = schema
Expand All @@ -451,7 +451,12 @@ pub fn expand_qualified_wildcard(
..
}) = wildcard_options
{
get_excluded_columns(opt_exclude, opt_except, schema, &Some(qualifier))?
get_excluded_columns(
opt_exclude.as_ref(),
opt_except.as_ref(),
schema,
&Some(qualifier),
)?
} else {
vec![]
};
Expand Down
84 changes: 76 additions & 8 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::utils::{
resolve_columns, resolve_positions_to_exprs,
};

use datafusion_common::Column;
use datafusion_common::{
get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef,
DataFusionError, Result,
Expand All @@ -40,7 +41,9 @@ use datafusion_expr::utils::{
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, WindowType};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, ReplaceSelectItem, WildcardAdditionalOptions, WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand Down Expand Up @@ -359,17 +362,44 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
return plan_err!("SELECT * with no tables specified is not valid");
}
// do not expand from outer schema
expand_wildcard(plan.schema().as_ref(), plan, Some(options))
let expanded_exprs =
expand_wildcard(plan.schema().as_ref(), plan, Some(&options))?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = options.opt_replace {
self.replace_columns(
plan,
empty_from,
planner_context,
expanded_exprs,
replace,
)
} else {
Ok(expanded_exprs)
}
}
SelectItem::QualifiedWildcard(ref object_name, options) => {
Self::check_wildcard_options(&options)?;
let qualifier = format!("{object_name}");
// do not expand from outer schema
expand_qualified_wildcard(
let expanded_exprs = expand_qualified_wildcard(
&qualifier,
plan.schema().as_ref(),
Some(options),
)
Some(&options),
)?;
// If there is a REPLACE statement, replace that column with the given
// replace expression. Column name remains the same.
if let Some(replace) = options.opt_replace {
self.replace_columns(
plan,
empty_from,
planner_context,
expanded_exprs,
replace,
)
} else {
Ok(expanded_exprs)
}
}
}
}
Expand All @@ -380,16 +410,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
opt_exclude: _opt_exclude,
opt_except: _opt_except,
opt_rename,
opt_replace,
opt_replace: _opt_replace,
} = options;

if opt_rename.is_some() || opt_replace.is_some() {
not_impl_err!("wildcard * with RENAME or REPLACE not supported ")
if opt_rename.is_some() {
Err(DataFusionError::NotImplemented(
"wildcard * with RENAME not supported ".to_string(),
))
} else {
Ok(())
}
}

/// If there is a REPLACE statement in the projected expression in the form of
/// "REPLACE (some_column_within_an_expr AS some_column)", this function replaces
/// that column with the given replace expression. Column name remains the same.
/// Multiple REPLACEs are also possible with comma separations.
fn replace_columns(
&self,
plan: &LogicalPlan,
empty_from: bool,
planner_context: &mut PlannerContext,
mut exprs: Vec<Expr>,
replace: ReplaceSelectItem,
) -> Result<Vec<Expr>> {
for expr in exprs.iter_mut() {
if let Expr::Column(Column { name, .. }) = expr {
if let Some(item) = replace
.items
.iter()
.find(|item| item.column_name.value == *name)
{
let new_expr = self.sql_select_to_rex(
SelectItem::UnnamedExpr(item.expr.clone()),
plan,
empty_from,
planner_context,
)?[0]
.clone();
*expr = Expr::Alias(Alias {
expr: Box::new(new_expr),
name: name.clone(),
});
}
}
}
Ok(exprs)
}

/// Wrap a plan in a projection
fn project(&self, input: LogicalPlan, expr: Vec<Expr>) -> Result<LogicalPlan> {
self.validate_schema_satisfies_exprs(input.schema(), &expr)?;
Expand Down
39 changes: 39 additions & 0 deletions datafusion/sqllogictest/test_files/functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,42 @@ query I
SELECT strpos(arrow_cast('helloworld', 'Dictionary(Int32, Utf8)'), 'world')
----
6

statement ok
CREATE TABLE products (
product_id INT PRIMARY KEY,
product_name VARCHAR(100),
price DECIMAL(10, 2))

statement ok
INSERT INTO products (product_id, product_name, price) VALUES
(1, 'OldBrand Product 1', 19.99),
(2, 'OldBrand Product 2', 29.99),
(3, 'OldBrand Product 3', 39.99),
(4, 'OldBrand Product 4', 49.99)

query ITR
SELECT * REPLACE (price*2 AS price) FROM products
----
1 OldBrand Product 1 39.98
2 OldBrand Product 2 59.98
3 OldBrand Product 3 79.98
4 OldBrand Product 4 99.98

# types are conserved
query ITR
SELECT * REPLACE (product_id/2 AS product_id) FROM products
----
0 OldBrand Product 1 19.99
1 OldBrand Product 2 29.99
1 OldBrand Product 3 39.99
2 OldBrand Product 4 49.99

# multiple replace statements with qualified wildcard
query ITR
SELECT products.* REPLACE (price*2 AS price, product_id+1000 AS product_id) FROM products
----
1001 OldBrand Product 1 39.98
1002 OldBrand Product 2 59.98
1003 OldBrand Product 3 79.98
1004 OldBrand Product 4 99.98