diff --git a/crates/catalog-metastore/src/metastore_config.rs b/crates/catalog-metastore/src/metastore_config.rs index 8c787582..4433edcb 100644 --- a/crates/catalog-metastore/src/metastore_config.rs +++ b/crates/catalog-metastore/src/metastore_config.rs @@ -31,12 +31,16 @@ struct VolumeEntry { volume: Volume, #[serde(default)] database: Option, + #[serde(default)] + should_refresh: bool, } #[derive(Debug, Deserialize, Clone)] struct DatabaseEntry { ident: String, volume: VolumeIdent, + #[serde(default)] + should_refresh: bool, } #[derive(Debug, Deserialize, Clone)] @@ -114,7 +118,7 @@ impl MetastoreBootstrapConfig { } for db in &self.databases { - self.ensure_database(metastore.clone(), &db.ident, &db.volume) + self.ensure_database(metastore.clone(), &db.ident, &db.volume, db.should_refresh) .await?; } @@ -157,8 +161,13 @@ impl MetastoreBootstrapConfig { } if let Some(database) = &entry.database { - self.ensure_database(metastore, database, &entry.volume.ident) - .await?; + self.ensure_database( + metastore, + database, + &entry.volume.ident, + entry.should_refresh, + ) + .await?; } Ok(()) @@ -169,6 +178,7 @@ impl MetastoreBootstrapConfig { metastore: Arc, ident: &str, volume: &str, + should_refresh: bool, ) -> Result<(), ConfigError> { if metastore .get_database(&ident.to_string()) @@ -184,6 +194,7 @@ impl MetastoreBootstrapConfig { ident: ident.to_string(), volume: volume.to_string(), properties: None, + should_refresh, }, ) .await diff --git a/crates/catalog-metastore/src/models/database.rs b/crates/catalog-metastore/src/models/database.rs index af173c93..f18e95cf 100644 --- a/crates/catalog-metastore/src/models/database.rs +++ b/crates/catalog-metastore/src/models/database.rs @@ -16,6 +16,7 @@ pub struct Database { pub properties: Option>, /// Volume identifier pub volume: VolumeIdent, + pub should_refresh: bool, } impl Database { @@ -35,6 +36,7 @@ mod tests { ident: "db".to_string(), properties: None, volume: "vol".to_string(), + should_refresh: false, }; 
assert_eq!(db.prefix("parent"), "parent/db"); } diff --git a/crates/catalog/src/catalog_list.rs b/crates/catalog/src/catalog_list.rs index b92095de..914c7b78 100644 --- a/crates/catalog/src/catalog_list.rs +++ b/crates/catalog/src/catalog_list.rs @@ -8,6 +8,7 @@ use crate::error::{ }; use crate::schema::CachingSchema; use crate::table::CachingTable; +use crate::utils::fetch_table_providers; use aws_config::{BehaviorVersion, Region}; use aws_credential_types::Credentials; use aws_credential_types::provider::SharedCredentialsProvider; @@ -20,7 +21,6 @@ use datafusion::{ execution::object_store::ObjectStoreRegistry, }; use datafusion_iceberg::catalog::catalog::IcebergCatalog as DataFusionIcebergCatalog; -use futures::future::join_all; use iceberg_rust::catalog::Catalog; use iceberg_rust::object_store::ObjectStoreBuilder; use iceberg_s3tables_catalog::S3TablesCatalog; @@ -40,16 +40,23 @@ pub struct EmbucketCatalogList { pub metastore: Arc, pub table_object_store: Arc>>, pub catalogs: DashMap>, + pub config: CatalogListConfig, +} + +#[derive(Default, Clone)] +pub struct CatalogListConfig { + pub max_concurrent_table_fetches: usize, } impl EmbucketCatalogList { - pub fn new(metastore: Arc) -> Self { + pub fn new(metastore: Arc, config: CatalogListConfig) -> Self { let table_object_store: DashMap> = DashMap::new(); table_object_store.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); Self { metastore, table_object_store: Arc::new(table_object_store), catalogs: DashMap::default(), + config, } } @@ -101,6 +108,7 @@ impl EmbucketCatalogList { ident: catalog_name.to_owned(), volume: volume_ident.to_owned(), properties: None, + should_refresh: false, }; let database = self .metastore @@ -113,7 +121,7 @@ impl EmbucketCatalogList { VolumeType::Memory => self .get_embucket_catalog(&database)? 
.with_catalog_type(CatalogType::Memory), - VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), catalog_name).await?, + VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), &database).await?, }; self.catalogs .insert(catalog_name.to_owned(), Arc::new(catalog)); @@ -172,7 +180,7 @@ impl EmbucketCatalogList { })?; // Create catalog depending on the volume type let catalog = match &volume.volume { - VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), &db.ident).await?, + VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), &db).await?, _ => self.get_embucket_catalog(&db)?, }; catalogs.push(catalog); @@ -192,7 +200,7 @@ impl EmbucketCatalogList { )); Ok( CachingCatalog::new(catalog_provider, db.ident.clone(), Some(iceberg_catalog)) - .with_refresh(true) + .with_refresh(db.should_refresh) .with_properties(Properties { created_at: db.created_at, updated_at: db.created_at, @@ -210,7 +218,7 @@ impl EmbucketCatalogList { pub async fn s3tables_catalog( &self, volume: S3TablesVolume, - name: &str, + db: &RwObject, ) -> Result { let (ak, sk, token) = match volume.credentials { AwsCredentials::AccessKey(ref creds) => ( @@ -238,11 +246,13 @@ impl EmbucketCatalogList { let catalog = DataFusionIcebergCatalog::new(iceberg_catalog.clone(), None) .await .context(catalog_error::DataFusionSnafu)?; - Ok( - CachingCatalog::new(Arc::new(catalog), name.to_string(), Some(iceberg_catalog)) - .with_refresh(false) - .with_catalog_type(CatalogType::S3tables), + Ok(CachingCatalog::new( + Arc::new(catalog), + db.ident.to_string(), + Some(iceberg_catalog), ) + .with_refresh(db.should_refresh) + .with_catalog_type(CatalogType::S3tables)) } #[allow(clippy::as_conversions, clippy::too_many_lines)] @@ -278,26 +288,14 @@ impl EmbucketCatalogList { name: schema.clone(), iceberg_catalog: catalog.iceberg_catalog.clone(), }; - let tables = schema.schema.table_names(); - - let futs = tables - .iter() - .map(|table_name| async { - let tp = schema - .schema - 
.table(table_name) - .await - .context(catalog_error::DataFusionSnafu) - .ok()? - .map(Arc::new)?; - - Some((table_name.clone(), tp)) - }) - .collect::>(); + let table_providers = fetch_table_providers( + Arc::clone(&schema.schema), + self.config.max_concurrent_table_fetches, + ) + .await + .context(catalog_error::DataFusionSnafu)?; - let results = join_all(futs).await; - for res in results.into_iter().flatten() { - let (table_name, table_provider) = res; + for (table_name, table_provider) in table_providers { schema.tables_cache.insert( table_name.clone(), Arc::new(CachingTable::new_with_schema( diff --git a/crates/catalog/src/information_schema/config.rs b/crates/catalog/src/information_schema/config.rs index 9440ebdd..dc2034c3 100644 --- a/crates/catalog/src/information_schema/config.rs +++ b/crates/catalog/src/information_schema/config.rs @@ -13,6 +13,7 @@ use crate::catalog::CachingCatalog; use crate::df_error; use crate::information_schema::databases::InformationSchemaDatabasesBuilder; use crate::information_schema::navigation_tree::InformationSchemaNavigationTreeBuilder; +use crate::utils::fetch_table_providers; use dashmap::DashMap; use datafusion::arrow::datatypes::Schema; use datafusion::catalog::CatalogProviderList; @@ -31,6 +32,7 @@ pub struct InformationSchemaConfig { pub(crate) catalog_list: Arc, pub(crate) catalog_name: Arc, pub(crate) views_schemas: DashMap>, + pub(crate) max_concurrency: usize, pub(crate) target_reference: Option, } @@ -60,15 +62,14 @@ impl InformationSchemaConfig { let Some(schema) = catalog.schema(&schema_name) else { continue; }; - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? 
{ - builder.add_table( - &self.catalog_name, - &schema_name, - &table_name, - table.table_type(), - ); - } + let table_providers = fetch_table_providers(schema, self.max_concurrency).await?; + for (table_name, table) in table_providers { + builder.add_table( + &self.catalog_name, + &schema_name, + &table_name, + table.table_type(), + ); } } @@ -106,15 +107,14 @@ impl InformationSchemaConfig { let Some(schema) = catalog.schema(&schema_name) else { continue; }; - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? { - builder.add_navigation_tree( - &catalog_name, - Some(schema_name.clone()), - Some(table_name), - Some(table.table_type()), - ); - } + let table_providers = fetch_table_providers(schema, self.max_concurrency).await?; + for (table_name, table) in table_providers { + builder.add_navigation_tree( + &catalog_name, + Some(schema_name.clone()), + Some(table_name), + Some(table.table_type()), + ); } } @@ -167,10 +167,10 @@ impl InformationSchemaConfig { for schema_name in schema_names { if let Some(schema) = catalog.schema(&schema_name) { - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? - && table.table_type() == TableType::View - { + let table_providers = + fetch_table_providers(schema, self.max_concurrency).await?; + for (table_name, table) in table_providers { + if table.table_type() == TableType::View { builder.add_view( &self.catalog_name, &schema_name, @@ -217,20 +217,18 @@ impl InformationSchemaConfig { for schema_name in schema_names { if let Some(schema) = catalog.schema(&schema_name) { - for table_name in schema.table_names() { - if let Some(table) = schema.table(&table_name).await? 
{ - for (field_position, field) in - table.schema().fields().iter().enumerate() - { - builder.add_column( - &self.catalog_name, - &schema_name, - &table_name, - table.table_type(), - field_position, - field, - ); - } + let table_providers = + fetch_table_providers(schema, self.max_concurrency).await?; + for (table_name, table) in table_providers { + for (field_position, field) in table.schema().fields().iter().enumerate() { + builder.add_column( + &self.catalog_name, + &schema_name, + &table_name, + table.table_type(), + field_position, + field, + ); } } } diff --git a/crates/catalog/src/information_schema/information_schema.rs b/crates/catalog/src/information_schema/information_schema.rs index f4ce1bd5..41d61b0b 100644 --- a/crates/catalog/src/information_schema/information_schema.rs +++ b/crates/catalog/src/information_schema/information_schema.rs @@ -50,6 +50,7 @@ impl InformationSchemaProvider { pub fn new( catalog_list: Arc, catalog_name: Arc, + max_concurrency: usize, target_reference: Option, ) -> Self { let views_schemas = { @@ -76,6 +77,7 @@ impl InformationSchemaProvider { catalog_list, catalog_name, views_schemas, + max_concurrency, target_reference, }, } diff --git a/crates/catalog/src/lib.rs b/crates/catalog/src/lib.rs index 53e99765..6c6e27a6 100644 --- a/crates/catalog/src/lib.rs +++ b/crates/catalog/src/lib.rs @@ -15,6 +15,7 @@ pub mod information_schema; pub mod schema; pub mod snowflake_table; pub mod table; +pub mod utils; #[cfg(test)] pub mod tests; diff --git a/crates/catalog/src/tests/information_schema.rs b/crates/catalog/src/tests/information_schema.rs index 6ad6cf2a..15537ca5 100644 --- a/crates/catalog/src/tests/information_schema.rs +++ b/crates/catalog/src/tests/information_schema.rs @@ -1,9 +1,9 @@ -use crate::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList}; +use crate::catalog_list::{CatalogListConfig, DEFAULT_CATALOG, EmbucketCatalogList}; use crate::information_schema::information_schema::{ INFORMATION_SCHEMA, 
InformationSchemaProvider, }; use crate::test_utils::sort_record_batch_by_sortable_columns; -use catalog_metastore::InMemoryMetastore; +use catalog_metastore::{Config, InMemoryMetastore}; use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::SessionContext; use datafusion::prelude::SessionConfig; @@ -12,7 +12,10 @@ use std::sync::Arc; #[allow(clippy::unwrap_used)] async fn create_session_context() -> Arc { let metastore = Arc::new(InMemoryMetastore::new()); - let catalog_list_impl = Arc::new(EmbucketCatalogList::new(metastore)); + let catalog_list_impl = Arc::new(EmbucketCatalogList::new( + metastore, + CatalogListConfig::default(), + )); let state = SessionStateBuilder::new() .with_config( SessionConfig::new() @@ -30,6 +33,7 @@ async fn create_session_context() -> Arc { Arc::new(InformationSchemaProvider::new( catalog_list_impl.clone(), Arc::from(DEFAULT_CATALOG), + 1, None, )), ) diff --git a/crates/catalog/src/utils.rs b/crates/catalog/src/utils.rs new file mode 100644 index 00000000..675e7123 --- /dev/null +++ b/crates/catalog/src/utils.rs @@ -0,0 +1,49 @@ +use datafusion::catalog::{SchemaProvider, TableProvider}; +use datafusion_common::Result as DataFusionResult; +use futures::stream::{self, StreamExt}; +use std::sync::Arc; +use tracing::warn; + +/// Fetch the table providers of a schema with bounded concurrency. +/// +/// Tables whose lookup fails (e.g. removed mid-refresh) are logged with a +/// warning and skipped; lookup errors are never propagated to the caller.
+#[allow(clippy::type_complexity)] +pub async fn fetch_table_providers( + schema_provider: Arc, + max_concurrent_fetches: usize, +) -> DataFusionResult)>> { + let concurrency = max_concurrent_fetches.max(1); + let table_names = schema_provider.table_names(); + + let results: Vec)>>> = + stream::iter(table_names.into_iter()) + .map(|table_name| { + let schema_provider = Arc::clone(&schema_provider); + async move { + match schema_provider.table(&table_name).await { + Ok(table) => Ok(table.map(|table_provider| (table_name, table_provider))), + Err(err) => { + warn!( + table = %table_name, + error = %err, + "Failed to fetch table provider; skipping table" + ); + Ok(None) + } + } + } + }) + .buffer_unordered(concurrency) + .collect() + .await; + + let mut tables = Vec::new(); + for result in results { + if let Some(table) = result? { + tables.push(table); + } + } + + Ok(tables) +} diff --git a/crates/embucket-lambda/src/config.rs b/crates/embucket-lambda/src/config.rs index 78dda9ed..e46248f0 100644 --- a/crates/embucket-lambda/src/config.rs +++ b/crates/embucket-lambda/src/config.rs @@ -19,6 +19,7 @@ pub struct EnvConfig { pub metastore_config: Option, pub jwt_secret: Option, pub read_only: bool, + pub max_concurrent_table_fetches: usize, } impl EnvConfig { @@ -42,6 +43,7 @@ impl EnvConfig { metastore_config: env::var("METASTORE_CONFIG").ok().map(PathBuf::from), jwt_secret: env::var("JWT_SECRET").ok(), read_only: parse_env("READ_ONLY").unwrap_or(true), + max_concurrent_table_fetches: parse_env("MAX_CONCURRENT_TABLE_FETCHES").unwrap_or(5), } } @@ -59,6 +61,7 @@ impl EnvConfig { disk_pool_size_mb: self.disk_pool_size_mb, query_history_rows_limit: self.query_history_rows_limit, read_only: self.read_only, + max_concurrent_table_fetches: self.max_concurrent_table_fetches, } } } diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs index 2f674117..b7e64d9d 100644 --- a/crates/embucketd/src/cli.rs +++ b/crates/embucketd/src/cli.rs @@ -104,7 +104,6 @@ pub 
struct CliOpts { #[arg( long, env = "ALLOC_TRACING", - default_value = "true", help = "Enable memory tracing functionality" )] pub alloc_tracing: Option, @@ -177,6 +176,14 @@ pub struct CliOpts { help = "If the service should only accept read only commands (selects)" )] pub read_only: bool, + + #[arg( + long, + env = "MAX_CONCURRENT_TABLE_FETCHES", + default_value = "2", + help = "The maximum number of concurrent requests to get tables details" + )] + pub max_concurrent_table_fetches: usize, } impl CliOpts { diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index f8664444..89b59629 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -146,6 +146,7 @@ async fn async_main( disk_pool_size_mb: opts.disk_pool_size_mb, query_history_rows_limit: opts.query_history_rows_limit, read_only: opts.read_only, + max_concurrent_table_fetches: opts.max_concurrent_table_fetches, }; let host = opts.host.clone().unwrap(); let port = opts.port.unwrap(); diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index 7c009d64..f9d4f6c2 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -2367,6 +2367,7 @@ impl UserQuery { return Ok(Arc::new(InformationSchemaProvider::new( Arc::clone(self.session.ctx.state().catalog_list()), resolved_ref.catalog, + self.session.config.max_concurrent_table_fetches, target_reference, ))); } diff --git a/crates/executor/src/service.rs b/crates/executor/src/service.rs index f7a6ebda..a95d6514 100644 --- a/crates/executor/src/service.rs +++ b/crates/executor/src/service.rs @@ -166,7 +166,7 @@ impl CoreExecutionService { Self::initialize_datafusion_tracer(); - let catalog_list = Self::catalog_list(metastore.clone()).await?; + let catalog_list = Self::catalog_list(metastore.clone(), &config).await?; let runtime_env = Self::runtime_env(&config, catalog_list.clone())?; Ok(Self { metastore, @@ -207,6 +207,7 @@ impl CoreExecutionService { ident: ident.clone(), properties: 
None, volume: ident.clone(), + should_refresh: false, }, ) .await @@ -237,8 +238,11 @@ impl CoreExecutionService { skip(metastore), err )] - pub async fn catalog_list(metastore: Arc) -> Result> { - let catalog_list = Arc::new(EmbucketCatalogList::new(metastore.clone())); + pub async fn catalog_list( + metastore: Arc, + config: &Config, + ) -> Result> { + let catalog_list = Arc::new(EmbucketCatalogList::new(metastore.clone(), config.into())); catalog_list .register_catalogs() .await diff --git a/crates/executor/src/tests/query.rs b/crates/executor/src/tests/query.rs index f363ade4..13abbe92 100644 --- a/crates/executor/src/tests/query.rs +++ b/crates/executor/src/tests/query.rs @@ -96,6 +96,7 @@ pub async fn create_df_session() -> Arc { ident: "embucket".to_string(), properties: None, volume: "test_volume".to_string(), + should_refresh: false, }, ) .await @@ -116,7 +117,7 @@ pub async fn create_df_session() -> Arc { .await .expect("Failed to create schema"); let config = Arc::new(Config::default()); - let catalog_list = CoreExecutionService::catalog_list(metastore.clone()) + let catalog_list = CoreExecutionService::catalog_list(metastore.clone(), &config) .await .expect("Failed to create catalog list"); let runtime_env = CoreExecutionService::runtime_env(&config, catalog_list.clone()) diff --git a/crates/executor/src/tests/service.rs b/crates/executor/src/tests/service.rs index 41d7004c..829039ae 100644 --- a/crates/executor/src/tests/service.rs +++ b/crates/executor/src/tests/service.rs @@ -58,6 +58,7 @@ async fn test_service_upload_file() { ident: "embucket".to_string(), properties: None, volume: "test_volume".to_string(), + should_refresh: false, }, ) .await @@ -188,6 +189,7 @@ async fn test_service_create_table_file_volume() { ident: "embucket".to_string(), properties: None, volume: "test_volume".to_string(), + should_refresh: false, }, ) .await diff --git a/crates/executor/src/utils.rs b/crates/executor/src/utils.rs index b6f9f270..edfeea7b 100644 --- 
a/crates/executor/src/utils.rs +++ b/crates/executor/src/utils.rs @@ -1,5 +1,6 @@ use super::models::QueryResult; use crate::error::{ArrowSnafu, CantCastToSnafu, Result}; +use catalog::catalog_list::CatalogListConfig; use catalog_metastore::SchemaIdent as MetastoreSchemaIdent; use catalog_metastore::TableIdent as MetastoreTableIdent; use chrono::{DateTime, FixedOffset, Offset, TimeZone}; @@ -44,6 +45,15 @@ pub struct Config { pub disk_pool_size_mb: Option, pub query_history_rows_limit: usize, pub read_only: bool, + pub max_concurrent_table_fetches: usize, +} + +impl From<&Config> for CatalogListConfig { + fn from(value: &Config) -> Self { + Self { + max_concurrent_table_fetches: value.max_concurrent_table_fetches, + } + } } impl Default for Config { @@ -60,6 +70,7 @@ impl Default for Config { disk_pool_size_mb: None, query_history_rows_limit: DEFAULT_QUERY_HISTORY_ROWS_LIMIT, read_only: false, + max_concurrent_table_fetches: 5, } } }