Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::datasource::{MemTable, TableProvider};
use crate::datasource::{MemTable, TableProvider, TableType};

use super::{
catalog::{CatalogList, CatalogProvider},
Expand Down Expand Up @@ -105,14 +105,25 @@ impl InformationSchemaProvider {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
builder.add_base_table(&catalog_name, &schema_name, table_name)
let table = schema.table(&table_name).unwrap();
builder.add_table(
&catalog_name,
&schema_name,
table_name,
table.table_type(),
);
}
}
}

// Add a final list for the information schema tables themselves
builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, TABLES);
builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, COLUMNS);
builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
COLUMNS,
TableType::View,
);
}

let mem_table: MemTable = builder.into();
Expand Down Expand Up @@ -198,11 +209,12 @@ impl InformationSchemaTablesBuilder {
}
}

fn add_base_table(
fn add_table(
&mut self,
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
table_type: TableType,
) {
// Note: append_value is actually infallible.
self.catalog_names
Expand All @@ -212,24 +224,13 @@ impl InformationSchemaTablesBuilder {
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
self.table_types.append_value("BASE TABLE").unwrap();
}

fn add_system_table(
&mut self,
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
) {
// Note: append_value is actually infallible.
self.catalog_names
.append_value(catalog_name.as_ref())
self.table_types
.append_value(match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
})
.unwrap();
self.schema_names
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
self.table_types.append_value("VIEW").unwrap();
}
}

Expand Down
16 changes: 16 additions & 0 deletions datafusion/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ pub enum TableProviderFilterPushDown {
Exact,
}

/// Indicates the type of this table for metadata/catalog purposes.
#[derive(Debug, Clone, Copy)]
pub enum TableType {
/// An ordinary physical table.
Base,
/// A non-materialised table that itself uses a query internally to provide data.
View,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A preview of things to come, perhaps :)

/// A transient table.
Temporary,
}

/// Source table
pub trait TableProvider: Sync + Send {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
Expand All @@ -75,6 +86,11 @@ pub trait TableProvider: Sync + Send {
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;

/// Get the type of this table for metadata/catalog purposes.
///
/// The default implementation reports [`TableType::Base`]; providers that
/// represent something else (for example a view or a temporary table) can
/// override this method to return the matching variant.
fn table_type(&self) -> TableType {
TableType::Base
}

/// Create an ExecutionPlan that will scan the table.
fn scan(
&self,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pub mod memory;
pub mod parquet;

pub use self::csv::{CsvFile, CsvReadOptions};
pub use self::datasource::TableProvider;
pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
67 changes: 65 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,11 @@ mod tests {
use crate::variable::VarType;
use crate::{
assert_batches_eq, assert_batches_sorted_eq,
logical_plan::{col, create_udf, sum},
logical_plan::{col, create_udf, sum, Expr},
};
use crate::{
datasource::MemTable, logical_plan::create_udaf,
datasource::{MemTable, TableType},
logical_plan::create_udaf,
physical_plan::expressions::AvgAccumulator,
};
use arrow::array::{
Expand Down Expand Up @@ -2631,6 +2632,68 @@ mod tests {
assert_batches_sorted_eq!(expected, &result);
}

/// Checks that `information_schema.tables` reports a `table_type` string for
/// each registered table that corresponds to the value its provider returns
/// from `TableProvider::table_type`.
#[tokio::test]
async fn information_schema_tables_table_types() {
    /// Minimal provider that only reports the wrapped `TableType`.
    ///
    /// This test only lists tables, so `schema`, `scan` and `statistics` are
    /// stubbed with `unimplemented!()` and will panic if anything calls them.
    struct TestTable(TableType);

    impl TableProvider for TestTable {
        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn table_type(&self) -> TableType {
            // `TableType` is `Copy`, so the wrapped value can be returned directly.
            self.0
        }

        fn schema(&self) -> SchemaRef {
            unimplemented!()
        }

        fn scan(
            &self,
            _: &Option<Vec<usize>>,
            _: usize,
            _: &[Expr],
            _: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn statistics(&self) -> crate::datasource::datasource::Statistics {
            unimplemented!()
        }
    }

    // The information schema is opt-in, so enable it explicitly.
    let mut ctx = ExecutionContext::with_config(
        ExecutionConfig::new().with_information_schema(true),
    );

    // Register one table per `TableType` variant.
    ctx.register_table("physical", Arc::new(TestTable(TableType::Base)))
        .unwrap();
    ctx.register_table("query", Arc::new(TestTable(TableType::View)))
        .unwrap();
    ctx.register_table("temp", Arc::new(TestTable(TableType::Temporary)))
        .unwrap();

    let result =
        plan_and_collect(&mut ctx, "SELECT * from information_schema.tables")
            .await
            .unwrap();

    // Every cell is padded to its column width so each row lines up with the
    // `+---+` border lines; the comparison is made on the formatted text, so
    // the padding is significant.
    let expected = vec![
        "+---------------+--------------------+------------+-----------------+",
        "| table_catalog | table_schema       | table_name | table_type      |",
        "+---------------+--------------------+------------+-----------------+",
        "| datafusion    | information_schema | tables     | VIEW            |",
        "| datafusion    | information_schema | columns    | VIEW            |",
        "| datafusion    | public             | physical   | BASE TABLE      |",
        "| datafusion    | public             | query      | VIEW            |",
        "| datafusion    | public             | temp       | LOCAL TEMPORARY |",
        "+---------------+--------------------+------------+-----------------+",
    ];
    assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn information_schema_show_tables_no_information_schema() {
let mut ctx = ExecutionContext::with_config(ExecutionConfig::new());
Expand Down