feat: tables stream with CatalogManager #3180

Merged: 7 commits, Jan 17, 2024
Changes from 2 commits
52 changes: 23 additions & 29 deletions src/catalog/src/information_schema/columns.rs
@@ -31,6 +31,7 @@ use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use futures_util::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

@@ -183,37 +184,30 @@ impl InformationSchemaColumnsBuilder {
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.next().await {
let table = table?;
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();

for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};
for (idx, column) in schema.column_schemas().iter().enumerate() {
let semantic_type = if column.is_time_index() {
SEMANTIC_TYPE_TIME_INDEX
} else if keys.contains(&idx) {
SEMANTIC_TYPE_PRIMARY_KEY
} else {
SEMANTIC_TYPE_FIELD
};

self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table_name,
semantic_type,
column,
);
}
} else {
unreachable!();
self.add_column(
&predicates,
&catalog_name,
&schema_name,
&table.table_info().name,
semantic_type,
column,
);
}
}
}
36 changes: 15 additions & 21 deletions src/catalog/src/information_schema/tables.rs
@@ -27,6 +27,7 @@ use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use futures_util::StreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
@@ -166,27 +167,20 @@ impl InformationSchemaTablesBuilder {
continue;
}

for table_name in catalog_manager
.table_names(&catalog_name, &schema_name)
.await?
{
if let Some(table) = catalog_manager
.table(&catalog_name, &schema_name, &table_name)
.await?
{
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
} else {
unreachable!();
}
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.next().await {
let table = table?;
let table_info = table.table_info();
self.add_table(
&predicates,
&catalog_name,
&schema_name,
&table_info.name,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
}
}

83 changes: 69 additions & 14 deletions src/catalog/src/kvbackend/manager.rs
@@ -16,17 +16,20 @@ use std::any::Any;
use std::collections::BTreeSet;
use std::sync::{Arc, Weak};

use async_stream::try_stream;
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::TryStreamExt;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
@@ -58,17 +61,25 @@ pub struct KvBackendCatalogManager {
system_catalog: SystemCatalog,
}

fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
let table_info = table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?;
Ok(DistTable::table(Arc::new(table_info)))
}

#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.invalidate_table_id(ctx, table_id)
.await
}

async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_id(ctx, table_id)
.invalidate_table_name(ctx, table_name)
.await
}
}
@@ -101,6 +112,10 @@ impl KvBackendCatalogManager {

#[async_trait::async_trait]
impl CatalogManager for KvBackendCatalogManager {
fn as_any(&self) -> &dyn Any {
self
}

async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
let stream = self
.table_metadata_manager
@@ -219,17 +234,57 @@ impl CatalogManager for KvBackendCatalogManager {
else {
return Ok(None);
};
let table_info = Arc::new(
table_info_value
.table_info
.try_into()
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?,
);
Ok(Some(DistTable::table(table_info)))
make_table(table_info_value).map(Some)
}

fn as_any(&self) -> &dyn Any {
self
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, CatalogResult<TableRef>> {
let sys_tables = try_stream!({
// System tables
let sys_table_names = self.system_catalog.table_names(schema);
for table_name in sys_table_names {
if let Some(table) = self.system_catalog.table(catalog, schema, &table_name) {
yield table;
}
}
});

let table_id_stream = self
.table_metadata_manager
.table_name_manager()
.tables(catalog, schema)
.await
.map_ok(|(_, v)| v.table_id());

const BATCH_SIZE: usize = 128;
let user_tables = try_stream!({
// Split table ids into chunks
let mut table_id_chunks = table_id_stream.ready_chunks(BATCH_SIZE);

while let Some(table_ids) = table_id_chunks.next().await {
let table_ids = table_ids
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(BoxedError::new)
.context(ListTablesSnafu { catalog, schema })?;

let table_info_values = self
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(TableMetadataManagerSnafu)?;

for table_info_value in table_info_values.into_values() {
yield make_table(table_info_value)?;
}
}
});

Box::pin(sys_tables.chain(user_tables))
}
}

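In the `tables()` implementation above, the stream of table ids is drained through `ready_chunks(BATCH_SIZE)`, so each poll hands back a `Vec` of up to 128 ids that are already available, and each such chunk is resolved with a single `batch_get` instead of one metadata lookup per table. Below is a minimal, self-contained sketch of that batching pattern; the `tables_from_ids` helper, the `u32`/`String` stand-ins for table ids and table metadata, and the stub `batch_get` are illustrative assumptions, not part of this PR.

```rust
use async_stream::try_stream;
use futures_util::stream::BoxStream;
use futures_util::{Stream, StreamExt};

/// Stand-in for `table_info_manager().batch_get(&table_ids)`: resolves a
/// whole chunk of ids with a single call.
async fn batch_get(ids: &[u32]) -> Result<Vec<String>, String> {
    Ok(ids.iter().map(|id| format!("table-{}", id)).collect())
}

/// Turns a stream of table ids into a stream of tables, fetching metadata in
/// batches of up to `BATCH_SIZE` ids (the same shape as `tables()` above).
fn tables_from_ids<S>(table_id_stream: S) -> BoxStream<'static, Result<String, String>>
where
    S: Stream<Item = Result<u32, String>> + Send + Unpin + 'static,
{
    const BATCH_SIZE: usize = 128;
    Box::pin(try_stream! {
        // `ready_chunks` yields Vecs of up to BATCH_SIZE already-available
        // items, so each Vec becomes exactly one batched lookup.
        let mut chunks = table_id_stream.ready_chunks(BATCH_SIZE);
        while let Some(ids) = chunks.next().await {
            // Fail the stream on the first bad id in the chunk.
            let ids = ids.into_iter().collect::<Result<Vec<_>, _>>()?;
            for table in batch_get(&ids).await? {
                yield table;
            }
        }
    })
}
```

A caller would then build the stream with `tables_from_ids(id_stream)` and drain it with `next().await`, just as the information_schema code does with the real `tables()` stream.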
8 changes: 8 additions & 0 deletions src/catalog/src/lib.rs
@@ -20,6 +20,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use futures::future::BoxFuture;
use futures_util::stream::BoxStream;
use table::metadata::TableId;
use table::requests::CreateTableRequest;
use table::TableRef;
@@ -56,6 +57,13 @@ pub trait CatalogManager: Send + Sync {
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>>;

/// Returns a stream of all tables in the given catalog and schema.
async fn tables<'a>(
&'a self,
catalog: &'a str,
schema: &'a str,
) -> BoxStream<'a, Result<TableRef>>;
}

pub type CatalogManagerRef = Arc<dyn CatalogManager>;
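On the consumer side, the new trait method hands back a pinned `BoxStream` of `Result<TableRef>` that is drained with `StreamExt::next`, as the information_schema builders above already do. A hedged sketch of such a consumer follows; the `collect_table_names` helper and the `catalog::error::Result` path are assumptions for illustration, not part of this PR.

```rust
use catalog::CatalogManagerRef;
use futures_util::StreamExt;

/// Hypothetical helper: collects the names of all tables under
/// `catalog.schema` by draining the `tables()` stream.
async fn collect_table_names(
    catalog_manager: &CatalogManagerRef,
    catalog: &str,
    schema: &str,
) -> catalog::error::Result<Vec<String>> {
    let mut stream = catalog_manager.tables(catalog, schema).await;
    let mut names = Vec::new();
    while let Some(table) = stream.next().await {
        // Each item is a Result<TableRef>; stop at the first error.
        let table = table?;
        names.push(table.table_info().name.clone());
    }
    Ok(names)
}
```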
81 changes: 43 additions & 38 deletions src/catalog/src/memory/manager.rs
@@ -21,6 +21,7 @@ use common_catalog::build_db_string;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME,
};
use futures_util::stream::BoxStream;
use snafu::OptionExt;
use table::TableRef;

@@ -39,42 +40,8 @@ pub struct MemoryCatalogManager

#[async_trait::async_trait]
impl CatalogManager for MemoryCatalogManager {
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}

async fn table(
&self,
catalog: &str,
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
let result = try {
self.catalogs
.read()
.unwrap()
.get(catalog)?
.get(schema)?
.get(table_name)
.cloned()?
};
Ok(result)
}

async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.catalog_exist_sync(catalog)
}

async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.get(schema)
.with_context(|| SchemaNotFoundSnafu { catalog, schema })?
.contains_key(table))
fn as_any(&self) -> &dyn Any {
self
}

async fn catalog_names(&self) -> Result<Vec<String>> {
@@ -110,8 +77,46 @@ impl CatalogManager for MemoryCatalogManager {
.collect())
}

fn as_any(&self) -> &dyn Any {
self
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
self.catalog_exist_sync(catalog)
}

async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
self.schema_exist_sync(catalog, schema)
}

async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
let catalogs = self.catalogs.read().unwrap();
Ok(catalogs
.get(catalog)
.with_context(|| CatalogNotFoundSnafu {
catalog_name: catalog,
})?
.get(schema)
.with_context(|| SchemaNotFoundSnafu { catalog, schema })?
.contains_key(table))
}

async fn table(
&self,
catalog: &str,
schema: &str,
table_name: &str,
) -> Result<Option<TableRef>> {
let result = try {
self.catalogs
.read()
.unwrap()
.get(catalog)?
.get(schema)?
.get(table_name)
.cloned()?
};
Ok(result)
}

async fn tables<'a>(&'a self, _: &'a str, _: &'a str) -> BoxStream<'a, Result<TableRef>> {
unimplemented!("`MemoryCatalogManager` does not support tables()")
}
}
