Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ impl Column {
/// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(

Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else(
|| Self {
relation: None,
name: flat_name,
Expand Down
22 changes: 21 additions & 1 deletion datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl TableReference {
/// identifier, normalizing `s` to lowercase.
/// See docs on [`TableReference`] for more details.
pub fn parse_str(s: &str) -> Self {
    // Delegates with normalization enabled (`true`): unquoted identifiers
    // are folded to lowercase, matching the SQL parser's default
    // `enable_ident_normalization` behavior.
    Self::parse_str_normalized(s, true)
}

/// Forms a [`TableReference`] by parsing `s` as a multipart SQL
Expand Down Expand Up @@ -331,6 +331,26 @@ impl TableReference {
} => vec![catalog.to_string(), schema.to_string(), table.to_string()],
}
}

/// Will ignore case for all [TableReference] identifiers
///
/// Returns a new reference in which every part (catalog, schema, table)
/// has been folded to ASCII lowercase, so subsequent lookups behave
/// case-insensitively.
pub fn to_ignore_case(&self) -> Self {
    match self {
        Self::Bare { table } => Self::bare(table.to_ascii_lowercase()),
        Self::Partial { schema, table } => Self::partial(
            schema.to_ascii_lowercase(),
            table.to_ascii_lowercase(),
        ),
        Self::Full {
            catalog,
            schema,
            table,
        } => Self::full(
            catalog.to_ascii_lowercase(),
            schema.to_ascii_lowercase(),
            table.to_ascii_lowercase(),
        ),
    }
}
}

/// Parse a string into a TableReference, normalizing where appropriate
Expand Down
104 changes: 104 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1607,12 +1607,29 @@ impl SessionContext {
provider: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table_ref: TableReference = table_ref.into();
let table_ref = self.table_ref_ident_normalization(table_ref);

let table = table_ref.table().to_owned();
self.state
.read()
.schema_for_ref(table_ref)?
.register_table(table, provider)
}
// Normalize table ident (convert ident to lowercase) when the session's
// `sql_parser.enable_ident_normalization` option is set; otherwise return
// the reference unchanged so case-sensitive lookups still work.
fn table_ref_ident_normalization(&self, table_ref: TableReference) -> TableReference {
    if self
        .state()
        .config()
        .options()
        .sql_parser
        .enable_ident_normalization
    {
        table_ref.to_ignore_case()
    } else {
        table_ref
    }
}

/// Deregisters the given table.
///
Expand All @@ -1622,6 +1639,8 @@ impl SessionContext {
table_ref: impl Into<TableReference>,
) -> Result<Option<Arc<dyn TableProvider>>> {
let table_ref = table_ref.into();
let table_ref = self.table_ref_ident_normalization(table_ref);

let table = table_ref.table().to_owned();
self.state
.read()
Expand All @@ -1632,6 +1651,8 @@ impl SessionContext {
/// Return `true` if the specified table exists in the schema provider.
pub fn table_exist(&self, table_ref: impl Into<TableReference>) -> Result<bool> {
let table_ref: TableReference = table_ref.into();
let table_ref = self.table_ref_ident_normalization(table_ref);

let table = table_ref.table();
let table_ref = table_ref.clone();
Ok(self
Expand All @@ -1650,6 +1671,8 @@ impl SessionContext {
/// [`register_table`]: SessionContext::register_table
pub async fn table(&self, table_ref: impl Into<TableReference>) -> Result<DataFrame> {
let table_ref: TableReference = table_ref.into();
let table_ref = self.table_ref_ident_normalization(table_ref);

let provider = self.table_provider(table_ref.clone()).await?;
let plan = LogicalPlanBuilder::scan(
table_ref,
Expand Down Expand Up @@ -2456,6 +2479,87 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_table_name_normalization_disabled() -> Result<()> {
    // With ident normalization disabled, table names are case-sensitive:
    // the table is only reachable by its exact registered (uppercase) name.
    let ctx = SessionContext::new();
    ctx.sql("SET datafusion.sql_parser.enable_ident_normalization=false")
        .await?;

    ctx.register_csv(
        "UPPERCASE_TABLE",
        "tests/tpch-csv/customer.csv",
        Default::default(),
    )
    .await?;

    // Fix: the query string has no interpolation, so the `format!` wrapper
    // was redundant (clippy::useless_format) — use the literal directly.
    let result =
        plan_and_collect(&ctx, "select c_name from UPPERCASE_TABLE limit 3").await?;

    let actual = arrow::util::pretty::pretty_format_batches(&result)
        .unwrap()
        .to_string();
    assert_snapshot!(actual, @r"
    +--------------------+
    | c_name             |
    +--------------------+
    | Customer#000000002 |
    | Customer#000000003 |
    | Customer#000000004 |
    +--------------------+
    ");

    // Lowercase lookups must miss; exact-case lookups must hit.
    assert!(!ctx.table_exist("uppercase_table")?);
    assert!(ctx.table_exist("UPPERCASE_TABLE")?);

    assert!(ctx.deregister_table("uppercase_table")?.is_none());
    assert!(ctx.deregister_table("UPPERCASE_TABLE")?.is_some());

    Ok(())
}

#[tokio::test]
async fn test_table_name_normalization_enabled() -> Result<()> {
    // With ident normalization enabled, table names are folded to lowercase,
    // so the table registered as "UPPERCASE_TABLE" is reachable in any case.
    let ctx = SessionContext::new();
    ctx.sql("SET datafusion.sql_parser.enable_ident_normalization=true")
        .await?;

    ctx.register_csv(
        "UPPERCASE_TABLE",
        "tests/tpch-csv/customer.csv",
        Default::default(),
    )
    .await?;

    // Fix: the query string has no interpolation, so the `format!` wrapper
    // was redundant (clippy::useless_format) — use the literal directly.
    let result =
        plan_and_collect(&ctx, "select c_name from uppercase_table limit 3").await?;

    let actual = arrow::util::pretty::pretty_format_batches(&result)
        .unwrap()
        .to_string();
    assert_snapshot!(actual, @r"
    +--------------------+
    | c_name             |
    +--------------------+
    | Customer#000000002 |
    | Customer#000000003 |
    | Customer#000000004 |
    +--------------------+
    ");

    // Both spellings resolve because lookups are normalized to lowercase.
    assert!(ctx.table_exist("uppercase_table")?);
    assert!(ctx.table_exist("UPPERCASE_TABLE")?);

    assert!(ctx.deregister_table("uppercase_table")?.is_some());

    Ok(())
}

struct MyPhysicalPlanner {}

#[async_trait]
Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/expr_rewriter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,12 @@ mod test {

#[test]
fn normalize_cols() {
let expr = col("a") + col("b") + col("c");
let expr = col("A") + col("b") + col("c");

// Schemas with some matching and some non matching cols
let schema_a = make_schema_with_empty_metadata(
vec![Some("tableA".into()), Some("tableA".into())],
vec!["a", "aa"],
vec!["A", "aa"],
);
let schema_c = make_schema_with_empty_metadata(
vec![Some("tableC".into()), Some("tableC".into())],
Expand All @@ -442,7 +442,7 @@ mod test {
.unwrap();
assert_eq!(
normalized_expr,
col("tableA.a") + col("tableB.b") + col("tableC.c")
col("tableA.A") + col("tableB.b") + col("tableC.c")
);
}

Expand Down
6 changes: 3 additions & 3 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2222,13 +2222,13 @@ mod tests {
.unwrap();
assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");

// Note scan of "EMPLOYEE_CSV" is treated as a SQL identifier
// (and thus normalized to "employee_csv") as well
// Identifiers should be normalized at parsing time
// if explicitly set to uppercase it should be preserved
let projection = None;
let plan =
LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
.unwrap();
assert_snapshot!(plan.schema().as_ref(), @"fields:[employee_csv.id, employee_csv.first_name, employee_csv.last_name, employee_csv.state, employee_csv.salary], metadata:{}");
assert_snapshot!(plan.schema().as_ref(), @"fields:[EMPLOYEE_CSV.id, EMPLOYEE_CSV.first_name, EMPLOYEE_CSV.last_name, EMPLOYEE_CSV.state, EMPLOYEE_CSV.salary], metadata:{}");
}

#[test]
Expand Down
Loading