diff --git a/crates/integrations/datafusion/src/system_tables/branches.rs b/crates/integrations/datafusion/src/system_tables/branches.rs index 581fb978..468a9ad7 100644 --- a/crates/integrations/datafusion/src/system_tables/branches.rs +++ b/crates/integrations/datafusion/src/system_tables/branches.rs @@ -79,31 +79,11 @@ impl TableProvider for BranchesTable { _filters: &[Expr], _limit: Option, ) -> DFResult> { - let bm = BranchManager::new( - self.table.file_io().clone(), - self.table.location().to_string(), - ); - let branch_names = bm.list_all().await.map_err(to_datafusion_error)?; - - let n = branch_names.len(); - let mut names: Vec = Vec::with_capacity(n); - let mut create_times = Vec::with_capacity(n); - - for name in branch_names { - let branch_path = bm.branch_path(&name); - let status = self - .table - .file_io() - .get_status(&branch_path) + let table = self.table.clone(); + let (names, create_times) = + crate::runtime::await_with_runtime(async move { collect_branches(&table).await }) .await .map_err(to_datafusion_error)?; - let ts_millis = status - .last_modified - .map(|dt| dt.timestamp_millis()) - .unwrap_or(0); - names.push(name); - create_times.push(ts_millis); - } let schema = branches_schema(); let batch = RecordBatch::try_new( @@ -121,3 +101,20 @@ impl TableProvider for BranchesTable { )?) } } + +async fn collect_branches(table: &Table) -> paimon::Result<(Vec, Vec)> { + let file_io = table.file_io(); + let bm = BranchManager::new(file_io.clone(), table.location().to_string()); + let names = bm.list_all().await?; + let mut create_times = Vec::with_capacity(names.len()); + for name in &names { + let status = file_io.get_status(&bm.branch_path(name)).await?; + create_times.push( + status + .last_modified + .map(|dt| dt.timestamp_millis()) + .unwrap_or(0), + ); + } + Ok((names, create_times)) +} diff --git a/crates/integrations/datafusion/src/system_tables/manifests.rs b/crates/integrations/datafusion/src/system_tables/manifests.rs index bc808051..12b2864d 100644 --- a/crates/integrations/datafusion/src/system_tables/manifests.rs +++ b/crates/integrations/datafusion/src/system_tables/manifests.rs @@ -83,9 +83,11 @@ impl TableProvider for ManifestsTable { _filters: &[Expr], _limit: Option, ) -> DFResult> { - let metas = collect_manifests(&self.table) - .await - .map_err(to_datafusion_error)?; + let table = self.table.clone(); + let metas = + crate::runtime::await_with_runtime(async move { collect_manifests(&table).await }) + .await + .map_err(to_datafusion_error)?; let n = metas.len(); let mut file_names: Vec = Vec::with_capacity(n); diff --git a/crates/integrations/datafusion/src/system_tables/schemas.rs b/crates/integrations/datafusion/src/system_tables/schemas.rs index 03da7933..2ff9d40b 100644 --- a/crates/integrations/datafusion/src/system_tables/schemas.rs +++ b/crates/integrations/datafusion/src/system_tables/schemas.rs @@ -85,10 +85,11 @@ impl TableProvider for SchemasTable { _filters: &[Expr], _limit: Option, ) -> DFResult> { - let schemas = self - .table - .schema_manager() - .list_all() + let table = self.table.clone(); + let schemas = + crate::runtime::await_with_runtime( + async move { table.schema_manager().list_all().await }, + ) .await .map_err(to_datafusion_error)?; diff --git a/crates/integrations/datafusion/src/system_tables/snapshots.rs b/crates/integrations/datafusion/src/system_tables/snapshots.rs index 6d0ae0b5..9751ef35 100644 --- a/crates/integrations/datafusion/src/system_tables/snapshots.rs +++ b/crates/integrations/datafusion/src/system_tables/snapshots.rs @@ -95,7 +95,9 @@ impl TableProvider for SnapshotsTable { self.table.file_io().clone(), self.table.location().to_string(), ); - let snapshots = sm.list_all().await.map_err(to_datafusion_error)?; + let snapshots = crate::runtime::await_with_runtime(async move { sm.list_all().await }) + .await + .map_err(to_datafusion_error)?; let n = snapshots.len(); let mut snapshot_ids = Vec::with_capacity(n); diff --git a/crates/integrations/datafusion/src/system_tables/tags.rs b/crates/integrations/datafusion/src/system_tables/tags.rs index 1559b675..ab0316f8 100644 --- a/crates/integrations/datafusion/src/system_tables/tags.rs +++ b/crates/integrations/datafusion/src/system_tables/tags.rs @@ -94,7 +94,9 @@ impl TableProvider for TagsTable { self.table.file_io().clone(), self.table.location().to_string(), ); - let tags = tm.list_all().await.map_err(to_datafusion_error)?; + let tags = crate::runtime::await_with_runtime(async move { tm.list_all().await }) + .await + .map_err(to_datafusion_error)?; let n = tags.len(); let mut tag_names: Vec = Vec::with_capacity(n);