Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 108 additions & 18 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use super::{

const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";

/// Wraps another [`CatalogProvider`] and adds a "information_schema"
Expand Down Expand Up @@ -124,28 +125,21 @@ impl InformationSchemaProvider {
builder.add_table(
&catalog_name,
&schema_name,
table_name,
&table_name,
table.table_type(),
table.get_table_definition(),
);
}
}
}

// Add a final list for the information schema tables themselves
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
TABLES,
TableType::View,
None::<&str>,
);
builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
builder.add_table(&catalog_name, INFORMATION_SCHEMA, VIEWS, TableType::View);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
COLUMNS,
TableType::View,
None::<&str>,
);
}

Expand All @@ -154,6 +148,32 @@ impl InformationSchemaProvider {
Arc::new(mem_table)
}

fn make_views(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaViewBuilder::new();

for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
builder.add_view(
&catalog_name,
&schema_name,
&table_name,
table.get_table_definition(),
)
}
}
}
}

let mem_table: MemTable = builder.into();
Arc::new(mem_table)
}

/// Construct the `information_schema.columns` virtual table
fn make_columns(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaColumnsBuilder::new();
Expand Down Expand Up @@ -194,21 +214,23 @@ impl SchemaProvider for InformationSchemaProvider {
}

fn table_names(&self) -> Vec<String> {
vec![TABLES.to_string(), COLUMNS.to_string()]
vec![TABLES.to_string(), VIEWS.to_string(), COLUMNS.to_string()]
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
if name.eq_ignore_ascii_case("tables") {
Some(self.make_tables())
} else if name.eq_ignore_ascii_case("columns") {
Some(self.make_columns())
} else if name.eq_ignore_ascii_case("views") {
Some(self.make_views())
} else {
None
}
}

fn table_exist(&self, name: &str) -> bool {
return matches!(name.to_ascii_lowercase().as_str(), TABLES | COLUMNS);
return matches!(name.to_ascii_lowercase().as_str(), TABLES | VIEWS | COLUMNS);
}
}

Expand All @@ -220,7 +242,6 @@ struct InformationSchemaTablesBuilder {
schema_names: StringBuilder,
table_names: StringBuilder,
table_types: StringBuilder,
definitions: StringBuilder,
}

impl InformationSchemaTablesBuilder {
Expand All @@ -234,7 +255,6 @@ impl InformationSchemaTablesBuilder {
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
table_types: StringBuilder::new(default_capacity),
definitions: StringBuilder::new(default_capacity),
}
}

Expand All @@ -244,7 +264,6 @@ impl InformationSchemaTablesBuilder {
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
table_type: TableType,
definition: Option<impl AsRef<str>>,
) {
// Note: append_value is actually infallable.
self.catalog_names
Expand All @@ -261,7 +280,6 @@ impl InformationSchemaTablesBuilder {
TableType::Temporary => "LOCAL TEMPORARY",
})
.unwrap();
self.definitions.append_option(definition.as_ref()).unwrap();
}
}

Expand All @@ -272,15 +290,13 @@ impl From<InformationSchemaTablesBuilder> for MemTable {
Field::new("table_schema", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("table_type", DataType::Utf8, false),
Field::new("definition", DataType::Utf8, true),
]);

let InformationSchemaTablesBuilder {
mut catalog_names,
mut schema_names,
mut table_names,
mut table_types,
mut definitions,
} = value;

let schema = Arc::new(schema);
Expand All @@ -291,6 +307,80 @@ impl From<InformationSchemaTablesBuilder> for MemTable {
Arc::new(schema_names.finish()),
Arc::new(table_names.finish()),
Arc::new(table_types.finish()),
],
)
.unwrap();

MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}
}

/// Builds the `information_schema.VIEWS` table row by row
///
/// Columns are based on <https://www.postgresql.org/docs/current/infoschema-columns.html>
struct InformationSchemaViewBuilder {
catalog_names: StringBuilder,
schema_names: StringBuilder,
table_names: StringBuilder,
definitions: StringBuilder,
}

impl InformationSchemaViewBuilder {
fn new() -> Self {
// StringBuilder requires providing an initial capacity, so
// pick 10 here arbitrarily as this is not performance
// critical code and the number of tables is unavailable here.
let default_capacity = 10;
Self {
catalog_names: StringBuilder::new(default_capacity),
schema_names: StringBuilder::new(default_capacity),
table_names: StringBuilder::new(default_capacity),
definitions: StringBuilder::new(default_capacity),
}
}

fn add_view(
&mut self,
catalog_name: impl AsRef<str>,
schema_name: impl AsRef<str>,
table_name: impl AsRef<str>,
definition: Option<impl AsRef<str>>,
) {
// Note: append_value is actually infallable.
self.catalog_names
.append_value(catalog_name.as_ref())
.unwrap();
self.schema_names
.append_value(schema_name.as_ref())
.unwrap();
self.table_names.append_value(table_name.as_ref()).unwrap();
self.definitions.append_option(definition.as_ref()).unwrap();
}
}

impl From<InformationSchemaViewBuilder> for MemTable {
fn from(value: InformationSchemaViewBuilder) -> Self {
let schema = Schema::new(vec![
Field::new("table_catalog", DataType::Utf8, false),
Field::new("table_schema", DataType::Utf8, false),
Field::new("table_name", DataType::Utf8, false),
Field::new("definition", DataType::Utf8, true),
]);

let InformationSchemaViewBuilder {
mut catalog_names,
mut schema_names,
mut table_names,
mut definitions,
} = value;

let schema = Arc::new(schema);
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(catalog_names.finish()),
Arc::new(schema_names.finish()),
Arc::new(table_names.finish()),
Arc::new(definitions.finish()),
],
)
Expand Down
Loading