diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index edfe1ed7..6dd52728 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -33,7 +33,7 @@ //! let df = ctx.sql("SELECT * FROM my_table").await?; //! ``` //! -//! This version does not support write, column projection, or predicate pushdown. +//! This version does not support write or predicate pushdown. mod error; mod physical_plan; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index fcb1497e..dd27612c 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -27,15 +27,24 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; use futures::{StreamExt, TryStreamExt}; use paimon::table::Table; +use paimon::DataSplit; use crate::error::to_datafusion_error; /// Execution plan that scans a Paimon table with optional column projection. +/// +/// Planning is performed eagerly in [`super::super::table::PaimonTableProvider::scan`], +/// and the resulting splits are distributed across DataFusion execution partitions +/// so that DataFusion can schedule them in parallel. #[derive(Debug)] pub struct PaimonTableScan { table: Table, /// Projected column names (if None, reads all columns). projected_columns: Option<Vec<String>>, + /// Pre-planned partition assignments: `planned_partitions[i]` contains the + /// Paimon splits that DataFusion partition `i` will read. + /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`. 
+ planned_partitions: Vec<Arc<[DataSplit]>>, plan_properties: PlanProperties, } @@ -44,20 +53,18 @@ impl PaimonTableScan { schema: ArrowSchemaRef, table: Table, projected_columns: Option<Vec<String>>, + planned_partitions: Vec<Arc<[DataSplit]>>, ) -> Self { let plan_properties = PlanProperties::new( EquivalenceProperties::new(schema.clone()), - // TODO: Currently all Paimon splits are read in a single DataFusion partition, - // which means we lose DataFusion parallelism. A follow-up should expose one - // execution partition per Paimon split so that DataFusion can schedule them - // across threads. - Partitioning::UnknownPartitioning(1), + Partitioning::UnknownPartitioning(planned_partitions.len()), EmissionType::Incremental, Boundedness::Bounded, ); Self { table, projected_columns, + planned_partitions, plan_properties, } } @@ -93,9 +100,16 @@ impl ExecutionPlan for PaimonTableScan { fn execute( &self, - _partition: usize, + partition: usize, _context: Arc<TaskContext>, ) -> DFResult<SendableRecordBatchStream> { + let splits = Arc::clone(self.planned_partitions.get(partition).ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "PaimonTableScan: partition index {partition} out of range (total {})", + self.planned_partitions.len() + )) + })?); + let table = self.table.clone(); let schema = self.schema(); let projected_columns = self.projected_columns.clone(); @@ -103,16 +117,13 @@ let fut = async move { let mut read_builder = table.new_read_builder(); - // Apply projection if specified if let Some(ref columns) = projected_columns { let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect(); read_builder.with_projection(&col_refs); } - let scan = read_builder.new_scan(); - let plan = scan.plan().await.map_err(to_datafusion_error)?; let read = read_builder.new_read().map_err(to_datafusion_error)?; - let stream = read.to_arrow(plan.splits()).map_err(to_datafusion_error)?; + let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?; let stream = stream.map(|r| 
r.map_err(to_datafusion_error)); Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new( @@ -135,6 +146,62 @@ impl DisplayAs for PaimonTableScan { _t: datafusion::physical_plan::DisplayFormatType, f: &mut std::fmt::Formatter, ) -> std::fmt::Result { - write!(f, "PaimonTableScan") + write!( + f, + "PaimonTableScan: partitions={}", + self.planned_partitions.len() + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + use datafusion::physical_plan::ExecutionPlan; + + fn test_schema() -> ArrowSchemaRef { + Arc::new(Schema::new(vec![Field::new( + "id", + ArrowDataType::Int32, + false, + )])) + } + + #[test] + fn test_partition_count_empty_plan() { + let schema = test_schema(); + let scan = PaimonTableScan::new(schema, dummy_table(), None, vec![Arc::from(Vec::new())]); + assert_eq!(scan.properties().output_partitioning().partition_count(), 1); + } + + #[test] + fn test_partition_count_multiple_partitions() { + let schema = test_schema(); + let planned_partitions = vec![ + Arc::from(Vec::new()), + Arc::from(Vec::new()), + Arc::from(Vec::new()), + ]; + let scan = PaimonTableScan::new(schema, dummy_table(), None, planned_partitions); + assert_eq!(scan.properties().output_partitioning().partition_count(), 3); + } + + /// Constructs a minimal Table for testing (no real files needed since we + /// only test PlanProperties, not actual reads). 
+ fn dummy_table() -> Table { + use paimon::catalog::Identifier; + use paimon::io::FileIOBuilder; + use paimon::spec::{Schema, TableSchema}; + + let file_io = FileIOBuilder::new("file").build().unwrap(); + let schema = Schema::builder().build().unwrap(); + let table_schema = TableSchema::new(0, &schema); + Table::new( + file_io, + Identifier::new("test_db", "test_table"), + "/tmp/test-table".to_string(), + table_schema, + ) } } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 1ba06f4b..2e0a49eb 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -29,6 +29,7 @@ use datafusion::logical_expr::Expr; use datafusion::physical_plan::ExecutionPlan; use paimon::table::Table; +use crate::error::to_datafusion_error; use crate::physical_plan::PaimonTableScan; use crate::schema::paimon_schema_to_arrow; @@ -57,6 +58,15 @@ impl PaimonTableProvider { } } +/// Distribute `items` into `num_buckets` groups using round-robin assignment. +fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> { + let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect(); + for (i, item) in items.into_iter().enumerate() { + buckets[i % num_buckets].push(item); + } + buckets +} + #[async_trait] impl TableProvider for PaimonTableProvider { fn as_any(&self) -> &dyn Any { @@ -73,7 +83,7 @@ async fn scan( &self, - _state: &dyn Session, + state: &dyn Session, projection: Option<&Vec<usize>>, _filters: &[Expr], _limit: Option<usize>, @@ -90,10 +100,55 @@ (self.schema.clone(), None) }; + // Plan splits eagerly so we know partition count upfront. 
+ let read_builder = self.table.new_read_builder(); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.map_err(to_datafusion_error)?; + + // Distribute splits across DataFusion partitions, capped by the + // session's target_partitions to avoid over-sharding with many small splits. + // Each partition's splits are wrapped in Arc to avoid deep-cloning in execute(). + let splits = plan.splits().to_vec(); + let planned_partitions: Vec<Arc<[paimon::DataSplit]>> = if splits.is_empty() { + // Empty plans get a single empty partition to avoid 0-partition edge cases. + vec![Arc::from(Vec::new())] + } else { + let target = state.config_options().execution.target_partitions; + let num_partitions = splits.len().min(target.max(1)); + bucket_round_robin(splits, num_partitions) + .into_iter() + .map(Arc::from) + .collect() + }; + Ok(Arc::new(PaimonTableScan::new( projected_schema, self.table.clone(), projected_columns, + planned_partitions, ))) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_round_robin_distributes_evenly() { + let result = bucket_round_robin(vec![0, 1, 2, 3, 4], 3); + assert_eq!(result, vec![vec![0, 3], vec![1, 4], vec![2]]); + } + + #[test] + fn test_bucket_round_robin_fewer_items_than_buckets() { + let result = bucket_round_robin(vec![10, 20], 2); + assert_eq!(result, vec![vec![10], vec![20]]); + } + + #[test] + fn test_bucket_round_robin_single_bucket() { + let result = bucket_round_robin(vec![1, 2, 3], 1); + assert_eq!(result, vec![vec![1, 2, 3]]); + } +} diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 97bad768..dd1cad76 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -156,3 +156,55 @@ async fn test_projection_via_datafusion() { "Projected id values should match" ); } + +/// Verifies that `PaimonTableProvider::scan()` produces more than one +/// execution partition for a 
multi-partition table, and that the reported +/// partition count is still capped by `target_partitions`. +#[tokio::test] +async fn test_scan_partition_count_respects_session_config() { + use datafusion::datasource::TableProvider; + use datafusion::prelude::SessionConfig; + + let warehouse = get_test_warehouse(); + let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); + let identifier = Identifier::new("default", "partitioned_log_table"); + let table = catalog + .get_table(&identifier) + .await + .expect("Failed to get table"); + + let provider = PaimonTableProvider::try_new(table).expect("Failed to create table provider"); + + // With generous target_partitions, the plan should expose more than one partition. + let config = SessionConfig::new().with_target_partitions(8); + let ctx = SessionContext::new_with_config(config); + let state = ctx.state(); + let plan = provider + .scan(&state, None, &[], None) + .await + .expect("scan() should succeed"); + + let partition_count = plan.properties().output_partitioning().partition_count(); + assert!( + partition_count > 1, + "partitioned_log_table should produce >1 partitions, got {partition_count}" + ); + + // With target_partitions=1, all splits must be coalesced into a single partition + let config_single = SessionConfig::new().with_target_partitions(1); + let ctx_single = SessionContext::new_with_config(config_single); + let state_single = ctx_single.state(); + let plan_single = provider + .scan(&state_single, None, &[], None) + .await + .expect("scan() should succeed with target_partitions=1"); + + assert_eq!( + plan_single + .properties() + .output_partitioning() + .partition_count(), + 1, + "target_partitions=1 should coalesce all splits into exactly 1 partition" + ); +}