Skip to content

Commit

Permalink
feat: tables stream with CatalogManager (#3180)
Browse files Browse the repository at this point in the history
* feat: add tables for CatalogManager

* feat: replace table with tables

* Update src/catalog/src/information_schema/columns.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/catalog/src/information_schema/columns.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/catalog/src/information_schema/tables.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* Update src/catalog/src/information_schema/tables.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* feat: tables for MemoryCatalogManager

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
  • Loading branch information
fengjiachun and WenyXu committed Jan 17, 2024
1 parent 7a1b856 commit c6c4ea5
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 111 deletions.
51 changes: 22 additions & 29 deletions src/catalog/src/information_schema/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

Expand Down Expand Up @@ -183,37 +184,29 @@ impl InformationSchemaColumnsBuilder {
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};

self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table_name,
semantic_type,
column,
);
}
} else {
unreachable!();
self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table.table_info().name,
semantic_type,
column,
);
}
}
}
Expand Down
35 changes: 14 additions & 21 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
Expand Down Expand Up @@ -166,27 +167,19 @@ impl InformationSchemaTablesBuilder {
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
} else {
unreachable!();
}
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_info.name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
}
}

Expand Down
82 changes: 68 additions & 14 deletions src/catalog/src/kvbackend/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::TryStreamExt;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
Expand Down Expand Up @@ -58,17 +61,25 @@ pub struct KvBackendCatalogManager {
system_catalog: SystemCatalog,
}

fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
}

#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.invalidate_table_id(ctx, table_id)
.await
}

async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_id(ctx, table_id)
.invalidate_table_name(ctx, table_name)
.await
}
}
Expand Down Expand Up @@ -101,6 +112,10 @@ impl KvBackendCatalogManager {

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}

async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager
Expand Down Expand Up @@ -219,17 +234,56 @@ impl CatalogManager for KvBackendCatalogManager {
else {
return Ok(None);
};
let table_info = Arc::new(
table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
);
Ok(Some(DistTable::table(table_info)))
make_table(table_info_value).map(Some)
}

fn as_any(&self) -> &dyn Any {
self
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, CatalogResult<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);
for table_name in sys_table_names {
if let Some(table) = self.system_catalog.table(catalog, schema, &table_name) {
yield table;
}
}
});

let table_id_stream = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await
.map_ok(|(_, v)| v.table_id());
const BATCH_SIZE: usize = 128;
let user_tables = try_stream!({
// Split table ids into chunks
let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?;

let table_info_values = self
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(TableMetadataManagerSnafu)?;

for table_info_value in table_info_values.into_values() {
yield make_table(table_info_value)?;
}
}
});

Box::pin(sys_tables.chain(user_tables))
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;
Expand Down Expand Up @@ -56,6 +57,13 @@ pub trait CatalogManager: Send + Sync {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>>;

/// Returns all tables with a stream by catalog and schema.
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>>;
}

pub type CatalogManagerRef = Arc<dyn CatalogManager>;
Expand Down

0 comments on commit c6c4ea5

Please sign in to comment.