Skip to content

Commit

Permalink
feat: replace table with tables
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jan 16, 2024
1 parent 1f3317c commit b87bb80
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 50 deletions.
52 changes: 23 additions & 29 deletions src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use futures_util::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

Expand Down Expand Up @@ -183,37 +184,30 @@ impl InformationSchemaColumnsBuilder {
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.next().await {
let table = table?;
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};

self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table_name,
semantic_type,
column,
);
}
} else {
unreachable!();
self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table.table_info().name,
semantic_type,
column,
);
}
}
}
Expand Down
36 changes: 15 additions & 21 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use futures_util::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
Expand Down Expand Up @@ -166,27 +167,20 @@ impl InformationSchemaTablesBuilder {
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
} else {
unreachable!();
}
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.next().await {
let table = table?;
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_info.name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
}
}

Expand Down

0 comments on commit b87bb80

Please sign in to comment.