Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/catalog/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn fetch_table_providers(
let table_names = schema_provider.table_names();

let results: Vec<DataFusionResult<Option<(String, Arc<dyn TableProvider>)>>> =
stream::iter(table_names.into_iter())
stream::iter(table_names)
.map(|table_name| {
let schema_provider = Arc::clone(&schema_provider);
async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ fn merge_branches(mut branches: Vec<Branch>) -> Result<MergeResult> {
.join_on(right_plan, datafusion_common::JoinType::Inner, on_exprs)?
.build()?
};
name_map.extend(out.into_iter());
name_map.extend(out);
}
Ok((acc_plan, name_map))
}
Expand Down
31 changes: 14 additions & 17 deletions crates/executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl UserQuery {
return ex_error::OnlyPrimitiveStatementsSnafu.fail();
}

for (name, value) in names.into_iter().zip(value_list.into_iter()) {
for (name, value) in names.into_iter().zip(value_list) {
let session_value = if let SqlExpr::Value(ValueWithSpan { value: v, .. }) = value {
Ok(SessionProperty::from_value(
name.clone(),
Expand Down Expand Up @@ -775,15 +775,13 @@ impl UserQuery {
.fail();
}
}
ObjectType::Schema => {
if !if_exists && catalog.schema(&schema_name).is_none() {
return ex_error::SchemaNotFoundInDatabaseSnafu {
operation_on: OperationOn::Table(OperationType::Drop),
schema: schema_name,
db: catalog_name.to_string(),
}
.fail();
ObjectType::Schema if !if_exists && catalog.schema(&schema_name).is_none() => {
return ex_error::SchemaNotFoundInDatabaseSnafu {
operation_on: OperationOn::Table(OperationType::Drop),
schema: schema_name,
db: catalog_name.to_string(),
}
.fail();
}
_ => {}
}
Expand Down Expand Up @@ -3142,8 +3140,7 @@ pub fn merge_clause_projection<S: ContextProvider>(
.rows
.into_iter()
.next()
.ok_or_else(|| ex_error::MergeInsertOnlyOneRowSnafu.build())?
.into_iter(),
.ok_or_else(|| ex_error::MergeInsertOnlyOneRowSnafu.build())?,
) {
let column_name = column.value.clone();
let expr = sql_planner
Expand Down Expand Up @@ -3240,7 +3237,7 @@ fn collect_merge_clause_expressions(

let case_expr = match (updates, insert) {
(Some(updates), Some(inserts)) => {
let builder_opt = updates.into_iter().chain(inserts.into_iter()).fold(
let builder_opt = updates.into_iter().chain(inserts).fold(
None::<CaseBuilder>,
|acc, (w, t)| {
if let Some(mut acc) = acc {
Expand Down Expand Up @@ -3356,10 +3353,9 @@ async fn target_filter_expression(
.partition_fields(*current_snapshot.snapshot_id())
.map_err(IcebergError::from)
.context(ex_error::IcebergSnafu)?;
let expr = partition_fields
.iter()
.zip(partition_column_bounds.into_iter())
.fold(None, |acc, (column, [min, max])| {
let expr = partition_fields.iter().zip(partition_column_bounds).fold(
None,
|acc, (column, [min, max])| {
let column_expr = col(column.source_name());
let expr = and(
datafusion_expr::Expr::BinaryExpr(BinaryExpr::new(
Expand All @@ -3378,7 +3374,8 @@ async fn target_filter_expression(
} else {
Some(expr)
}
});
},
);
Ok(expr)
}

Expand Down
25 changes: 25 additions & 0 deletions crates/executor/src/tests/sql/dml/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,28 @@ test_query!(
QUALIFY ROW_NUMBER() OVER (PARTITION BY city ORDER BY retail_price) = 1;",
snapshot_path = "select"
);

// Regression test for issue #131: when a SELECT-list alias (`start_tstamp`)
// shadows an actual column of the FROM-clause CTE, references to that name
// inside other projection expressions must still resolve to the FROM-clause
// column (per ANSI SQL / Snowflake), not the alias. If the alias is inlined
// instead, the CASE predicate degenerates to `user_start_tstamp =
// user_start_tstamp` (always true) and the aggregate returns `S2` instead of
// the correct `S1`.
test_query!(
alias_shadows_column_in_aggregate_case,
"WITH s AS (
SELECT 'S1' AS sid,
TIMESTAMP '2020-01-01 00:00:00' AS start_tstamp,
TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp
UNION ALL
SELECT 'S2' AS sid,
TIMESTAMP '2020-01-01 05:00:00' AS start_tstamp,
TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp
)
SELECT user_start_tstamp AS start_tstamp,
MAX(CASE WHEN start_tstamp = user_start_tstamp THEN sid END) AS first_sid
FROM s
GROUP BY user_start_tstamp;",
snapshot_path = "select"
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
source: crates/executor/src/tests/sql/dml/select.rs
assertion_line: 39
description: "\"WITH s AS (\n SELECT 'S1' AS sid,\n TIMESTAMP '2020-01-01 00:00:00' AS start_tstamp,\n TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp\n UNION ALL\n SELECT 'S2' AS sid,\n TIMESTAMP '2020-01-01 05:00:00' AS start_tstamp,\n TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp\n )\n SELECT user_start_tstamp AS start_tstamp,\n MAX(CASE WHEN start_tstamp = user_start_tstamp THEN sid END) AS first_sid\n FROM s\n GROUP BY user_start_tstamp;\""
---
Ok(
[
"+---------------------+-----------+",
"| start_tstamp | first_sid |",
"+---------------------+-----------+",
"| 2020-01-01T00:00:00 | S1 |",
"+---------------------+-----------+",
],
)
2 changes: 1 addition & 1 deletion crates/functions/src/conversion/to_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ScalarUDFImpl for ToArrayFunc {
let values_array = if flat_values.is_empty() {
new_empty_array(&elem_type)
} else {
ScalarValue::iter_to_array(flat_values.into_iter())?
ScalarValue::iter_to_array(flat_values)?
};
let offset_buf = OffsetBuffer::new(offsets.into());

Expand Down
16 changes: 4 additions & 12 deletions crates/functions/src/string-binary/substr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,10 @@ impl ScalarUDFImpl for SubstrFunc {
}

let first_data_type = match &arg_types[0] {
DataType::Dictionary(key_type, value_type) => {
if key_type.is_integer() && is_string_coercible(value_type) {
coerce_string_type(value_type)
} else {
return InvalidArgumentTypeSnafu {
function_name: self.name().to_string(),
position: position_name(0).to_string(),
expected_type: "a string or binary coercible type".to_string(),
actual_type: format!("{:?}", &arg_types[0]),
}
.fail()?;
}
DataType::Dictionary(key_type, value_type)
if key_type.is_integer() && is_string_coercible(value_type) =>
{
coerce_string_type(value_type)
}
data_type if is_string_coercible(data_type) => coerce_string_type(data_type),
data_type if is_binary_type(data_type) => coerce_binary_type(data_type),
Expand Down
18 changes: 18 additions & 0 deletions crates/functions/src/tests/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,24 @@ fn test_inline_aliases_in_query() -> DFResult<()> {
"SELECT regexp_replace(name, 'yes', '', 1, 1) AS name, regexp_replace(name, 'yes', '', 1, 1) AS test FROM (SELECT column1 AS name FROM (VALUES ('yesnotyes')))"),
("SELECT sum(jan_sales) AS jan_sales, sum(jan_sales / 1) AS jan_sales_per_sq_foot FROM (SELECT sum(CASE WHEN d_moy = 1 THEN ws_ext_sales_price * ws_quantity ELSE 0 END) AS jan_sales FROM web_sales, date_dim UNION ALL SELECT sum(CASE WHEN d_moy = 1 THEN cs_sales_price * cs_quantity ELSE 0 END) AS jan_sales FROM catalog_sales, date_dim)",
"SELECT sum(jan_sales) AS jan_sales, sum(jan_sales / 1) AS jan_sales_per_sq_foot FROM (SELECT sum(CASE WHEN d_moy = 1 THEN ws_ext_sales_price * ws_quantity ELSE 0 END) AS jan_sales FROM web_sales, date_dim UNION ALL SELECT sum(CASE WHEN d_moy = 1 THEN cs_sales_price * cs_quantity ELSE 0 END) AS jan_sales FROM catalog_sales, date_dim)"),
// Regression test for issue #131: a SELECT-list alias that shadows a
// FROM-clause column name must NOT be inlined into other projection
// expressions. Here `start_tstamp` is both a column of CTE `s` and
// the alias of the first projection; inlining would turn the CASE
// condition into the tautology `user_start_tstamp = user_start_tstamp`
// and silently produce the wrong result. Because FROM references a
// named relation (the CTE `s`) whose schema is invisible at the AST
// level, we conservatively skip projection-list alias inlining.
(
"WITH s AS (SELECT 'S1' AS sid, TIMESTAMP '2020-01-01 00:00:00' AS start_tstamp, TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp UNION ALL SELECT 'S2' AS sid, TIMESTAMP '2020-01-01 05:00:00' AS start_tstamp, TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp) SELECT user_start_tstamp AS start_tstamp, MAX(CASE WHEN start_tstamp = user_start_tstamp THEN sid END) AS first_sid FROM s GROUP BY user_start_tstamp",
"WITH s AS (SELECT 'S1' AS sid, TIMESTAMP '2020-01-01 00:00:00' AS start_tstamp, TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp UNION ALL SELECT 'S2' AS sid, TIMESTAMP '2020-01-01 05:00:00' AS start_tstamp, TIMESTAMP '2020-01-01 00:00:00' AS user_start_tstamp) SELECT user_start_tstamp AS start_tstamp, MAX(CASE WHEN start_tstamp = user_start_tstamp THEN sid END) AS first_sid FROM s GROUP BY user_start_tstamp",
),
// Same shape but over a named table: projection-list aliases must not
// be inlined because a table column with the same name may exist.
(
"SELECT user_start_tstamp AS start_tstamp, MAX(CASE WHEN start_tstamp = user_start_tstamp THEN sid END) AS first_sid FROM sessions GROUP BY user_start_tstamp",
"SELECT user_start_tstamp AS start_tstamp, MAX(CASE WHEN start_tstamp = user_start_tstamp THEN sid END) AS first_sid FROM sessions GROUP BY user_start_tstamp",
),
];

for (input, expected) in cases {
Expand Down
57 changes: 48 additions & 9 deletions crates/functions/src/visitors/inline_aliases_in_query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use datafusion::logical_expr::sqlparser::ast::{Expr, Function, SetOperator, VisitMut};
use datafusion::sql::sqlparser::ast::{
Query, SelectItem, SetExpr, Statement, TableFactor, VisitorMut, visit_expressions_mut,
Query, SelectItem, SetExpr, Statement, TableFactor, TableWithJoins, VisitorMut,
visit_expressions_mut,
};
use std::collections::{HashMap, HashSet};
use std::ops::ControlFlow;
Expand Down Expand Up @@ -60,22 +61,33 @@ impl VisitorMut for InlineAliasesInSelect {
}
}

// Per ANSI SQL (and Snowflake), a SELECT-list alias must NOT shadow a
// column of the FROM-clause relation when that same name is referenced
// inside another projection expression. If the FROM clause contains any
// non-derived relation (a named table, CTE, table function, etc.), we
// can't see its schema at the AST level, so we must assume any
// identifier could refer to one of its columns. Inlining projection
// aliases in that case can silently produce wrong results (issue #131).
let inline_in_projection = from_is_alias_inline_safe(&select.from);

for item in &mut select.projection {
match item {
SelectItem::ExprWithAlias { expr, alias } => {
//Don't substitute aliases for the same alias & subquery idents
substitute_aliases(
expr,
&alias_expr_map,
Some(&alias.value),
Some(&|e| contains_ident_value(&subquery_idents, e)),
);
if inline_in_projection {
//Don't substitute aliases for the same alias & subquery idents
substitute_aliases(
expr,
&alias_expr_map,
Some(&alias.value),
Some(&|e| contains_ident_value(&subquery_idents, e)),
);
}
//Don't add to a substitution map if the alias is the same as the subquery ident
if !subquery_idents.contains(&alias.value) {
alias_expr_map.insert(alias.value.clone(), expr.clone());
}
}
SelectItem::UnnamedExpr(expr) => {
SelectItem::UnnamedExpr(expr) if inline_in_projection => {
//Don't substitute subquery idents
substitute_aliases(
expr,
Expand Down Expand Up @@ -127,6 +139,33 @@ impl VisitorMut for InlineAliasesInSelect {
}
}

/// Returns `true` when every `TableFactor` in the FROM clause is a derived
/// subquery whose columns we've already collected into `subquery_idents` (or
/// when FROM is empty). In those cases it is safe to inline SELECT-list
/// aliases into other projection expressions, because any identifier we'd
/// substitute either can't refer to a real column (empty FROM) or is filtered
/// out by the `subquery_idents` check. As soon as FROM contains a named
/// table, CTE, or table function we can't see the schema of, bail out.
fn from_is_alias_inline_safe(from: &[TableWithJoins]) -> bool {
from.iter().all(|twj| {
factor_is_alias_inline_safe(&twj.relation)
&& twj
.joins
.iter()
.all(|j| factor_is_alias_inline_safe(&j.relation))
})
}

fn factor_is_alias_inline_safe(factor: &TableFactor) -> bool {
match factor {
TableFactor::Derived { .. } => true,
TableFactor::NestedJoin {
table_with_joins, ..
} => from_is_alias_inline_safe(std::slice::from_ref(table_with_joins)),
_ => false,
}
}

/// Substitute aliases inside arbitrary expressions, recursively
fn substitute_aliases(
expr: &mut Expr,
Expand Down