Skip to content

fix: drop input plan early in CoalescePartitionsExec#22017

Open
Samyak2 wants to merge 2 commits intoapache:mainfrom
Samyak2:coalesce-part-early-drop
Open

fix: drop input plan early in CoalescePartitionsExec#22017
Samyak2 wants to merge 2 commits intoapache:mainfrom
Samyak2:coalesce-part-early-drop

Conversation

@Samyak2
Copy link
Copy Markdown
Contributor

@Samyak2 Samyak2 commented May 5, 2026

Which issue does this PR close?

Rationale for this change

  • CoalescePartitionsExec uses RecordBatchReceiverStreamBuilder which was holding to an Arc ref of the input plan, until the stream was done.
  • More details in the issue.

What changes are included in this PR?

  • Now we drop the input plan early.
  • The only other usage of input was in debug prints, to display the input plan. We now take the display string early, but only when debug logging is enabled.

Are these changes tested?

  • Added a test that ensures this. Verified that the test fails without the fix.

The reproducer (Samyak2#1) before the fix:

repartition_task_group=0 input_partition=0 kind=pull_from_input drop_elapsed_ms=68
repartition_task_group=1 input_partition=0 kind=pull_from_input drop_elapsed_ms=80
repartition_task_group=1 input_partition=1 kind=pull_from_input drop_elapsed_ms=85
output_partitions=32 input_rows_per_partition=1024000 all_repartition_operator_drop_elapsed_ms=80
all_repartition_task_drop_elapsed_ms=85
all_observed_drop_elapsed_ms=85

~85ms to cancel repartition tasks.

After fix:

repartition_task_group=0 input_partition=0 kind=pull_from_input drop_elapsed_ms=16
repartition_task_group=1 input_partition=0 kind=pull_from_input drop_elapsed_ms=0
repartition_task_group=1 input_partition=1 kind=pull_from_input drop_elapsed_ms=0
output_partitions=32 input_rows_per_partition=1024000 all_repartition_operator_drop_elapsed_ms=0
all_repartition_task_drop_elapsed_ms=16
all_observed_drop_elapsed_ms=16

~16ms to cancel repartition tasks.

Are there any user-facing changes?

CPU/memory gets released earlier on cancellation

- Fixes apache#22016
- `CoalescePartitionsExec` uses `RecordBatchReceiverStreamBuilder` which
  was holding to an Arc ref of the input plan, until the stream was
  done.
- Now we drop it early.
- The only other usage of `input` was in debug prints, to display the
  input plan. We now take the display string early, but only when debug
  logging is enabled.
- Added a test that ensures this. Verified that the test fails without
  the fix.
@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label May 5, 2026
Comment thread datafusion/physical-plan/src/stream.rs Outdated
let input_display = if log::log_enabled!(log::Level::Debug) {
displayable(input.as_ref()).one_line().to_string()
} else {
"".to_owned()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use String::new()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Done!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't resolve comments, I like to go back to them during review.

debug!(
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
"Stopping execution: error executing input: {input_display}",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Err branch returns before reaching drop(input) at line 356, so input only drops on scope end via the closure. Moving the drop into the Ok arm makes the intent clearer and removes the misleading appearance that drop runs on both paths.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a return in this Err branch, so the closure ends immediately and drops input, right? So in both cases, we essentially drop it after this match. Or am I missing something?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

CoalescePartitionsExec delays cancellation of child operators

2 participants