Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions crates/catalog-metastore/src/metastore_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ struct VolumeEntry {
volume: Volume,
#[serde(default)]
database: Option<String>,
#[serde(default)]
should_refresh: bool,
}

#[derive(Debug, Deserialize, Clone)]
struct DatabaseEntry {
ident: String,
volume: VolumeIdent,
#[serde(default)]
should_refresh: bool,
}

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -114,7 +118,7 @@ impl MetastoreBootstrapConfig {
}

for db in &self.databases {
self.ensure_database(metastore.clone(), &db.ident, &db.volume)
self.ensure_database(metastore.clone(), &db.ident, &db.volume, db.should_refresh)
.await?;
}

Expand Down Expand Up @@ -157,8 +161,13 @@ impl MetastoreBootstrapConfig {
}

if let Some(database) = &entry.database {
self.ensure_database(metastore, database, &entry.volume.ident)
.await?;
self.ensure_database(
metastore,
database,
&entry.volume.ident,
entry.should_refresh,
)
.await?;
}

Ok(())
Expand All @@ -169,6 +178,7 @@ impl MetastoreBootstrapConfig {
metastore: Arc<dyn Metastore>,
ident: &str,
volume: &str,
should_refresh: bool,
) -> Result<(), ConfigError> {
if metastore
.get_database(&ident.to_string())
Expand All @@ -184,6 +194,7 @@ impl MetastoreBootstrapConfig {
ident: ident.to_string(),
volume: volume.to_string(),
properties: None,
should_refresh,
},
)
.await
Expand Down
2 changes: 2 additions & 0 deletions crates/catalog-metastore/src/models/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct Database {
pub properties: Option<HashMap<String, String>>,
/// Volume identifier
pub volume: VolumeIdent,
pub should_refresh: bool,
}

impl Database {
Expand All @@ -35,6 +36,7 @@ mod tests {
ident: "db".to_string(),
properties: None,
volume: "vol".to_string(),
should_refresh: false,
};
assert_eq!(db.prefix("parent"), "parent/db");
}
Expand Down
56 changes: 27 additions & 29 deletions crates/catalog/src/catalog_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::error::{
};
use crate::schema::CachingSchema;
use crate::table::CachingTable;
use crate::utils::fetch_table_providers;
use aws_config::{BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_credential_types::provider::SharedCredentialsProvider;
Expand All @@ -20,7 +21,6 @@ use datafusion::{
execution::object_store::ObjectStoreRegistry,
};
use datafusion_iceberg::catalog::catalog::IcebergCatalog as DataFusionIcebergCatalog;
use futures::future::join_all;
use iceberg_rust::catalog::Catalog;
use iceberg_rust::object_store::ObjectStoreBuilder;
use iceberg_s3tables_catalog::S3TablesCatalog;
Expand All @@ -40,16 +40,23 @@ pub struct EmbucketCatalogList {
pub metastore: Arc<dyn Metastore>,
pub table_object_store: Arc<DashMap<String, Arc<dyn ObjectStore>>>,
pub catalogs: DashMap<String, Arc<CachingCatalog>>,
pub config: CatalogListConfig,
}

#[derive(Default, Clone)]
pub struct CatalogListConfig {
pub max_concurrent_table_fetches: usize,
}

impl EmbucketCatalogList {
pub fn new(metastore: Arc<dyn Metastore>) -> Self {
pub fn new(metastore: Arc<dyn Metastore>, config: CatalogListConfig) -> Self {
let table_object_store: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
table_object_store.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
metastore,
table_object_store: Arc::new(table_object_store),
catalogs: DashMap::default(),
config,
}
}

Expand Down Expand Up @@ -101,6 +108,7 @@ impl EmbucketCatalogList {
ident: catalog_name.to_owned(),
volume: volume_ident.to_owned(),
properties: None,
should_refresh: false,
};
let database = self
.metastore
Expand All @@ -113,7 +121,7 @@ impl EmbucketCatalogList {
VolumeType::Memory => self
.get_embucket_catalog(&database)?
.with_catalog_type(CatalogType::Memory),
VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), catalog_name).await?,
VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), &database).await?,
};
self.catalogs
.insert(catalog_name.to_owned(), Arc::new(catalog));
Expand Down Expand Up @@ -172,7 +180,7 @@ impl EmbucketCatalogList {
})?;
// Create catalog depending on the volume type
let catalog = match &volume.volume {
VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), &db.ident).await?,
VolumeType::S3Tables(vol) => self.s3tables_catalog(vol.clone(), &db).await?,
_ => self.get_embucket_catalog(&db)?,
};
catalogs.push(catalog);
Expand All @@ -192,7 +200,7 @@ impl EmbucketCatalogList {
));
Ok(
CachingCatalog::new(catalog_provider, db.ident.clone(), Some(iceberg_catalog))
.with_refresh(true)
.with_refresh(db.should_refresh)
.with_properties(Properties {
created_at: db.created_at,
updated_at: db.created_at,
Expand All @@ -210,7 +218,7 @@ impl EmbucketCatalogList {
pub async fn s3tables_catalog(
&self,
volume: S3TablesVolume,
name: &str,
db: &RwObject<Database>,
) -> Result<CachingCatalog> {
let (ak, sk, token) = match volume.credentials {
AwsCredentials::AccessKey(ref creds) => (
Expand Down Expand Up @@ -238,11 +246,13 @@ impl EmbucketCatalogList {
let catalog = DataFusionIcebergCatalog::new(iceberg_catalog.clone(), None)
.await
.context(catalog_error::DataFusionSnafu)?;
Ok(
CachingCatalog::new(Arc::new(catalog), name.to_string(), Some(iceberg_catalog))
.with_refresh(false)
.with_catalog_type(CatalogType::S3tables),
Ok(CachingCatalog::new(
Arc::new(catalog),
db.ident.to_string(),
Some(iceberg_catalog),
)
.with_refresh(db.should_refresh)
.with_catalog_type(CatalogType::S3tables))
}

#[allow(clippy::as_conversions, clippy::too_many_lines)]
Expand Down Expand Up @@ -278,26 +288,14 @@ impl EmbucketCatalogList {
name: schema.clone(),
iceberg_catalog: catalog.iceberg_catalog.clone(),
};
let tables = schema.schema.table_names();

let futs = tables
.iter()
.map(|table_name| async {
let tp = schema
.schema
.table(table_name)
.await
.context(catalog_error::DataFusionSnafu)
.ok()?
.map(Arc::new)?;

Some((table_name.clone(), tp))
})
.collect::<Vec<_>>();
let table_providers = fetch_table_providers(
Arc::clone(&schema.schema),
self.config.max_concurrent_table_fetches,
)
.await
.context(catalog_error::DataFusionSnafu)?;

let results = join_all(futs).await;
for res in results.into_iter().flatten() {
let (table_name, table_provider) = res;
for (table_name, table_provider) in table_providers {
schema.tables_cache.insert(
table_name.clone(),
Arc::new(CachingTable::new_with_schema(
Expand Down
70 changes: 34 additions & 36 deletions crates/catalog/src/information_schema/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::catalog::CachingCatalog;
use crate::df_error;
use crate::information_schema::databases::InformationSchemaDatabasesBuilder;
use crate::information_schema::navigation_tree::InformationSchemaNavigationTreeBuilder;
use crate::utils::fetch_table_providers;
use dashmap::DashMap;
use datafusion::arrow::datatypes::Schema;
use datafusion::catalog::CatalogProviderList;
Expand All @@ -31,6 +32,7 @@ pub struct InformationSchemaConfig {
pub(crate) catalog_list: Arc<dyn CatalogProviderList>,
pub(crate) catalog_name: Arc<str>,
pub(crate) views_schemas: DashMap<String, Arc<Schema>>,
pub(crate) max_concurrency: usize,
pub(crate) target_reference: Option<TableReference>,
}

Expand Down Expand Up @@ -60,15 +62,14 @@ impl InformationSchemaConfig {
let Some(schema) = catalog.schema(&schema_name) else {
continue;
};
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
builder.add_table(
&self.catalog_name,
&schema_name,
&table_name,
table.table_type(),
);
}
let table_providers = fetch_table_providers(schema, self.max_concurrency).await?;
for (table_name, table) in table_providers {
builder.add_table(
&self.catalog_name,
&schema_name,
&table_name,
table.table_type(),
);
}
}

Expand Down Expand Up @@ -106,15 +107,14 @@ impl InformationSchemaConfig {
let Some(schema) = catalog.schema(&schema_name) else {
continue;
};
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
builder.add_navigation_tree(
&catalog_name,
Some(schema_name.clone()),
Some(table_name),
Some(table.table_type()),
);
}
let table_providers = fetch_table_providers(schema, self.max_concurrency).await?;
for (table_name, table) in table_providers {
builder.add_navigation_tree(
&catalog_name,
Some(schema_name.clone()),
Some(table_name),
Some(table.table_type()),
);
}
}

Expand Down Expand Up @@ -167,10 +167,10 @@ impl InformationSchemaConfig {

for schema_name in schema_names {
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await?
&& table.table_type() == TableType::View
{
let table_providers =
fetch_table_providers(schema, self.max_concurrency).await?;
for (table_name, table) in table_providers {
if table.table_type() == TableType::View {
builder.add_view(
&self.catalog_name,
&schema_name,
Expand Down Expand Up @@ -217,20 +217,18 @@ impl InformationSchemaConfig {

for schema_name in schema_names {
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
if let Some(table) = schema.table(&table_name).await? {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
builder.add_column(
&self.catalog_name,
&schema_name,
&table_name,
table.table_type(),
field_position,
field,
);
}
let table_providers =
fetch_table_providers(schema, self.max_concurrency).await?;
for (table_name, table) in table_providers {
for (field_position, field) in table.schema().fields().iter().enumerate() {
builder.add_column(
&self.catalog_name,
&schema_name,
&table_name,
table.table_type(),
field_position,
field,
);
Comment thread
DanCodedThis marked this conversation as resolved.
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/catalog/src/information_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl InformationSchemaProvider {
pub fn new(
catalog_list: Arc<dyn CatalogProviderList>,
catalog_name: Arc<str>,
max_concurrency: usize,
target_reference: Option<TableReference>,
) -> Self {
let views_schemas = {
Expand All @@ -76,6 +77,7 @@ impl InformationSchemaProvider {
catalog_list,
catalog_name,
views_schemas,
max_concurrency,
target_reference,
},
}
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod information_schema;
pub mod schema;
pub mod snowflake_table;
pub mod table;
pub mod utils;

#[cfg(test)]
pub mod tests;
Expand Down
10 changes: 7 additions & 3 deletions crates/catalog/src/tests/information_schema.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::catalog_list::{DEFAULT_CATALOG, EmbucketCatalogList};
use crate::catalog_list::{CatalogListConfig, DEFAULT_CATALOG, EmbucketCatalogList};
use crate::information_schema::information_schema::{
INFORMATION_SCHEMA, InformationSchemaProvider,
};
use crate::test_utils::sort_record_batch_by_sortable_columns;
use catalog_metastore::InMemoryMetastore;
use catalog_metastore::{Config, InMemoryMetastore};
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::context::SessionContext;
use datafusion::prelude::SessionConfig;
Expand All @@ -12,7 +12,10 @@ use std::sync::Arc;
#[allow(clippy::unwrap_used)]
async fn create_session_context() -> Arc<SessionContext> {
let metastore = Arc::new(InMemoryMetastore::new());
let catalog_list_impl = Arc::new(EmbucketCatalogList::new(metastore));
let catalog_list_impl = Arc::new(EmbucketCatalogList::new(
metastore,
CatalogListConfig::default(),
));
let state = SessionStateBuilder::new()
.with_config(
SessionConfig::new()
Expand All @@ -30,6 +33,7 @@ async fn create_session_context() -> Arc<SessionContext> {
Arc::new(InformationSchemaProvider::new(
catalog_list_impl.clone(),
Arc::from(DEFAULT_CATALOG),
1,
None,
)),
)
Expand Down
Loading