Skip to content

Use local coalescers in repartition#23269

Closed
Rachelint wants to merge 1 commit into
apache:mainfrom
Rachelint:support-local-coalesce-in-repartition
Closed

Use local coalescers in repartition#23269
Rachelint wants to merge 1 commit into
apache:mainfrom
Rachelint:support-local-coalesce-in-repartition

Conversation

@Rachelint

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

  • Closes #.

Rationale for this change

The current producer-side repartition coalescer is shared by all input tasks for each output partition. That adds synchronization around every coalesced batch path when multiple input tasks target the same output partition.

What changes are included in this PR?

This PR replaces the shared per-output-partition coalescer with local per-producer-channel coalescers in RepartitionExec:

  • each non-preserve-order output channel owns its own LimitedBatchCoalescer
  • preserve-order mode still skips producer-side coalescing and relies on StreamingMergeBuilder
  • local coalescers are finalized by their owning input task at end of input
  • the shared Arc<Mutex<LimitedBatchCoalescer>> and active-sender tracking are removed

Are these changes tested?

Ran:

  • cargo fmt --all
  • cargo check -p datafusion-physical-plan
  • cargo clippy -p datafusion-physical-plan --all-targets --all-features -- -D warnings
  • cargo clippy --all-targets --all-features -- -D warnings

Existing repartition tests cover the coalescing and spilling paths.

Are there any user-facing changes?

No user-facing API changes.

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 30, 2026
@Rachelint

Copy link
Copy Markdown
Contributor Author

run benchmarks clickbench_partitioned

@adriangbot

Copy link
Copy Markdown

🤖 Benchmark running (GKE) | trigger
Instance: c4a-highmem-16 (12 vCPU / 65 GiB) | Linux bench-c4848229894-763-tjcqz 6.12.85+ #1 SMP Mon May 11 08:17:35 UTC 2026 aarch64 GNU/Linux

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected

Comparing support-local-coalesce-in-repartition (51559c5) to 742361b (merge-base) diff using: clickbench_partitioned
Results will be posted here when complete


File an issue against this benchmark runner

@Rachelint Rachelint marked this pull request as draft June 30, 2026 22:09
@adriangbot

Copy link
Copy Markdown

🤖 Benchmark completed (GKE) | trigger

Instance: c4a-highmem-16 (12 vCPU / 65 GiB)

CPU Details (lscpu)
Architecture:                            aarch64
CPU op-mode(s):                          64-bit
Byte Order:                              Little Endian
CPU(s):                                  16
On-line CPU(s) list:                     0-15
Vendor ID:                               ARM
Model name:                              Neoverse-V2
Model:                                   1
Thread(s) per core:                      1
Core(s) per cluster:                     16
Socket(s):                               -
Cluster(s):                              1
Stepping:                                r0p1
BogoMIPS:                                2000.00
Flags:                                   fp asimd evtstrm aes pmull sha1 sha2 crc32 atomics fphp asimdhp cpuid asimdrdm jscvt fcma lrcpc dcpop sha3 sm3 sm4 asimddp sha512 sve asimdfhm dit uscat ilrcpc flagm sb paca pacg dcpodp sve2 sveaes svepmull svebitperm svesha3 svesm4 flagm2 frint svei8mm svebf16 i8mm bf16 dgh rng bti
L1d cache:                               1 MiB (16 instances)
L1i cache:                               1 MiB (16 instances)
L2 cache:                                32 MiB (16 instances)
L3 cache:                                80 MiB (1 instance)
NUMA node(s):                            1
NUMA node0 CPU(s):                       0-15
Vulnerability Gather data sampling:      Not affected
Vulnerability Indirect target selection: Not affected
Vulnerability Itlb multihit:             Not affected
Vulnerability L1tf:                      Not affected
Vulnerability Mds:                       Not affected
Vulnerability Meltdown:                  Not affected
Vulnerability Mmio stale data:           Not affected
Vulnerability Reg file data sampling:    Not affected
Vulnerability Retbleed:                  Not affected
Vulnerability Spec rstack overflow:      Not affected
Vulnerability Spec store bypass:         Mitigation; Speculative Store Bypass disabled via prctl
Vulnerability Spectre v1:                Mitigation; __user pointer sanitization
Vulnerability Spectre v2:                Mitigation; CSV2, BHB
Vulnerability Srbds:                     Not affected
Vulnerability Tsa:                       Not affected
Vulnerability Tsx async abort:           Not affected
Vulnerability Vmscape:                   Not affected
Details

Comparing HEAD and support-local-coalesce-in-repartition
--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query     ┃                                  HEAD ┃ support-local-coalesce-in-repartition ┃        Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0  │          1.23 / 4.11 ±5.58 / 15.26 ms │          1.22 / 4.07 ±5.57 / 15.21 ms │     no change │
│ QQuery 1  │        12.76 / 13.46 ±0.55 / 14.36 ms │        12.47 / 12.97 ±0.31 / 13.29 ms │     no change │
│ QQuery 2  │        37.11 / 37.49 ±0.41 / 38.26 ms │        35.90 / 36.31 ±0.33 / 36.84 ms │     no change │
│ QQuery 3  │        32.33 / 32.91 ±0.50 / 33.61 ms │        31.14 / 31.46 ±0.42 / 32.25 ms │     no change │
│ QQuery 4  │     229.87 / 232.49 ±2.33 / 235.30 ms │     227.01 / 230.47 ±2.70 / 234.29 ms │     no change │
│ QQuery 5  │     276.75 / 279.45 ±1.93 / 282.62 ms │     275.40 / 279.59 ±3.85 / 286.05 ms │     no change │
│ QQuery 6  │           1.31 / 1.45 ±0.22 / 1.88 ms │           1.28 / 1.42 ±0.23 / 1.88 ms │     no change │
│ QQuery 7  │        14.17 / 14.33 ±0.14 / 14.55 ms │        13.78 / 14.14 ±0.24 / 14.45 ms │     no change │
│ QQuery 8  │     328.91 / 333.41 ±3.45 / 338.63 ms │     328.94 / 333.72 ±4.46 / 339.64 ms │     no change │
│ QQuery 9  │    463.91 / 478.12 ±13.61 / 503.11 ms │    425.86 / 444.07 ±11.87 / 462.79 ms │ +1.08x faster │
│ QQuery 10 │        73.22 / 74.53 ±1.28 / 76.69 ms │        69.49 / 71.04 ±1.74 / 74.38 ms │     no change │
│ QQuery 11 │        85.36 / 87.15 ±2.65 / 92.40 ms │        80.57 / 83.74 ±3.75 / 90.84 ms │     no change │
│ QQuery 12 │     270.43 / 276.90 ±5.42 / 286.41 ms │     270.87 / 277.26 ±5.08 / 283.72 ms │     no change │
│ QQuery 13 │    367.34 / 390.27 ±16.00 / 413.92 ms │    386.01 / 396.90 ±13.39 / 423.04 ms │     no change │
│ QQuery 14 │     285.18 / 292.97 ±5.23 / 300.69 ms │     288.13 / 293.61 ±6.08 / 304.25 ms │     no change │
│ QQuery 15 │    276.04 / 285.21 ±11.02 / 304.87 ms │     278.81 / 285.58 ±7.65 / 299.42 ms │     no change │
│ QQuery 16 │     618.90 / 629.43 ±7.27 / 641.22 ms │     624.17 / 633.93 ±9.26 / 650.77 ms │     no change │
│ QQuery 17 │     623.78 / 631.37 ±6.50 / 640.50 ms │     623.20 / 632.19 ±5.41 / 639.85 ms │     no change │
│ QQuery 18 │ 1259.99 / 1285.93 ±15.91 / 1306.43 ms │ 1277.08 / 1302.57 ±22.96 / 1335.55 ms │     no change │
│ QQuery 19 │        29.12 / 29.43 ±0.33 / 30.07 ms │        28.17 / 30.64 ±4.49 / 39.62 ms │     no change │
│ QQuery 20 │     518.44 / 528.20 ±9.34 / 542.74 ms │     522.30 / 533.16 ±8.64 / 543.18 ms │     no change │
│ QQuery 21 │     517.52 / 525.12 ±4.61 / 530.28 ms │     520.09 / 529.01 ±7.03 / 538.13 ms │     no change │
│ QQuery 22 │ 1002.71 / 1020.44 ±12.10 / 1035.34 ms │  1002.38 / 1009.02 ±3.95 / 1013.00 ms │     no change │
│ QQuery 23 │ 3142.19 / 3163.85 ±15.61 / 3190.84 ms │ 3092.14 / 3133.91 ±39.11 / 3205.94 ms │     no change │
│ QQuery 24 │        42.02 / 45.65 ±7.05 / 59.74 ms │        41.74 / 45.21 ±3.26 / 49.94 ms │     no change │
│ QQuery 25 │     115.31 / 123.12 ±8.02 / 135.32 ms │     111.92 / 113.37 ±0.76 / 114.02 ms │ +1.09x faster │
│ QQuery 26 │        42.38 / 42.89 ±0.48 / 43.74 ms │        41.89 / 46.22 ±7.14 / 60.43 ms │  1.08x slower │
│ QQuery 27 │     671.48 / 678.56 ±3.79 / 682.40 ms │     677.63 / 687.20 ±8.87 / 701.02 ms │     no change │
│ QQuery 28 │ 3052.10 / 3076.77 ±20.28 / 3109.31 ms │ 3048.93 / 3068.36 ±10.94 / 3078.72 ms │     no change │
│ QQuery 29 │        41.70 / 44.41 ±5.14 / 54.69 ms │       41.07 / 58.42 ±15.62 / 77.04 ms │  1.32x slower │
│ QQuery 30 │     304.77 / 311.95 ±4.66 / 319.23 ms │     302.48 / 313.53 ±8.74 / 327.78 ms │     no change │
│ QQuery 31 │     289.43 / 298.39 ±8.93 / 311.24 ms │    283.61 / 295.83 ±10.97 / 311.69 ms │     no change │
│ QQuery 32 │   949.28 / 982.80 ±29.76 / 1028.01 ms │  963.97 / 1001.63 ±22.65 / 1030.92 ms │     no change │
│ QQuery 33 │ 1458.77 / 1495.81 ±28.45 / 1533.08 ms │ 1441.13 / 1476.80 ±26.98 / 1516.56 ms │     no change │
│ QQuery 34 │ 1467.64 / 1525.19 ±47.28 / 1588.63 ms │ 1447.95 / 1480.89 ±25.02 / 1504.87 ms │     no change │
│ QQuery 35 │    285.81 / 321.27 ±42.09 / 397.81 ms │    286.51 / 307.68 ±24.89 / 354.54 ms │     no change │
│ QQuery 36 │       64.76 / 75.08 ±10.80 / 92.58 ms │        68.07 / 74.59 ±3.48 / 77.55 ms │     no change │
│ QQuery 37 │        35.98 / 40.70 ±4.87 / 48.31 ms │        35.91 / 37.47 ±2.06 / 41.47 ms │ +1.09x faster │
│ QQuery 38 │        40.44 / 43.77 ±4.55 / 52.66 ms │        41.46 / 45.98 ±5.30 / 56.12 ms │  1.05x slower │
│ QQuery 39 │     149.77 / 152.76 ±3.57 / 159.51 ms │     147.09 / 155.24 ±6.71 / 165.94 ms │     no change │
│ QQuery 40 │        14.40 / 14.76 ±0.27 / 15.07 ms │        14.36 / 17.13 ±5.21 / 27.56 ms │  1.16x slower │
│ QQuery 41 │        13.96 / 14.07 ±0.10 / 14.22 ms │        14.01 / 14.42 ±0.47 / 15.33 ms │     no change │
│ QQuery 42 │        13.69 / 13.71 ±0.02 / 13.73 ms │        13.62 / 15.72 ±3.88 / 23.47 ms │  1.15x slower │
└───────────┴───────────────────────────────────────┴───────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                    ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (HEAD)                                    │ 19949.71ms │
│ Total Time (support-local-coalesce-in-repartition)   │ 19856.47ms │
│ Average Time (HEAD)                                  │   463.95ms │
│ Average Time (support-local-coalesce-in-repartition) │   461.78ms │
│ Queries Faster                                       │          3 │
│ Queries Slower                                       │          5 │
│ Queries with No Change                               │         35 │
│ Queries with Failure                                 │          0 │
└──────────────────────────────────────────────────────┴────────────┘

Resource Usage

clickbench_partitioned — base (merge-base)

Metric Value
Wall time 105.0s
Peak memory 13.4 GiB
Avg memory 4.6 GiB
CPU user 1023.0s
CPU sys 71.8s
Peak spill 0 B

clickbench_partitioned — branch

Metric Value
Wall time 100.0s
Peak memory 11.8 GiB
Avg memory 4.8 GiB
CPU user 1019.0s
CPU sys 73.3s
Peak spill 0 B

File an issue against this benchmark runner

@Rachelint Rachelint closed this Jul 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants