From ad36fb88b5c21301f9dca82398e4bd0ea22a03c4 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sun, 14 Sep 2025 14:21:28 +0800 Subject: [PATCH] refactor: update CatalogInfo signatures --- .../src/pg_catalog/catalog_info.rs | 29 ++++++++++++------- .../src/pg_catalog/pg_attribute.rs | 6 ++-- .../src/pg_catalog/pg_class.rs | 6 ++-- .../src/pg_catalog/pg_database.rs | 2 +- .../src/pg_catalog/pg_namespace.rs | 4 +-- .../src/pg_catalog/pg_tables.rs | 6 ++-- 6 files changed, 31 insertions(+), 22 deletions(-) diff --git a/datafusion-postgres/src/pg_catalog/catalog_info.rs b/datafusion-postgres/src/pg_catalog/catalog_info.rs index cf58bd9..cf576e1 100644 --- a/datafusion-postgres/src/pg_catalog/catalog_info.rs +++ b/datafusion-postgres/src/pg_catalog/catalog_info.rs @@ -10,11 +10,15 @@ use datafusion::{ /// Define the interface for retrieve catalog data for pg_catalog tables #[async_trait] pub trait CatalogInfo: Clone + Send + Sync + Debug + 'static { - fn catalog_names(&self) -> Vec; + fn catalog_names(&self) -> Result, DataFusionError>; - fn schema_names(&self, catalog_name: &str) -> Option>; + fn schema_names(&self, catalog_name: &str) -> Result>, DataFusionError>; - fn table_names(&self, catalog_name: &str, schema_name: &str) -> Option>; + fn table_names( + &self, + catalog_name: &str, + schema_name: &str, + ) -> Result>, DataFusionError>; async fn table_schema( &self, @@ -33,18 +37,23 @@ pub trait CatalogInfo: Clone + Send + Sync + Debug + 'static { #[async_trait] impl CatalogInfo for Arc { - fn catalog_names(&self) -> Vec { - CatalogProviderList::catalog_names(self.as_ref()) + fn catalog_names(&self) -> Result, DataFusionError> { + Ok(CatalogProviderList::catalog_names(self.as_ref())) } - fn schema_names(&self, catalog_name: &str) -> Option> { - self.catalog(catalog_name).map(|c| c.schema_names()) + fn schema_names(&self, catalog_name: &str) -> Result>, DataFusionError> { + Ok(self.catalog(catalog_name).map(|c| c.schema_names())) } - fn table_names(&self, catalog_name: &str, schema_name: &str) -> Option> { - self.catalog(catalog_name) + fn table_names( + &self, + catalog_name: &str, + schema_name: &str, + ) -> Result>, DataFusionError> { + Ok(self + .catalog(catalog_name) .and_then(|c| c.schema(schema_name)) - .map(|s| s.table_names()) + .map(|s| s.table_names())) } async fn table_schema( diff --git a/datafusion-postgres/src/pg_catalog/pg_attribute.rs b/datafusion-postgres/src/pg_catalog/pg_attribute.rs index dbc951c..9727f84 100644 --- a/datafusion-postgres/src/pg_catalog/pg_attribute.rs +++ b/datafusion-postgres/src/pg_catalog/pg_attribute.rs @@ -105,11 +105,11 @@ impl PgAttributeTable { // original one in case that schemas or tables were dropped. let mut swap_cache = HashMap::new(); - for catalog_name in this.catalog_list.catalog_names() { - if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name) { + for catalog_name in this.catalog_list.catalog_names()? { + if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? { for schema_name in schema_names { if let Some(table_names) = - this.catalog_list.table_names(&catalog_name, &schema_name) + this.catalog_list.table_names(&catalog_name, &schema_name)? { // Process all tables in this schema for table_name in table_names { diff --git a/datafusion-postgres/src/pg_catalog/pg_class.rs b/datafusion-postgres/src/pg_catalog/pg_class.rs index 7284111..6e76920 100644 --- a/datafusion-postgres/src/pg_catalog/pg_class.rs +++ b/datafusion-postgres/src/pg_catalog/pg_class.rs @@ -117,7 +117,7 @@ impl PgClassTable { let mut swap_cache = HashMap::new(); // Iterate through all catalogs and schemas - for catalog_name in this.catalog_list.catalog_names() { + for catalog_name in this.catalog_list.catalog_names()? { let cache_key = OidCacheKey::Catalog(catalog_name.clone()); let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) { *oid @@ -126,7 +126,7 @@ impl PgClassTable { }; swap_cache.insert(cache_key, catalog_oid); - if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name) { + if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? { for schema_name in schema_names { let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone()); let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) { @@ -141,7 +141,7 @@ impl PgClassTable { // Now process all tables in this schema if let Some(table_names) = - this.catalog_list.table_names(&catalog_name, &schema_name) + this.catalog_list.table_names(&catalog_name, &schema_name)? { for table_name in table_names { let cache_key = OidCacheKey::Table( diff --git a/datafusion-postgres/src/pg_catalog/pg_database.rs b/datafusion-postgres/src/pg_catalog/pg_database.rs index ccc27e6..6b6071c 100644 --- a/datafusion-postgres/src/pg_catalog/pg_database.rs +++ b/datafusion-postgres/src/pg_catalog/pg_database.rs @@ -80,7 +80,7 @@ impl PgDatabaseTable { let mut oid_cache = this.oid_cache.write().await; // Add a record for each catalog (treating catalogs as "databases") - for catalog_name in this.catalog_list.catalog_names() { + for catalog_name in this.catalog_list.catalog_names()? { let cache_key = OidCacheKey::Catalog(catalog_name.clone()); let catalog_oid = if let Some(oid) = oid_cache.get(&cache_key) { *oid diff --git a/datafusion-postgres/src/pg_catalog/pg_namespace.rs b/datafusion-postgres/src/pg_catalog/pg_namespace.rs index c423b7b..fd10523 100644 --- a/datafusion-postgres/src/pg_catalog/pg_namespace.rs +++ b/datafusion-postgres/src/pg_catalog/pg_namespace.rs @@ -62,8 +62,8 @@ impl PgNamespaceTable { let mut oid_cache = this.oid_cache.write().await; // Now add all schemas from DataFusion catalogs - for catalog_name in this.catalog_list.catalog_names() { - if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name) { + for catalog_name in this.catalog_list.catalog_names()? { + if let Some(schema_names) = this.catalog_list.schema_names(&catalog_name)? { for schema_name in schema_names { let cache_key = OidCacheKey::Schema(catalog_name.clone(), schema_name.clone()); let schema_oid = if let Some(oid) = oid_cache.get(&cache_key) { diff --git a/datafusion-postgres/src/pg_catalog/pg_tables.rs b/datafusion-postgres/src/pg_catalog/pg_tables.rs index 155f68b..2d4ce7f 100644 --- a/datafusion-postgres/src/pg_catalog/pg_tables.rs +++ b/datafusion-postgres/src/pg_catalog/pg_tables.rs @@ -49,11 +49,11 @@ impl PgTablesTable { let mut row_security = Vec::new(); // Iterate through all catalogs and schemas - for catalog_name in this.catalog_list.catalog_names() { - if let Some(catalog_schema_names) = this.catalog_list.schema_names(&catalog_name) { + for catalog_name in this.catalog_list.catalog_names()? { + if let Some(catalog_schema_names) = this.catalog_list.schema_names(&catalog_name)? { for schema_name in catalog_schema_names { if let Some(catalog_table_names) = - this.catalog_list.table_names(&catalog_name, &schema_name) + this.catalog_list.table_names(&catalog_name, &schema_name)? { // Now process all tables in this schema for table_name in catalog_table_names {