ExecutionPlan::repartitioned doesn't work when file groups have ranges #18940

@Samyak2

Description

Describe the bug

I'm trying to use ExecutionPlan::repartitioned to change the partitioning of an already optimized plan. The reason I can't just set target_partitions is that I need to change the partitioning of a subtree of the plan, not the entire plan. So I tried recursively calling ExecutionPlan::repartitioned on every node of the plan and noticed that it did not change a thing.

To Reproduce

Here's the code that shows the before and after plans (uses DataFusion 51):

use std::sync::Arc;

use datafusion::{
    config::ConfigOptions,
    error::DataFusionError,
    physical_plan::{ExecutionPlan, displayable},
    prelude::*,
};

fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    let new_children = plan
        .children()
        .into_iter()
        .map(|child| repartition_subtree(Arc::clone(child), config))
        .collect::<Result<_, _>>()?;
    let plan = plan.with_new_children(new_children)?;

    // change partitions to 4
    let repartitioned_plan = plan.repartitioned(4, config)?;
    if let Some(new_plan) = repartitioned_plan {
        Ok(new_plan)
    } else {
        Ok(plan)
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new()
        .with_target_partitions(2);

    // register the table
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("example", "./partitioned_output", ParquetReadOptions::new())
        .await?;

    // create a plan to run a SQL query
    let df = ctx
        .sql("SELECT b, MIN(0) FROM example GROUP BY b LIMIT 100")
        .await?;

    let plan = df.create_physical_plan().await.unwrap();

    println!("plan: {}", displayable(plan.as_ref()).indent(false));

    let state = ctx.state();

    let new_plan = repartition_subtree(plan, state.config_options()).unwrap();
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));

    Ok(())
}

Here's the code to generate the data (excuse the code quality, it was written with AI):

use std::sync::Arc;

use arrow::array::{Int32Array, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use rand::Rng;

const NUM_ROWS: usize = 10_000_000;
const BATCH_SIZE: usize = 1_000_000; // Process in batches to manage memory

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    println!("Creating DataFusion context...");
    let ctx = SessionContext::new();

    // Define schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false), // 0 to 1000
        Field::new("b", DataType::Int64, false), // 0 to 1,000,000
    ]));

    println!("Generating {} random rows...", NUM_ROWS);

    // Generate data in batches to avoid memory issues
    let mut batches = Vec::new();
    let mut rng = rand::thread_rng();

    for batch_num in 0..(NUM_ROWS / BATCH_SIZE) {
        let start = batch_num * BATCH_SIZE;
        println!(
            "  Generating batch {} ({} - {} rows)...",
            batch_num + 1,
            start,
            start + BATCH_SIZE
        );

        // Generate random data for column 'a' (0 to 1000)
        let col_a: Vec<i32> = (0..BATCH_SIZE).map(|_| rng.gen_range(0..=1000)).collect();

        // Generate random data for column 'b' (0 to 1,000,000)
        let col_b: Vec<i64> = (0..BATCH_SIZE)
            .map(|_| rng.gen_range(0..=1_000_000))
            .collect();

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(col_a)),
                Arc::new(Int64Array::from(col_b)),
            ],
        )?;

        batches.push(batch);
    }

    println!("Creating DataFrame from {} batches...", batches.len());

    // Create a DataFrame from the record batches
    let df = ctx.read_batches(batches)?;

    // Show sample data
    println!("\nSample data (first 10 rows):");
    df.clone().limit(0, Some(10))?.show().await?;

    // Show row count
    let row_count = df.clone().count().await?;
    println!("\nTotal rows: {}", row_count);

    // Write as partitioned parquet files (partitioned by column 'a')
    let output_path = "./partitioned_output";
    println!(
        "\nWriting partitioned parquet files to '{}'...",
        output_path
    );
    println!("Partitioning by column 'a'...");

    df.write_parquet(
        output_path,
        DataFrameWriteOptions::new().with_partition_by(vec!["a".to_string()]),
        None, // Use default parquet writer options
    )
    .await?;

    println!("\n✓ Successfully wrote partitioned parquet files!");
    println!("  Output directory: {}", output_path);

    Ok(())
}

With the latest main of DataFusion (commit 48cc4c8af3a5ad500a44c8625ea44d6b4827af1e), we get this:

plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=2
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={2 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet

The partitioning remains the same: the DataSourceExec still has 2 file groups and the RepartitionExec still reports input_partitions=2; only the hash partition count changed to 4.

Now, if I remove the range check (the guard in DataFusion's file-group repartitioning logic that refuses to repartition when any file has a range set), we get this:

new_plan: CoalescePartitionsExec: fetch=100
  AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[min(Int64(0))]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
        AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[min(Int64(0))]
          DataSourceExec: file_groups={4 groups: [[<removed for brevity>]]}, projection=[b], file_type=parquet

The partitioning increases as expected.
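
For context, the guard in question looks roughly like this (a paraphrased, self-contained sketch with hypothetical names, not DataFusion's actual source). The point is that repartitioning bails out as soon as any file in any group carries a byte range, even a range that spans the whole file:

struct FileEntry {
    range: Option<(i64, i64)>, // (start, end) byte offsets within the file
}

fn repartition_file_groups(
    file_groups: &[Vec<FileEntry>],
    _target_partitions: usize,
) -> Option<Vec<Vec<FileEntry>>> {
    // This early return is what keeps the plan above at 2 groups: any file
    // with a range, even one covering [0, file_size), disables repartitioning.
    let has_ranges = file_groups
        .iter()
        .flat_map(|group| group.iter())
        .any(|file| file.range.is_some());
    if has_ranges {
        return None;
    }
    // ...the actual splitting into `_target_partitions` groups is elided...
    None
}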

Expected behavior

The plan should be repartitioned even when ranges exist on the file groups. In the example above there are no filters, so each range covers the entire file, yet the mere presence of those ranges prevents repartitioning.

Additional context

There doesn't seem to be any good reason to skip repartitioning when file ranges exist. @Jefffrey also thinks the same: #18924 (reply in thread)

I'm willing to get a PR out for this if it makes sense.
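
For illustration, here's a minimal, self-contained sketch of the direction such a fix could take (hypothetical types, not DataFusion's actual API): treat a file's effective byte span as its existing range when present, falling back to [0, size), and split that span instead of refusing to split at all.

#[derive(Clone, Debug)]
struct FileRange {
    start: i64,
    end: i64,
}

#[derive(Clone, Debug)]
struct SketchFile {
    path: String,
    size: i64,
    range: Option<FileRange>,
}

/// Split one file into at most `n` contiguous byte ranges covering its
/// effective span (the existing range if any, otherwise the whole file).
fn split_file(file: &SketchFile, n: i64) -> Vec<SketchFile> {
    let (start, end) = match &file.range {
        Some(r) => (r.start, r.end),
        None => (0, file.size),
    };
    let len = end - start;
    (0..n)
        .filter_map(|i| {
            let s = start + len * i / n;
            let e = start + len * (i + 1) / n;
            // Drop empty slices, which occur when len < n.
            (e > s).then(|| SketchFile {
                path: file.path.clone(),
                size: file.size,
                range: Some(FileRange { start: s, end: e }),
            })
        })
        .collect()
}

fn main() {
    let file = SketchFile {
        path: "part-0.parquet".into(),
        size: 1_000,
        // Pretend an earlier pass already assigned a whole-file range.
        range: Some(FileRange { start: 0, end: 1_000 }),
    };
    for piece in split_file(&file, 4) {
        println!("{:?}", piece.range);
    }
}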
