Improve RepartitionExec for better query performance #7001

Open
Tracked by #7000
alamb opened this issue Jul 17, 2023 · 17 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Jul 17, 2023

Summary

The key to using multiple cores efficiently is how the query plan is parallelized. In DataFusion this often involves repartitioning data between partitions. This ticket describes ideas for improving the RepartitionExec operator so that DataFusion's performance scales more linearly with the number of cores.

Experiments by @YjyJeff in #6928 show that some parts of RepartitionExec can be the bottleneck in certain queries. While we likely can't use the unbounded buffering approach of #6929 for the reasons @crepererum explains in #6929 (comment), there is clearly room to improve. Also, we have some earlier evidence from @andygrove in #5999 that scaling with core counts is a challenge.

Background

DataFusion (and most other commercial systems) uses the "Exchange Operator" based approach to parallelism (see footnote 1). I believe the exchange approach is so commonly implemented because it is simple to reason about, and works quite well in practice with sufficient engineering care.

DataFusion's planner picks the target number of partitions and then RepartitionExec redistributes RecordBatches to that number of output partitions.

For example, if DataFusion has target_partitions=3 (trying to use 3 cores) but is scanning an input with 2 partitions, it makes a plan like this:

        ▲                  ▲                  ▲
        │                  │                  │
        │                  │                  │
        │                  │                  │
┌───────────────┐  ┌───────────────┐  ┌───────────────┐
│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
└───────────────┘  └───────────────┘  └───────────────┘
        ▲                  ▲                  ▲
        └──────────────────┼──────────────────┘
                           │
              ┌─────────────────────────┐
              │     RepartitionExec     │
              │   (hash/round robin)    │
              └─────────────────────────┘
                         ▲   ▲
             ┌───────────┘   └───────────┐
             │                           │
             │                           │
        .─────────.                 .─────────.
     ,─'           '─.           ,─'           '─.
    ;      Input      :         ;      Input      :
    :   Partition 0   ;         :   Partition 1   ;
     ╲               ╱           ╲               ╱
      '─.         ,─'             '─.         ,─'
         `───────'                   `───────'
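
To make the redistribution in the diagram concrete, here is a minimal sketch of round-robin assignment from 2 input partitions to 3 outputs. It is not DataFusion's implementation: batches are modeled as plain integers, the function name `round_robin` is hypothetical, and each input is assumed to keep its own cursor starting at output 0.

```rust
/// Assign every batch of every input partition to an output partition in
/// round-robin order. Batches are plain integers in this sketch.
/// Assumption: each input has its own cursor that starts at output 0.
fn round_robin(inputs: &[Vec<u32>], num_outputs: usize) -> Vec<Vec<u32>> {
    assert!(num_outputs > 0, "need at least one output partition");
    let mut outputs = vec![Vec::new(); num_outputs];
    for input in inputs {
        for (i, batch) in input.iter().enumerate() {
            // The i-th batch of this input goes to output i mod num_outputs.
            outputs[i % num_outputs].push(*batch);
        }
    }
    outputs
}
```

Calling `round_robin(&[vec![1, 2, 3, 4], vec![10, 20]], 3)` spreads the six batches over three outputs without looking at their contents, which is why this strategy is cheap compared to hashing.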

Note that there are alternative approaches, such as the one described in the Morsel-Driven Parallelism Paper, but other than the notable exception of DuckDB I don't know of any widely used engine that takes this approach. The major benefit of the morsel driven approach, as I understand it, is that it can, in theory, respond better to dynamic resource changes (e.g. throttle down to use 2 cores when a high priority job comes in and then go back up to 8 cores when done) (see footnote 2).

The engineering challenge of the RepartitionExec is twofold:

Synchronization and Stalls

Given RepartitionExec is a synchronization point between multiple threads, without careful engineering the lock contention can become the bottleneck.

Also, without careful scheduling the consumers may "stall" waiting for data from a producer.

NUMA

Non Uniform Memory Access (NUMA) basically means that data that is "closer" to a core is faster to access. In practice this means that optimal performance is achieved when the same thread/core that produces data (e.g. decodes a RecordBatch from parquet) also processes it until the next pipeline breaking operation (e.g. updating the hash table). If one core produces the RecordBatch and another consumes it, additional memory latency is incurred.

Since the RepartitionExec is designed to shuffle data, it is very easy to destroy NUMA locality if care is not taken.

I believe the current RoundRobin approach is not NUMA friendly. It will very likely decode some parquet data, put that batch in a queue, and then go and decode more parquet data (rather than return control to the operator that was waiting for it)

Idea 1: Buffering to the RepartitionExec

One idea, suggested by @Dandandan and @ozankabak in #6929 (comment), is to introduce some more (but not unbounded) buffering into the Repartition operator.

For example, perhaps we could extend the existing DistributionSender to have a queue of RecordBatches (2 or 3 for example) rather than just a single Option<>, so that it is possible to start fetching the next input immediately:

https://github.com/apache/arrow-datafusion/blob/d316702722e6c301fdb23a9698f7ec415ef548e9/datafusion/core/src/physical_plan/repartition/distributor_channels.rs#L180-L182
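
As a rough illustration of bounded (not unbounded) buffering, the sketch below uses the standard library's `sync_channel` as a stand-in for a small per-output queue. This is not the DistributionSender code: the helper names `bounded_channel` and `fill_buffer` are hypothetical, batches are modeled as integers, and the real operator is async rather than thread based.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Create a queue that holds at most `capacity` batches (e.g. 2 or 3).
fn bounded_channel(capacity: usize) -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(capacity)
}

/// Push batches until the bounded buffer is full (or the consumer is gone)
/// and return how many were accepted without blocking.
fn fill_buffer(tx: &SyncSender<u32>, batches: &[u32]) -> usize {
    let mut accepted = 0;
    for batch in batches {
        match tx.try_send(*batch) {
            Ok(()) => accepted += 1,
            // Buffer is full: the producer must stop and yield here.
            Err(TrySendError::Full(_)) => break,
            // Consumer was dropped: nothing more to do.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 3, a producer can run up to three batches ahead of a slow consumer, but memory use stays bounded because the fourth `try_send` fails until the consumer drains an entry.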

Idea 2: Make RoundRobin more adaptive

Another idea would be to restructure the "RoundRobin" repartition strategy to be more adaptive. Currently each input evenly sends RecordBatches to each output partition and will return control via tx.send(Some(Ok(batch))).await.is_err() if any output partition is full.

We could potentially improve this logic: rather than giving up control, attempt to find an output partition that is empty and fill it (i.e. tx.try_send and, if there is no space, try some other partitions).
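
A minimal sketch of that adaptive idea, again using the standard library's bounded channels in place of DataFusion's distributor channels. The names `make_outputs` and `send_adaptive` are hypothetical and batches are modeled as integers; the point is only the "try the next output instead of waiting" loop.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Build `n` bounded output queues, each holding at most `capacity` batches.
fn make_outputs(n: usize, capacity: usize) -> (Vec<SyncSender<u32>>, Vec<Receiver<u32>>) {
    (0..n).map(|_| sync_channel(capacity)).unzip()
}

/// Try every output once, starting at `start`. Returns the index of the
/// output that accepted the batch, or hands the batch back if all are full.
fn send_adaptive(outputs: &[SyncSender<u32>], start: usize, batch: u32) -> Result<usize, u32> {
    let n = outputs.len();
    let mut batch = batch;
    for offset in 0..n {
        let idx = (start + offset) % n;
        match outputs[idx].try_send(batch) {
            Ok(()) => return Ok(idx),
            // Full or closed: take the batch back and try the next output.
            Err(TrySendError::Full(b)) | Err(TrySendError::Disconnected(b)) => batch = b,
        }
    }
    // Every output refused the batch; the caller should now yield.
    Err(batch)
}
```

Only when `send_adaptive` returns `Err` would the producer need to give up control, so a single slow consumer no longer stalls an input that could feed other idle partitions. A real version would also have to keep the distribution reasonably even, which this sketch does not attempt.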

Footnotes

  1. This model was first described in the 1989 paper "Encapsulation of parallelism in the Volcano query processing system", which uses the term "Exchange" for the concept of repartitioning data across threads.

  2. We actually had a version of this approach as an experimental feature in DataFusion for a while but removed it as we found it didn't offer compelling enough performance improvement to justify the engineering effort.

@scsmithr
Contributor

👍 for this.

I poked around with removing the gate in the distribution channel implementation. My challenges with the gate implementation were high lock contention, as well as high memory usage when inputs are highly skewed (e.g. one channel received the bulk of the batches, but the gate still let batches in since other channels were empty).

Some experimentation here: https://github.com/GlareDB/arrow-datafusion/blob/repart-perf/datafusion/core/src/physical_plan/repartition/distributor_channels.rs. Note that this hangs on some of the repartition tests, so there's likely a bug in the futures logic.

With a high enough channel buffer size, we start to see performance increases similar to those seen in the flume PR:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ repart-perf ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 309.43ms │    309.24ms │     no change │
│ QQuery 2     │  47.65ms │     48.91ms │     no change │
│ QQuery 3     │ 129.55ms │    109.39ms │ +1.18x faster │
│ QQuery 4     │  81.07ms │     51.09ms │ +1.59x faster │
│ QQuery 5     │ 158.61ms │    123.25ms │ +1.29x faster │
│ QQuery 6     │  79.99ms │     80.40ms │     no change │
│ QQuery 7     │ 218.47ms │    202.31ms │ +1.08x faster │
│ QQuery 8     │ 184.88ms │    170.81ms │ +1.08x faster │
│ QQuery 9     │ 266.72ms │    212.60ms │ +1.25x faster │
│ QQuery 10    │ 207.46ms │    149.29ms │ +1.39x faster │
│ QQuery 11    │  44.44ms │     44.40ms │     no change │
│ QQuery 12    │ 150.27ms │    119.97ms │ +1.25x faster │
│ QQuery 13    │ 227.68ms │    229.70ms │     no change │
│ QQuery 14    │ 113.26ms │    111.97ms │     no change │
│ QQuery 15    │  81.23ms │     81.10ms │     no change │
│ QQuery 16    │  51.57ms │     51.50ms │     no change │
│ QQuery 17    │ 338.65ms │    305.00ms │ +1.11x faster │
│ QQuery 18    │ 379.12ms │    370.23ms │     no change │
│ QQuery 19    │ 233.23ms │    234.30ms │     no change │
│ QQuery 20    │ 132.11ms │    129.32ms │     no change │
│ QQuery 21    │ 324.30ms │    256.37ms │ +1.26x faster │
│ QQuery 22    │  50.06ms │     43.18ms │ +1.16x faster │
└──────────────┴──────────┴─────────────┴───────────────┘

(On an M2 Macbook Air)

Not trying to push for any one approach, and this was mostly just an experiment.

@alamb
Contributor Author

alamb commented Jul 22, 2023

I have been dreaming about this -- I may try and bash out a prototype in the coming weeks

I find it a very interesting intellectual challenge: "can we build a system on a tokio based scheduler that takes into account NUMA and cache locality?" -- I think we can and this would be an excellent opportunity to show it

@Dandandan
Contributor

Dandandan commented Jul 23, 2023

That would be amazing @alamb very interested to see what you come up with :)

@alamb
Contributor Author

alamb commented Jul 31, 2023

Also @wolffcm noted elsewhere that

I see a lot of DF plans that contain something like this:

    RepartitionExec: partitioning=Hash(...), input_partitions=12
      RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=2

At first I thought this was an unintentional artifact of how physical planning happens, but now it seems to me that the intention of the RoundRobin repartitioning is to ensure we parallelize the Hash repartitioning, which can be expensive because it looks at each row. Do I have that right?

I believe this is correct, and perhaps it is something to address in any rework of the operator
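
To show why hash repartitioning is the expensive step, here is a small sketch that routes rows by key. It is not DataFusion's hashing code: the function name `hash_partition` is hypothetical, rows are represented by their string key, and the standard library's `DefaultHasher` stands in for whatever hash the engine uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route each row to an output partition based on the hash of its key.
/// Returns, for every output partition, the indices of the rows it receives.
fn hash_partition(keys: &[&str], num_outputs: usize) -> Vec<Vec<usize>> {
    assert!(num_outputs > 0, "need at least one output partition");
    let mut outputs = vec![Vec::new(); num_outputs];
    // Unlike round robin, every single row must be hashed and inspected.
    for (row, key) in keys.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let partition = (hasher.finish() % num_outputs as u64) as usize;
        outputs[partition].push(row);
    }
    outputs
}
```

Rows with equal keys always land in the same output, which is what a downstream group by or join needs. The cost is one hash per row, so running this loop on 12 round-robin partitions instead of 2 input partitions spreads that work over more cores.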

@wolffcm
Contributor

wolffcm commented Jul 31, 2023

To add a little bit more context to the above, the RoundRobin repartitioning is added as part of the Repartition optimizer pass, and then the Hash repartitioning is added by EnforceDistribution (because a subsequent operator requires it)

@alamb
Contributor Author

alamb commented Jul 31, 2023

In my ideal world, the repartition operation would handle effectively keeping all cores busy as well as hashing. I will keep dreaming until I have time

@Dandandan
Contributor

The current plan to do round robin before hashing seems optimal to me, like @wolffcm I think also suggests? The other way around would reduce the parallelism of hashing.

@alamb
Contributor Author

alamb commented Jul 31, 2023

I was thinking of a unified roundrobin / hashing repartitioner that was defined in terms of inputs / outputs. I may have misled myself and will have to see how the code plays out

@alamb
Contributor Author

alamb commented Aug 5, 2023

Some other notes from a DM I am bringing here for visibility:

DataFusion's pattern can be described as a "classic pull scheduler with volcano style exchange operators":

  • pull scheduler == Tokio
  • and exchange operators == RepartitionExec

My theory (which I would say not everyone agrees with yet) is that you can get all the claimed benefits of "morsel driven parallelism" (aka a push scheduler) from a "classic pull scheduler with exchange operators"

The claimed benefits of morsel driven parallelism over the classic approach are:

  1. Better NUMA / cache locality
  2. Near linear scaling (so if you double the core count from 32 to 64, expect to see a linear 2x speed up)
  3. Better runtime optimization by scaling up/down cores, but I think this is less important in the real world (as you most often run out of memory long before you run out of CPU)

Basically I think it is totally reasonable for DataFusion to scale linearly to high core counts -- like 128 / 256. I just need to prove that is the case and I suspect it will take some finagling with the current Repartitioning code

@scsmithr
Contributor

With the current architecture, I think the easiest thing to do here is probably to not try to have a one-to-one mapping of task to core, and lean into tokio for scheduling to try to get good resource utilization.

I did a quick experiment for this by increasing the number of partitions to use in the repartition optimization rule. Currently it gets set to target_partitions, and I just multiplied it by 3.

The diff:

diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index aa48fd77a..b4ef8c3a2 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -288,7 +288,7 @@ impl PhysicalOptimizerRule for Repartition {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let target_partitions = config.execution.target_partitions;
+        let target_partitions = config.execution.target_partitions * 3;
         let enabled = config.optimizer.enable_round_robin_repartition;
         let repartition_file_scans = config.optimizer.repartition_file_scans;
         let repartition_file_min_size = config.optimizer.repartition_file_min_size;

And some benchmarks using TPCH SF=10

Macbook air m2:

Comparing main and three
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃     three ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 3615.39ms │ 1745.94ms │ +2.07x faster │
│ QQuery 2     │  447.68ms │  450.67ms │     no change │
│ QQuery 3     │ 1320.39ms │  730.22ms │ +1.81x faster │
│ QQuery 4     │  726.73ms │  394.54ms │ +1.84x faster │
│ QQuery 5     │ 1619.80ms │ 1064.71ms │ +1.52x faster │
│ QQuery 6     │  741.31ms │  371.64ms │ +1.99x faster │
│ QQuery 7     │ 3269.57ms │ 2528.63ms │ +1.29x faster │
│ QQuery 8     │ 1972.34ms │ 1200.81ms │ +1.64x faster │
│ QQuery 9     │ 3032.73ms │ 2252.82ms │ +1.35x faster │
│ QQuery 10    │ 2045.88ms │ 1381.35ms │ +1.48x faster │
│ QQuery 11    │  466.09ms │  484.14ms │     no change │
│ QQuery 12    │ 1400.27ms │  652.58ms │ +2.15x faster │
│ QQuery 13    │ 2088.00ms │ 1181.92ms │ +1.77x faster │
│ QQuery 14    │ 1047.22ms │  551.92ms │ +1.90x faster │
│ QQuery 15    │  802.09ms │  650.86ms │ +1.23x faster │
│ QQuery 16    │  429.19ms │  418.61ms │     no change │
│ QQuery 17    │ 4392.38ms │ 4516.79ms │     no change │
│ QQuery 18    │ 7849.27ms │ 7245.33ms │ +1.08x faster │
│ QQuery 19    │ 2180.33ms │ 1012.43ms │ +2.15x faster │
│ QQuery 20    │ 1657.01ms │ 1110.12ms │ +1.49x faster │
│ QQuery 21    │ 3867.34ms │ 3671.96ms │ +1.05x faster │
│ QQuery 22    │  490.25ms │  443.54ms │ +1.11x faster │
└──────────────┴───────────┴───────────┴───────────────┘

GCP n1 vm (8 cores/16 threads, 60G memory):

Comparing main and three
--------------------
Benchmark tpch.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃      three ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  8947.17ms │  3274.31ms │ +2.73x faster │
│ QQuery 2     │  1562.68ms │  1519.65ms │     no change │
│ QQuery 3     │  3839.36ms │  1758.32ms │ +2.18x faster │
│ QQuery 4     │  1995.52ms │   967.04ms │ +2.06x faster │
│ QQuery 5     │  4906.24ms │  2933.39ms │ +1.67x faster │
│ QQuery 6     │  1941.19ms │   710.94ms │ +2.73x faster │
│ QQuery 7     │  9391.94ms │  6423.27ms │ +1.46x faster │
│ QQuery 8     │  5503.67ms │  2735.65ms │ +2.01x faster │
│ QQuery 9     │  8840.15ms │  6129.22ms │ +1.44x faster │
│ QQuery 10    │  5807.23ms │  3804.27ms │ +1.53x faster │
│ QQuery 11    │  1699.21ms │  1744.50ms │     no change │
│ QQuery 12    │  2993.60ms │  1277.92ms │ +2.34x faster │
│ QQuery 13    │  4907.55ms │  2585.08ms │ +1.90x faster │
│ QQuery 14    │  2633.90ms │  1114.95ms │ +2.36x faster │
│ QQuery 15    │  2085.41ms │  1176.97ms │ +1.77x faster │
│ QQuery 16    │  1683.76ms │  1572.89ms │ +1.07x faster │
│ QQuery 17    │  7322.44ms │  6412.73ms │ +1.14x faster │
│ QQuery 18    │ 15950.64ms │ 16131.98ms │     no change │
│ QQuery 19    │  4936.98ms │  1834.62ms │ +2.69x faster │
│ QQuery 20    │  5126.77ms │  3039.32ms │ +1.69x faster │
│ QQuery 21    │ 14114.41ms │  9196.05ms │ +1.53x faster │
│ QQuery 22    │  1449.16ms │  1206.74ms │ +1.20x faster │
└──────────────┴────────────┴────────────┴───────────────┘

@alamb
Contributor Author

alamb commented Aug 12, 2023

@scsmithr this is great data -- thank you -- and it does imply there is substantial headroom for performance improvement by using more cores. It is interesting that some queries go 2x as fast -- it is almost like the default value for target_partitions was 1/2 the actual available cores

However, the config seems to be doing the right thing:

https://github.com/apache/arrow-datafusion/blob/ba51b6a334de813a2b90998b72f14281460e8f06/datafusion/common/src/config.rs#L218

🤔

@ozankabak
Contributor

That's very interesting, do you have an idea what may be going on @alamb?

@scsmithr
Contributor

Much of the improvement here is from repartitioning in the ParquetExec. Since that's I/O bound, having more tasks than cores improves resource utilization if that's what the rest of the query is waiting on. It might be reasonable to introduce an io_target_partitions config variable specifically for repartitioning at the I/O layer.

@ozankabak
Contributor

Makes sense 👍

@alamb
Contributor Author

alamb commented Aug 13, 2023

@scsmithr how did you get the numbers you report in #7001 (comment) ?

I believe the benchmarks default to using only 2 cores: https://github.com/apache/arrow-datafusion/blob/ed85abbb878ef3d60e43797376cb9a40955cd89a/benchmarks/src/options.rs#L29-L31

So if you ran the tests without specifying --partitions 16 or whatever they probably are only using 2 cores

I wonder if we should increase the default number of partitions to the core count in the benchmarks 🤔

@scsmithr
Contributor

So if you ran the tests without specifying --partitions 16 or whatever they probably are only using 2 cores

Yep completely missed that option. I was running ./benchmarks/bench.sh run tpch10 without setting this flag. Setting --partitions 8 on my macbook negates any improvement.

I'm coming at this from trying to speed up reading unpartitioned parquet files from GCS, and there are improvements when setting target partitions beyond the number of cores. The above benchmark numbers were me trying to see if that applied generally, and not just in this specific case. Turns out, it doesn't apply generally... still investigating why I'm seeing speed ups with GCS.

I wonder if we should increase the default number of partitions to the core count in the benchmarks 🤔

Would making partitions an Option make sense? If unset, default to whatever is the default in the session config. I had incorrectly assumed that this is what would happen when running bench.sh.
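
A minimal sketch of what that could look like, not the actual benchmark option struct: the function names are hypothetical, and the session default is assumed here to be the number of available cores, approximated with the standard library's `available_parallelism`.

```rust
use std::thread;

/// Stand-in for the session config default.
/// Assumption: the default target partition count equals the core count.
fn default_target_partitions() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

/// Use the value passed via `--partitions` if present, otherwise fall back
/// to the session default instead of a hard coded 2.
fn effective_partitions(partitions: Option<usize>) -> usize {
    partitions.unwrap_or_else(default_target_partitions)
}
```

With this shape, `effective_partitions(Some(8))` keeps an explicit `--partitions 8`, while `effective_partitions(None)` matches whatever a default session would use.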

@alamb
Contributor Author

alamb commented Aug 14, 2023

Would making partitions an Option make sense? If unset, default to whatever is the default in the session config. I had incorrectly assumed that this is what would happen when running bench.sh.

I think it would make sense as it better reflects the default configuration DataFusion is run in. I wonder if the benchmarks originally were trying to isolate the specific algorithms and avoid the additional noise that multi-cores brought. However, given that they run with 2 cores now it seems they get the worst of both worlds

@scsmithr do you have time to make a PR with that change?
