Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//! let df = ctx.sql("SELECT * FROM my_table").await?;
//! ```
//!
//! This version does not support write, column projection, or predicate pushdown.
//! This version does not support write or predicate pushdown.

mod error;
mod physical_plan;
Expand Down
89 changes: 78 additions & 11 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,24 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use futures::{StreamExt, TryStreamExt};
use paimon::table::Table;
use paimon::DataSplit;

use crate::error::to_datafusion_error;

/// Execution plan that scans a Paimon table with optional column projection.
///
/// Planning is performed eagerly in [`super::super::table::PaimonTableProvider::scan`],
/// and the resulting splits are distributed across DataFusion execution partitions
/// so that DataFusion can schedule them in parallel.
#[derive(Debug)]
pub struct PaimonTableScan {
    /// Handle to the underlying Paimon table; cloned into the async read
    /// future each time a partition is executed.
    table: Table,
    /// Projected column names (if None, reads all columns).
    projected_columns: Option<Vec<String>>,
    /// Pre-planned partition assignments: `planned_partitions[i]` contains the
    /// Paimon splits that DataFusion partition `i` will read.
    /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
    planned_partitions: Vec<Arc<[DataSplit]>>,
    /// Plan properties computed once at construction time; the partition
    /// count is derived from `planned_partitions.len()`.
    plan_properties: PlanProperties,
}

Expand All @@ -44,20 +53,18 @@ impl PaimonTableScan {
schema: ArrowSchemaRef,
table: Table,
projected_columns: Option<Vec<String>>,
planned_partitions: Vec<Arc<[DataSplit]>>,
) -> Self {
let plan_properties = PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
// TODO: Currently all Paimon splits are read in a single DataFusion partition,
// which means we lose DataFusion parallelism. A follow-up should expose one
// execution partition per Paimon split so that DataFusion can schedule them
// across threads.
Partitioning::UnknownPartitioning(1),
Partitioning::UnknownPartitioning(planned_partitions.len()),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
table,
projected_columns,
planned_partitions,
plan_properties,
}
}
Expand Down Expand Up @@ -93,26 +100,30 @@ impl ExecutionPlan for PaimonTableScan {

fn execute(
&self,
_partition: usize,
partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let splits = Arc::clone(self.planned_partitions.get(partition).ok_or_else(|| {
datafusion::error::DataFusionError::Internal(format!(
"PaimonTableScan: partition index {partition} out of range (total {})",
self.planned_partitions.len()
))
})?);

let table = self.table.clone();
let schema = self.schema();
let projected_columns = self.projected_columns.clone();

let fut = async move {
let mut read_builder = table.new_read_builder();

// Apply projection if specified
if let Some(ref columns) = projected_columns {
let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
read_builder.with_projection(&col_refs);
}

let scan = read_builder.new_scan();
let plan = scan.plan().await.map_err(to_datafusion_error)?;
let read = read_builder.new_read().map_err(to_datafusion_error)?;
let stream = read.to_arrow(plan.splits()).map_err(to_datafusion_error)?;
let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
let stream = stream.map(|r| r.map_err(to_datafusion_error));

Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new(
Expand All @@ -135,6 +146,62 @@ impl DisplayAs for PaimonTableScan {
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "PaimonTableScan")
write!(
f,
"PaimonTableScan: partitions={}",
self.planned_partitions.len()
)
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
    use datafusion::physical_plan::ExecutionPlan;

    /// Minimal single-column Arrow schema used by the PlanProperties tests below.
    fn test_schema() -> ArrowSchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "id",
            ArrowDataType::Int32,
            false,
        )]))
    }

    /// An empty plan is represented as one empty partition (never zero),
    /// so the reported partition count is 1.
    #[test]
    fn test_partition_count_empty_plan() {
        let schema = test_schema();
        let scan = PaimonTableScan::new(schema, dummy_table(), None, vec![Arc::from(Vec::new())]);
        assert_eq!(scan.properties().output_partitioning().partition_count(), 1);
    }

    /// The reported partition count must equal the number of pre-planned
    /// partition assignments handed to the constructor.
    #[test]
    fn test_partition_count_multiple_partitions() {
        let schema = test_schema();
        let planned_partitions = vec![
            Arc::from(Vec::new()),
            Arc::from(Vec::new()),
            Arc::from(Vec::new()),
        ];
        let scan = PaimonTableScan::new(schema, dummy_table(), None, planned_partitions);
        assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
    }

    /// Constructs a minimal Table for testing (no real files needed since we
    /// only test PlanProperties, not actual reads).
    fn dummy_table() -> Table {
        use paimon::catalog::Identifier;
        use paimon::io::FileIOBuilder;
        use paimon::spec::{Schema, TableSchema};

        let file_io = FileIOBuilder::new("file").build().unwrap();
        let schema = Schema::builder().build().unwrap();
        let table_schema = TableSchema::new(0, &schema);
        Table::new(
            file_io,
            Identifier::new("test_db", "test_table"),
            "/tmp/test-table".to_string(),
            table_schema,
        )
    }
}
57 changes: 56 additions & 1 deletion crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::Table;

use crate::error::to_datafusion_error;
use crate::physical_plan::PaimonTableScan;
use crate::schema::paimon_schema_to_arrow;

Expand Down Expand Up @@ -57,6 +58,15 @@ impl PaimonTableProvider {
}
}

/// Distribute `items` into `num_buckets` groups using round-robin assignment.
///
/// Item `i` goes to bucket `i % num_buckets`, so bucket sizes differ by at
/// most one and the relative order of items within each bucket is preserved.
///
/// # Panics
///
/// Panics (division by zero) if `num_buckets == 0` and `items` is non-empty;
/// the caller in `scan()` guarantees `num_buckets >= 1` for non-empty plans.
fn bucket_round_robin<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
    // Preallocate each bucket to its worst-case size to avoid repeated regrowth.
    let per_bucket = if num_buckets == 0 {
        0
    } else {
        (items.len() + num_buckets - 1) / num_buckets
    };
    let mut buckets: Vec<Vec<T>> = (0..num_buckets)
        .map(|_| Vec::with_capacity(per_bucket))
        .collect();
    for (i, item) in items.into_iter().enumerate() {
        buckets[i % num_buckets].push(item);
    }
    buckets
}

#[async_trait]
impl TableProvider for PaimonTableProvider {
fn as_any(&self) -> &dyn Any {
Expand All @@ -73,7 +83,7 @@ impl TableProvider for PaimonTableProvider {

async fn scan(
&self,
_state: &dyn Session,
state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
Expand All @@ -90,10 +100,55 @@ impl TableProvider for PaimonTableProvider {
(self.schema.clone(), None)
};

// Plan splits eagerly so we know partition count upfront.
let read_builder = self.table.new_read_builder();
let scan = read_builder.new_scan();
let plan = scan.plan().await.map_err(to_datafusion_error)?;

// Distribute splits across DataFusion partitions, capped by the
// session's target_partitions to avoid over-sharding with many small splits.
// Each partition's splits are wrapped in Arc to avoid deep-cloning in execute().
let splits = plan.splits().to_vec();
let planned_partitions: Vec<Arc<[_]>> = if splits.is_empty() {
// Empty plans get a single empty partition to avoid 0-partition edge cases.
vec![Arc::from(Vec::new())]
} else {
let target = state.config_options().execution.target_partitions;
let num_partitions = splits.len().min(target.max(1));
bucket_round_robin(splits, num_partitions)
.into_iter()
.map(Arc::from)
.collect()
};

Ok(Arc::new(PaimonTableScan::new(
projected_schema,
self.table.clone(),
projected_columns,
planned_partitions,
)))
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Five items over three buckets cycle through indices 0,1,2,0,1 —
    /// bucket sizes differ by at most one.
    #[test]
    fn test_bucket_round_robin_distributes_evenly() {
        let result = bucket_round_robin(vec![0, 1, 2, 3, 4], 3);
        assert_eq!(result, vec![vec![0, 3], vec![1, 4], vec![2]]);
    }

    /// With fewer items than buckets, the trailing buckets stay empty.
    /// (Previously this test used 2 items / 2 buckets, which did not
    /// exercise the case its name describes.)
    #[test]
    fn test_bucket_round_robin_fewer_items_than_buckets() {
        let result = bucket_round_robin(vec![10, 20], 3);
        assert_eq!(result, vec![vec![10], vec![20], vec![]]);
    }

    /// A single bucket receives every item in the original order.
    #[test]
    fn test_bucket_round_robin_single_bucket() {
        let result = bucket_round_robin(vec![1, 2, 3], 1);
        assert_eq!(result, vec![vec![1, 2, 3]]);
    }

    /// No items yields the requested number of empty buckets.
    #[test]
    fn test_bucket_round_robin_empty_input() {
        let result = bucket_round_robin(Vec::<i32>::new(), 2);
        assert_eq!(result, vec![Vec::<i32>::new(), Vec::new()]);
    }
}
52 changes: 52 additions & 0 deletions crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,55 @@ async fn test_projection_via_datafusion() {
"Projected id values should match"
);
}

/// Verifies that `PaimonTableProvider::scan()` produces more than one
/// execution partition for a multi-partition table, and that the reported
/// partition count is still capped by `target_partitions`.
#[tokio::test]
async fn test_scan_partition_count_respects_session_config() {
    use datafusion::datasource::TableProvider;
    use datafusion::prelude::SessionConfig;

    // Open the fixture table from the shared test warehouse.
    let warehouse = get_test_warehouse();
    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog");
    let identifier = Identifier::new("default", "partitioned_log_table");
    let table = catalog
        .get_table(&identifier)
        .await
        .expect("Failed to get table");

    let provider = PaimonTableProvider::try_new(table).expect("Failed to create table provider");

    // With generous target_partitions, the plan should expose more than one partition.
    let config = SessionConfig::new().with_target_partitions(8);
    let ctx = SessionContext::new_with_config(config);
    let state = ctx.state();
    let plan = provider
        .scan(&state, None, &[], None)
        .await
        .expect("scan() should succeed");

    // NOTE(review): assumes the partitioned_log_table fixture contains more
    // than one split — confirm against the test warehouse layout if this flakes.
    let partition_count = plan.properties().output_partitioning().partition_count();
    assert!(
        partition_count > 1,
        "partitioned_log_table should produce >1 partitions, got {partition_count}"
    );

    // With target_partitions=1, all splits must be coalesced into a single partition
    let config_single = SessionConfig::new().with_target_partitions(1);
    let ctx_single = SessionContext::new_with_config(config_single);
    let state_single = ctx_single.state();
    let plan_single = provider
        .scan(&state_single, None, &[], None)
        .await
        .expect("scan() should succeed with target_partitions=1");

    assert_eq!(
        plan_single
            .properties()
            .output_partitioning()
            .partition_count(),
        1,
        "target_partitions=1 should coalesce all splits into exactly 1 partition"
    );
}
Loading