Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions datafusion/sql/src/unparser/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,16 @@ impl Default for TableWithJoinsBuilder {

#[derive(Clone)]
pub struct RelationBuilder {
relation: Option<TableFactorBuilder>,
pub relation: Option<TableFactorBuilder>,
}

#[derive(Clone)]
#[expect(clippy::large_enum_variant)]
enum TableFactorBuilder {
#[non_exhaustive]
pub enum TableFactorBuilder {
Table(TableRelationBuilder),
Derived(DerivedRelationBuilder),
Unnest(UnnestRelationBuilder),
TableFunction(TableFunctionRelationBuilder),
Empty,
}

Expand All @@ -458,6 +459,11 @@ impl RelationBuilder {
self
}

/// Sets this relation to a table-function factor wrapping `value`, replacing
/// whatever relation was stored before, and returns the builder for chaining.
pub fn table_function(&mut self, value: TableFunctionRelationBuilder) -> &mut Self {
self.relation = Some(TableFactorBuilder::TableFunction(value));
self
}

pub fn empty(&mut self) -> &mut Self {
self.relation = Some(TableFactorBuilder::Empty);
self
Expand All @@ -474,6 +480,9 @@ impl RelationBuilder {
Some(TableFactorBuilder::Unnest(ref mut rel_builder)) => {
rel_builder.alias = value;
}
Some(TableFactorBuilder::TableFunction(ref mut rel_builder)) => {
rel_builder.alias = value;
}
Some(TableFactorBuilder::Empty) => (),
None => (),
}
Expand All @@ -484,6 +493,7 @@ impl RelationBuilder {
Some(TableFactorBuilder::Table(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Derived(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Unnest(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::TableFunction(ref value)) => Some(value.build()?),
Some(TableFactorBuilder::Empty) => None,
None => return Err(Into::into(UninitializedFieldError::from("relation"))),
})
Expand Down Expand Up @@ -688,6 +698,47 @@ impl Default for UnnestRelationBuilder {
}
}

/// Builder for an `ast::TableFactor::TableFunction` relation.
#[derive(Clone)]
pub struct TableFunctionRelationBuilder {
/// Expression of the table function. Required: `build` returns an error while this is `None`.
pub expr: Option<ast::Expr>,
/// Optional table alias attached to the table function.
pub alias: Option<ast::TableAlias>,
}

impl TableFunctionRelationBuilder {
pub fn expr(&mut self, value: ast::Expr) -> &mut Self {
self.expr = Some(value);
self
}

pub fn alias(&mut self, value: Option<ast::TableAlias>) -> &mut Self {
self.alias = value;
self
}

pub fn build(&self) -> Result<ast::TableFactor, BuilderError> {
Ok(ast::TableFactor::TableFunction {
expr: match self.expr {
Some(ref value) => value.clone(),
None => return Err(Into::into(UninitializedFieldError::from("expr"))),
},
alias: self.alias.clone(),
})
}

fn create_empty() -> Self {
Self {
expr: Default::default(),
alias: Default::default(),
}
}
}

/// The default builder is the empty one produced by `create_empty`.
impl Default for TableFunctionRelationBuilder {
fn default() -> Self {
Self::create_empty()
}
}

/// Runtime error when a `build()` method is called and one or more required fields
/// do not have a value.
#[derive(Debug, Clone)]
Expand Down
197 changes: 195 additions & 2 deletions datafusion/sql/src/unparser/dialect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@

use std::{collections::HashMap, sync::Arc};

use crate::unparser::ast::{
RelationBuilder, TableFactorBuilder, TableFunctionRelationBuilder,
};

use super::{
Unparser, utils::character_length_to_sql, utils::date_part_to_sql,
utils::sqlite_date_trunc_to_sql, utils::sqlite_from_unixtime_to_sql,
};
use arrow::array::timezone::Tz;
use arrow::datatypes::TimeUnit;
use chrono::DateTime;
use datafusion_common::Result;
use datafusion_expr::Expr;
use datafusion_common::{Result, plan_err};
use datafusion_expr::{Expr, LogicalPlan, Unnest};
use regex::Regex;
use sqlparser::ast::ValueWithSpan;
use sqlparser::tokenizer::Span;
use sqlparser::{
ast::{
Expand Down Expand Up @@ -200,6 +205,35 @@ pub trait Dialect: Send + Sync {
false
}

/// Allows the dialect to unparse an unnest plan as a dialect-specific flattened
/// array table factor.
///
/// Some dialects, such as Snowflake, require the FLATTEN function to unnest arrays in the FROM clause.
/// <https://docs.snowflake.com/en/sql-reference/functions/flatten#syntax>
///
/// The default implementation ignores its arguments and returns `Ok(None)`,
/// i.e. it supplies no dialect-specific table factor.
fn unparse_unnest_table_factor(
&self,
_unnest: &Unnest,
_columns: &[Ident],
_unparser: &Unparser,
) -> Result<Option<TableFactorBuilder>> {
Ok(None)
}

/// Allows the dialect to override relation alias unparsing if the dialect has specific rules.
/// Returns `true` if the dialect has overridden the alias unparsing, `false` to use default unparsing.
///
/// This is useful for dialects that need to modify the alias based on specific conditions. For example,
/// in Snowflake, when using the FLATTEN function, the alias of the table function needs to be adjusted
/// to match the output columns of the FLATTEN function. It can be used together with
/// [`Dialect::unparse_unnest_table_factor`] to achieve this.
/// See the [`SnowflakeDialect`] implementation for an example.
///
/// The default implementation leaves the builder untouched and returns `false`.
fn relation_alias_overrides(
&self,
_relation_builder: &mut RelationBuilder,
_alias: Option<&ast::TableAlias>,
) -> bool {
false
}

/// Allows the dialect to override column alias unparsing if the dialect has specific rules.
/// Returns None if the default unparsing should be used, or Some(String) if there is
/// a custom implementation for the alias.
Expand Down Expand Up @@ -633,6 +667,165 @@ impl BigQueryDialect {
}
}

/// Prefix of the generated table alias that [`SnowflakeDialect`] assigns to a
/// FLATTEN table function; the same prefix is later used to recognize that alias.
pub static UNNAMED_SNOWFLAKE_FLATTEN_SUBQUERY_PREFIX: &str = "__unnamed_flatten_subquery";
/// Snowflake FLATTEN outputs 6 columns (0-indexed): SEQ=0, KEY=1, PATH=2, INDEX=3, VALUE=4, THIS=5.
/// The VALUE column (index 4) holds the flattened element.
/// <https://docs.snowflake.com/en/sql-reference/functions/flatten#output>
const FLATTEN_VALUE_COLUMN_IDX: usize = 4;

/// Unparser dialect for Snowflake SQL.
///
/// It quotes identifiers with `"` and can unparse a literal `UNNEST` as a
/// `FLATTEN` table function.
#[derive(Default)]
pub struct SnowflakeDialect {}

impl Dialect for SnowflakeDialect {
    /// Identifiers are always quoted with double quotes.
    fn identifier_quote_style(&self, _: &str) -> Option<char> {
        Some('"')
    }

    /// Unnest is unparsed as a table factor for this dialect.
    fn unnest_as_table_factor(&self) -> bool {
        true
    }

    /// Unparses `Unnest(Projection(EmptyRelation))` as a
    /// `FLATTEN(<expr>, '', false, false, 'ARRAY')` table function whose alias
    /// names all six FLATTEN output columns, with the VALUE column renamed to
    /// `columns[0]`.
    ///
    /// Returns `Ok(None)` when the unnest input is not a projection over an
    /// empty relation, so no dialect-specific table factor is produced.
    ///
    /// # Errors
    ///
    /// Returns a plan error when the projection does not have exactly one
    /// expression or `columns` does not have exactly one entry, and propagates
    /// any error from unparsing the projection expressions.
    fn unparse_unnest_table_factor(
        &self,
        unnest: &Unnest,
        columns: &[Ident],
        unparser: &Unparser,
    ) -> Result<Option<TableFactorBuilder>> {
        let LogicalPlan::Projection(projection) = unnest.input.as_ref() else {
            return Ok(None);
        };

        if !matches!(projection.input.as_ref(), LogicalPlan::EmptyRelation(_)) {
            // It may be possible that UNNEST is used as a source for the query.
            // However, at this point, we don't yet know if it is just a single expression
            // from another source or if it's from UNNEST.
            //
            // Unnest(Projection(EmptyRelation)) denotes a case with `UNNEST([...])`,
            // which is normally safe to unnest as a table factor.
            // However, in the future, more comprehensive checks can be added here.
            return Ok(None);
        }

        let mut exprs = projection
            .expr
            .iter()
            .map(|e| unparser.expr_to_sql(e))
            .collect::<Result<Vec<_>>>()?;

        // These checks are defensive guards. In practice they are not reachable through
        // standard SQL because the caller in `plan.rs` only invokes this method when the
        // outer Projection has exactly one expression (the UNNEST placeholder), which means
        // `columns.len()` is always 1. Multi-column UNNEST queries never enter this path.
        if exprs.len() != 1 {
            // Snowflake FLATTEN function only supports a single argument.
            return plan_err!(
                "Only support one argument for Snowflake FLATTEN, found {}",
                exprs.len()
            );
        }

        if columns.len() != 1 {
            // Snowflake FLATTEN function only supports a single output column.
            return plan_err!(
                "Only support one output column for Snowflake FLATTEN, found {}",
                columns.len()
            );
        }

        // Wraps a literal value into an expression without source location.
        let literal = |value: ast::Value| {
            ast::Expr::Value(ValueWithSpan {
                value,
                span: Span::empty(),
            })
        };

        // Remaining positional FLATTEN arguments after the input expression.
        // NOTE(review): presumably PATH, OUTER, RECURSIVE and MODE in that order;
        // confirm against the Snowflake FLATTEN documentation.
        exprs.extend([
            literal(ast::Value::SingleQuotedString(String::new())),
            literal(ast::Value::Boolean(false)),
            literal(ast::Value::Boolean(false)),
            literal(ast::Value::SingleQuotedString("ARRAY".to_string())),
        ]);

        // Override the VALUE column alias (FLATTEN_VALUE_COLUMN_IDX) with the desired output
        // column name so the caller can reference the flattened element by its logical name.
        let column_alias = vec![
            unparser.new_ident_quoted_if_needs("SEQ".to_string()),
            unparser.new_ident_quoted_if_needs("KEY".to_string()),
            unparser.new_ident_quoted_if_needs("PATH".to_string()),
            unparser.new_ident_quoted_if_needs("INDEX".to_string()),
            columns[0].clone(),
            unparser.new_ident_quoted_if_needs("THIS".to_string()),
        ];

        let func_expr = ast::Expr::Function(Function {
            name: vec![Ident::new("FLATTEN")].into(),
            uses_odbc_syntax: false,
            parameters: ast::FunctionArguments::None,
            args: ast::FunctionArguments::List(ast::FunctionArgumentList {
                args: exprs
                    .into_iter()
                    .map(|e| ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)))
                    .collect(),
                duplicate_treatment: None,
                clauses: vec![],
            }),
            filter: None,
            null_treatment: None,
            over: None,
            within_group: vec![],
        });

        // The generated alias name carries a well-known prefix so that
        // `relation_alias_overrides` can recognize it later.
        let generated_alias = unparser.new_table_alias(
            unparser
                .alias_generator
                .next(UNNAMED_SNOWFLAKE_FLATTEN_SUBQUERY_PREFIX),
            column_alias,
        );

        let mut table_function_relation = TableFunctionRelationBuilder::default();
        table_function_relation
            .expr(func_expr)
            .alias(Some(generated_alias));

        Ok(Some(TableFactorBuilder::TableFunction(
            table_function_relation,
        )))
    }

    /// Rewrites the generated FLATTEN alias in place when the caller supplies a
    /// single-column alias: the table alias takes the requested name and the
    /// VALUE column (index `FLATTEN_VALUE_COLUMN_IDX`, the fifth column) takes
    /// the requested column name.
    ///
    /// Returns `true` when the alias was rewritten. Returns `false`, leaving
    /// the builder untouched, when the relation is not a table function, no
    /// alias was requested, the requested alias does not have exactly one
    /// column, the current alias was not generated by this dialect, or the
    /// current alias has too few columns to contain the VALUE column.
    fn relation_alias_overrides(
        &self,
        relation_builder: &mut RelationBuilder,
        alias: Option<&ast::TableAlias>,
    ) -> bool {
        if let Some(TableFactorBuilder::TableFunction(rel_builder)) =
            relation_builder.relation.as_mut()
            && let Some(requested) = alias
            && let [requested_column] = requested.columns.as_slice()
            && let Some(current) = rel_builder.alias.as_mut()
            && current
                .name
                .value
                .starts_with(UNNAMED_SNOWFLAKE_FLATTEN_SUBQUERY_PREFIX)
            // Checked access: the builder fields are public, so an alias with the
            // generated prefix but fewer columns must not cause a panic.
            && let Some(value_slot) = current.columns.get_mut(FLATTEN_VALUE_COLUMN_IDX)
        {
            *value_slot = requested_column.clone();
            current.name = requested.name.clone();
            current.explicit = true;
            return true;
        }
        false
    }
}

pub struct CustomDialect {
identifier_quote_style: Option<char>,
supports_nulls_first_in_sort: bool,
Expand Down
3 changes: 1 addition & 2 deletions datafusion/sql/src/unparser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -867,8 +867,7 @@ impl Unparser<'_> {
.collect::<Result<Vec<_>>>()
}

/// This function can create an identifier with or without quotes based on the dialect rules
pub(super) fn new_ident_quoted_if_needs(&self, ident: String) -> Ident {
pub fn new_ident_quoted_if_needs(&self, ident: String) -> Ident {
let quote_style = self.dialect.identifier_quote_style(&ident);
Ident {
value: ident,
Expand Down
4 changes: 4 additions & 0 deletions datafusion/sql/src/unparser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod utils;

use self::dialect::{DefaultDialect, Dialect};
use crate::unparser::extension_unparser::UserDefinedLogicalNodeUnparser;
use datafusion_common::alias::AliasGenerator;
pub use expr::expr_to_sql;
pub use plan::plan_to_sql;
use std::sync::Arc;
Expand Down Expand Up @@ -58,6 +59,7 @@ pub struct Unparser<'a> {
dialect: &'a dyn Dialect,
pretty: bool,
extension_unparsers: Vec<Arc<dyn UserDefinedLogicalNodeUnparser>>,
pub alias_generator: AliasGenerator,
}

impl<'a> Unparser<'a> {
Expand All @@ -66,6 +68,7 @@ impl<'a> Unparser<'a> {
dialect,
pretty: false,
extension_unparsers: vec![],
alias_generator: AliasGenerator::new(),
}
}

Expand Down Expand Up @@ -136,6 +139,7 @@ impl Default for Unparser<'_> {
dialect: &DefaultDialect {},
pretty: false,
extension_unparsers: vec![],
alias_generator: AliasGenerator::new(),
}
}
}
Loading