Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
LorrensP-2158466 committed Jun 17, 2024
2 parents 4157d6a + a923c65 commit 07bbe95
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 137 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct Args {

#[clap(
long,
help = "The max number of rows to display for 'Table' format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
help = "The max number of rows to display for 'Table' format\n[possible values: numbers(0/10/...), inf(no limit)]",
default_value = "40"
)]
maxrows: MaxRows,
Expand Down
239 changes: 237 additions & 2 deletions datafusion/core/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::catalog::schema::SchemaProvider;
use dashmap::DashMap;
use datafusion_common::{exec_err, not_impl_err, Result};
use std::any::Any;
use std::collections::BTreeSet;
use std::ops::ControlFlow;
use std::sync::Arc;

/// Represent a list of named [`CatalogProvider`]s.
Expand Down Expand Up @@ -157,11 +159,11 @@ impl CatalogProviderList for MemoryCatalogProviderList {
/// access required to read table details (e.g. statistics).
///
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
/// the query to [find all schema / table references in an `async` function],
/// the query to [find all table references],
/// performing required remote catalog in parallel, and then plans the query
/// using that snapshot.
///
/// [find all schema / table references in an `async` function]: crate::execution::context::SessionState::resolve_table_references
/// [find all table references]: resolve_table_references
///
/// # Example Catalog Implementations
///
Expand Down Expand Up @@ -295,6 +297,182 @@ impl CatalogProvider for MemoryCatalogProvider {
}
}

/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
///
/// # Returns
///
/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second
/// element contains any CTE aliases that were defined and possibly referenced.
///
/// ## Example
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion::catalog::resolve_table_references;
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 2);
/// assert_eq!(table_refs[0].to_string(), "bar");
/// assert_eq!(table_refs[1].to_string(), "foo");
/// assert_eq!(ctes.len(), 0);
/// ```
///
/// ## Example with CTEs
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion::catalog::resolve_table_references;
/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 0);
/// assert_eq!(ctes.len(), 1);
/// assert_eq!(ctes[0].to_string(), "my_cte");
/// ```
pub fn resolve_table_references(
statement: &datafusion_sql::parser::Statement,
enable_ident_normalization: bool,
) -> datafusion_common::Result<(Vec<TableReference>, Vec<TableReference>)> {
use crate::sql::planner::object_name_to_table_reference;
use datafusion_sql::parser::{
CopyToSource, CopyToStatement, Statement as DFStatement,
};
use information_schema::INFORMATION_SCHEMA;
use information_schema::INFORMATION_SCHEMA_TABLES;
use sqlparser::ast::*;

struct RelationVisitor {
relations: BTreeSet<ObjectName>,
all_ctes: BTreeSet<ObjectName>,
ctes_in_scope: Vec<ObjectName>,
}

impl RelationVisitor {
/// Record the reference to `relation`, if it's not a CTE reference.
fn insert_relation(&mut self, relation: &ObjectName) {
if !self.relations.contains(relation)
&& !self.ctes_in_scope.contains(relation)
{
self.relations.insert(relation.clone());
}
}
}

impl Visitor for RelationVisitor {
type Break = ();

fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
self.insert_relation(relation);
ControlFlow::Continue(())
}

fn pre_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
if let Some(with) = &q.with {
for cte in &with.cte_tables {
// The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid:
// `WITH t AS (SELECT * FROM t) SELECT * FROM t`
// Where the first `t` refers to a predefined table. So we are careful here
// to visit the CTE first, before putting it in scope.
if !with.recursive {
// This is a bit hackish as the CTE will be visited again as part of visiting `q`,
// but thankfully `insert_relation` is idempotent.
cte.visit(self);
}
self.ctes_in_scope
.push(ObjectName(vec![cte.alias.name.clone()]));
}
}
ControlFlow::Continue(())
}

fn post_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
if let Some(with) = &q.with {
for _ in &with.cte_tables {
// Unwrap: We just pushed these in `pre_visit_query`
self.all_ctes.insert(self.ctes_in_scope.pop().unwrap());
}
}
ControlFlow::Continue(())
}

fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
if let Statement::ShowCreate {
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
obj_name,
} = statement
{
self.insert_relation(obj_name)
}

// SHOW statements will later be rewritten into a SELECT from the information_schema
let requires_information_schema = matches!(
statement,
Statement::ShowFunctions { .. }
| Statement::ShowVariable { .. }
| Statement::ShowStatus { .. }
| Statement::ShowVariables { .. }
| Statement::ShowCreate { .. }
| Statement::ShowColumns { .. }
| Statement::ShowTables { .. }
| Statement::ShowCollation { .. }
);
if requires_information_schema {
for s in INFORMATION_SCHEMA_TABLES {
self.relations.insert(ObjectName(vec![
Ident::new(INFORMATION_SCHEMA),
Ident::new(*s),
]));
}
}
ControlFlow::Continue(())
}
}

let mut visitor = RelationVisitor {
relations: BTreeSet::new(),
all_ctes: BTreeSet::new(),
ctes_in_scope: vec![],
};

fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) {
match statement {
DFStatement::Statement(s) => {
let _ = s.as_ref().visit(visitor);
}
DFStatement::CreateExternalTable(table) => {
visitor
.relations
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert_relation(table_name);
}
CopyToSource::Query(query) => {
query.visit(visitor);
}
},
DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
}
}

visit_statement(statement, &mut visitor);

let table_refs = visitor
.relations
.into_iter()
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
.collect::<datafusion_common::Result<_>>()?;
let ctes = visitor
.all_ctes
.into_iter()
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
.collect::<datafusion_common::Result<_>>()?;
Ok((table_refs, ctes))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -363,4 +541,61 @@ mod tests {
let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
assert!(cat.deregister_schema("foo", false).unwrap().is_none());
}

#[test]
fn resolve_table_references_shadowed_cte() {
use datafusion_sql::parser::DFParser;

// An interesting edge case where the `t` name is used both as an ordinary table reference
// and as a CTE reference.
let query = "WITH t AS (SELECT * FROM t) SELECT * FROM t";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 1);
assert_eq!(ctes.len(), 1);
assert_eq!(ctes[0].to_string(), "t");
assert_eq!(table_refs[0].to_string(), "t");

// UNION is a special case where the CTE is not in scope for the second branch.
let query = "(with t as (select 1) select * from t) union (select * from t)";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 1);
assert_eq!(ctes.len(), 1);
assert_eq!(ctes[0].to_string(), "t");
assert_eq!(table_refs[0].to_string(), "t");

// Nested CTEs are also handled.
// Here the first `u` is a CTE, but the second `u` is a table reference.
// While `t` is always a CTE.
let query = "(with t as (with u as (select 1) select * from u) select * from u cross join t)";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 1);
assert_eq!(ctes.len(), 2);
assert_eq!(ctes[0].to_string(), "t");
assert_eq!(ctes[1].to_string(), "u");
assert_eq!(table_refs[0].to_string(), "u");
}

#[test]
fn resolve_table_references_recursive_cte() {
use datafusion_sql::parser::DFParser;

let query = "
WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT id + 1 as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes
";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 0);
assert_eq!(ctes.len(), 1);
assert_eq!(ctes[0].to_string(), "nodes");
}
}
62 changes: 34 additions & 28 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,24 +303,12 @@ macro_rules! get_statistics {
))),
DataType::Int8 => Ok(Arc::new(Int8Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
if let Ok(v) = i8::try_from(*x) {
Some(v)
} else {
None
}
})
x.and_then(|x| i8::try_from(*x).ok())
}),
))),
DataType::Int16 => Ok(Arc::new(Int16Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
if let Ok(v) = i16::try_from(*x) {
Some(v)
} else {
None
}
})
x.and_then(|x| i16::try_from(*x).ok())
}),
))),
DataType::Int32 => Ok(Arc::new(Int32Array::from_iter(
Expand All @@ -331,24 +319,12 @@ macro_rules! get_statistics {
))),
DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
if let Ok(v) = u8::try_from(*x) {
Some(v)
} else {
None
}
})
x.and_then(|x| u8::try_from(*x).ok())
}),
))),
DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| {
x.and_then(|x| {
if let Ok(v) = u16::try_from(*x) {
Some(v)
} else {
None
}
})
x.and_then(|x| u16::try_from(*x).ok())
}),
))),
DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter(
Expand Down Expand Up @@ -572,13 +548,38 @@ macro_rules! make_data_page_stats_iterator {
};
}

make_data_page_stats_iterator!(MinInt32DataPageStatsIterator, min, Index::INT32, i32);
make_data_page_stats_iterator!(MaxInt32DataPageStatsIterator, max, Index::INT32, i32);
make_data_page_stats_iterator!(MinInt64DataPageStatsIterator, min, Index::INT64, i64);
make_data_page_stats_iterator!(MaxInt64DataPageStatsIterator, max, Index::INT64, i64);

macro_rules! get_data_page_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
paste! {
match $data_type {
Some(DataType::Int8) => Ok(Arc::new(
Int8Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| i8::try_from(x).ok())
})
})
.flatten()
)
)),
Some(DataType::Int16) => Ok(Arc::new(
Int16Array::from_iter(
[<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
x.into_iter().filter_map(|x| {
x.and_then(|x| i16::try_from(x).ok())
})
})
.flatten()
)
)),
Some(DataType::Int32) => Ok(Arc::new(Int32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
Some(DataType::Int64) => Ok(Arc::new(Int64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
_ => unimplemented!()
}
Expand Down Expand Up @@ -666,6 +667,11 @@ where
{
let iter = iterator.flat_map(|(len, index)| match index {
Index::NONE => vec![None; len],
Index::INT32(native_index) => native_index
.indexes
.iter()
.map(|x| x.null_count.map(|x| x as u64))
.collect::<Vec<_>>(),
Index::INT64(native_index) => native_index
.indexes
.iter()
Expand Down

0 comments on commit 07bbe95

Please sign in to comment.