
More parallel execution for queries with FINAL #36396

Merged
merged 30 commits into from
Jun 15, 2022

Conversation

nickitat
Member

@nickitat nickitat commented Apr 18, 2022

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Now we split data parts into layers and distribute them among threads instead of whole parts to make the execution of queries with FINAL more data-parallel.

An example of a query pipeline:
(Expression)
ExpressionTransform × 8
  (SettingQuotaAndLimits)
    (ReadFromMergeTree)
    ExpressionTransform × 8
      ReplacingSorted 4 → 1
        ExpressionTransform × 4
          FilterTransform × 4
          Description: filter values in [(4382720), +inf)
            MergeTreeInOrder × 4 0 → 1
              ReplacingSorted 4 → 1
                ExpressionTransform × 4
                  FilterTransform × 4
                  Description: filter values in [(3751936), (4382720))
                    MergeTreeInOrder × 4 0 → 1
                      ReplacingSorted 4 → 1
                        ExpressionTransform × 4
                          FilterTransform × 4
                          Description: filter values in [(3129344), (3751936))
                            MergeTreeInOrder × 4 0 → 1
                              ReplacingSorted 4 → 1
                                ExpressionTransform × 4
                                  FilterTransform × 4
                                  Description: filter values in [(2498560), (3129344))
                                    MergeTreeInOrder × 4 0 → 1
                                      ReplacingSorted 4 → 1
                                        ExpressionTransform × 4
                                          FilterTransform × 4
                                          Description: filter values in [(1875968), (2498560))
                                            MergeTreeInOrder × 4 0 → 1
                                              ReplacingSorted 4 → 1
                                                ExpressionTransform × 4
                                                  FilterTransform × 4
                                                  Description: filter values in [(1245184), (1875968))
                                                    MergeTreeInOrder × 4 0 → 1
                                                      ReplacingSorted 4 → 1
                                                        ExpressionTransform × 4
                                                          FilterTransform × 4
                                                          Description: filter values in [(622592), (1245184))
                                                            MergeTreeInOrder × 4 0 → 1
                                                              ReplacingSorted 4 → 1
                                                                ExpressionTransform × 4
                                                                  FilterTransform × 4
                                                                  Description: filter values in [-inf, (622592))
                                                                    MergeTreeInOrder × 4 0 → 1
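The eight layers in the dump above correspond to seven border values over the primary key. As a rough illustration of the idea only (invented names and types, not ClickHouse's actual code; a single `long` stands in for the possibly compound primary key), turning chosen borders into the half-open filter ranges could be sketched as:

```cpp
#include <limits>
#include <vector>

// Illustrative sketch: KeyRange and makeLayers are invented for this example.
struct KeyRange
{
    long begin; // inclusive; numeric_limits<long>::min() stands in for -inf
    long end;   // exclusive; numeric_limits<long>::max() stands in for +inf
};

// Split the whole key space into borders.size() + 1 non-overlapping layers,
// mirroring the FilterTransform descriptions in the pipeline dump.
std::vector<KeyRange> makeLayers(const std::vector<long> & borders)
{
    std::vector<KeyRange> layers;
    long prev = std::numeric_limits<long>::min();
    for (long border : borders)
    {
        layers.push_back({prev, border});
        prev = border;
    }
    layers.push_back({prev, std::numeric_limits<long>::max()});
    return layers;
}
```

With the seven borders from the dump (622592, 1245184, ..., 4382720) this yields the eight ranges shown, each merged by its own ReplacingSorted chain.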

perf tests

@nickitat nickitat added the can be tested Allows running workflows for external contributors label Apr 18, 2022
@robot-clickhouse robot-clickhouse added the pr-not-for-changelog This PR should not be mentioned in the changelog label Apr 18, 2022
@nickitat nickitat added pr-performance Pull request with some performance improvements and removed pr-not-for-changelog This PR should not be mentioned in the changelog labels Apr 21, 2022
@nickitat nickitat changed the title [WIP] More data parallel final [WIP] More data parallel execution for queries with FINAL Apr 21, 2022
@nickitat nickitat changed the title [WIP] More data parallel execution for queries with FINAL More data parallel execution for queries with FINAL Apr 21, 2022
@nickitat nickitat changed the title More data parallel execution for queries with FINAL More parallel execution for queries with FINAL Apr 21, 2022
@nickitat nickitat marked this pull request as ready for review April 22, 2022 11:31
@nickitat
Member Author

@Mergifyio update

@mergify
Contributor

mergify bot commented Apr 23, 2022

update

✅ Branch has been successfully updated

@KochetovNicolai KochetovNicolai self-assigned this Apr 28, 2022
extern const int LOGICAL_ERROR;
}

struct IIndexAccess
Member

Generally, it's good to write a comment for

  • every class (both interface and implementation)
  • all virtual methods
  • and, actually, every method whose purpose is not obvious at first glance

Member

Looks like this interface has a single implementation.
Do we actually need it? I think that, generally, you should not create an interface if you don't plan to have more than one implementation...


struct IIndexAccess
{
struct Value : std::vector<Field>
Member

Inheritance from std containers is suspicious. Maybe use composition?
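A minimal sketch of the composition alternative, with a stand-in type in place of `DB::Field` and only the operations callers actually need forwarded:

```cpp
#include <cstddef>
#include <vector>

using Field = long; // stand-in for DB::Field in this sketch

// Wrap the vector instead of inheriting from it, exposing a deliberately
// narrow interface.
struct Value
{
    std::vector<Field> key_columns; // key column values at one index mark

    size_t size() const { return key_columns.size(); }
    Field & operator[](size_t i) { return key_columns[i]; }
    const Field & operator[](size_t i) const { return key_columns[i]; }
};
```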

Comment on lines 70 to 71
return std::accumulate(
parts.begin(), parts.end(), static_cast<size_t>(0), [](size_t sum, const auto & part) { return sum + part.getRowsCount(); });
Member

OK, but I would prefer a for loop :)
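The for-loop equivalent the reviewer suggests, sketched with a hypothetical minimal `Part` type (the real code sums `getRowsCount()` over MergeTree data parts):

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a data part.
struct Part
{
    size_t rows = 0;
    size_t getRowsCount() const { return rows; }
};

// Plain loop instead of std::accumulate; same result, arguably easier to read.
size_t totalRows(const std::vector<Part> & parts)
{
    size_t sum = 0;
    for (const auto & part : parts)
        sum += part.getRowsCount();
    return sum;
}
```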

@UnamedRus
Contributor

UnamedRus commented Apr 28, 2022

Hm, can we use this to lower memory usage during GROUP BY column_from_order_by?

If each thread has its own range of column_from_order_by values, threads will not need to hold all possible values in their own hash tables.

For example, optimize_aggregation_in_order doesn't parallelize as much as it could.

Comment on lines 56 to 58
// NULL_LAST
if (value[i].isNull())
value[i] = POSITIVE_INFINITY;
Member

Not clear: why is this needed?

enum class Type
{
Border,
RangeBeginning,
Member

Maybe we can also add a RangeEnd event to remove some ranges from the event_queue.
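A hedged sketch of the idea behind this suggestion (invented names; the event type here is simplified and omits the Border case from the diff): with explicit RangeEnd events, a sweep over the sorted event queue can drop finished ranges instead of keeping them around, e.g. to track how many part ranges are open at any point:

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

enum class Type
{
    RangeBeginning,
    RangeEnd,
};

// Sweep over (key, event) pairs and return the maximum number of part ranges
// open at once; RangeEnd events are what let finished ranges leave the queue.
size_t maxOpenRanges(std::vector<std::pair<long, Type>> events)
{
    std::sort(events.begin(), events.end(), [](const auto & a, const auto & b)
    {
        if (a.first != b.first)
            return a.first < b.first;
        // For half-open ranges, process RangeEnd before RangeBeginning at equal keys.
        return a.second == Type::RangeEnd && b.second == Type::RangeBeginning;
    });

    size_t open = 0;
    size_t max_open = 0;
    for (const auto & [key, type] : events)
    {
        if (type == Type::RangeBeginning)
            max_open = std::max(max_open, ++open);
        else
            --open;
    }
    return max_open;
}
```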

@nickitat nickitat marked this pull request as ready for review May 11, 2022 18:33
@jorisgio
Contributor

jorisgio commented Jun 3, 2022

We have been running this code for 2 weeks, and from my perspective it is very promising and delivering a huge performance gain 🎉. Here are some numbers (unfortunately I did not get any numbers before the patch to compare, but I can tell you it was an order of magnitude slower).

311.49 billion rows, 13.44 TB (33.43 billion rows/s., 1.44 TB/s.)

And it seems to scale really well.

The two pain points I have are mostly not directly related to this PR:

  1. The number of ranges directly depends on max_final_threads; it is like max_threads for normal queries. But for distributed queries, I would love to have a way to tweak threads per shard based on data size, some setting like max_range_size, so that it adjusts to the data to process.
  2. Minor and not directly related: it seems FINAL cannot stack with optimize_aggregation_in_order and keep using the ranges in the upper layer (but this can somehow be worked around with smarter queries).

@nickitat
Member Author

nickitat commented Jun 3, 2022

> We have been running this code for 2 weeks, and from my perspective it is very promising and delivering a huge performance gain. Here are some numbers (unfortunately I did not get any numbers before the patch to compare, but I can tell you it was an order of magnitude slower).
>
> 311.49 billion rows, 13.44 TB (33.43 billion rows/s., 1.44 TB/s.)
>
> And it seems to scale really well.
>
> The two pain points I have are mostly not directly related to this PR:
>
> 1. The number of ranges directly depends on max_final_threads; it is like max_threads for normal queries. But for distributed queries, I would love to have a way to tweak threads per shard based on data size, some setting like max_range_size, so that it adjusts to the data to process.
> 2. Minor and not directly related: it seems FINAL cannot stack with optimize_aggregation_in_order and keep using the ranges in the upper layer (but this can somehow be worked around with smarter queries).

Thank you for the feedback!
Regarding your points:

  1. Each node will do the splitting on its own (in the best way the current implementation can provide), so in this sense each node will adjust to the data it is looking at. What do you want to achieve by asking a node to split the data into more ranges than it has CPU cores?
  2. We definitely want to integrate this splitting functionality with aggregation in order. Without cross-thread merging, AIO should perform significantly better.

@UnamedRus
Contributor

And it can make sense to integrate with parallel replicas processing: #26748

@jorisgio
Contributor

jorisgio commented Jun 10, 2022

> each node will do the splitting on its own (in the best way the current implementation could provide), so in this sense, each node will adjust to the data it is looking into. what do you want to achieve by asking a node to split the data into more ranges than it has CPU cores?

If you have a server with a high core count but heavy load, it makes sense to run with max_final_threads=16 to process many requests in parallel. But if there is some imbalance in data distribution, one server might have twice as much data and could benefit from max_final_threads=32 only there. So a setting like 'max_ranges_size' instead would achieve that, with max_final_threads as an upper bound?

> we definitely want to integrate this splitting functionality with aggregation in order. without cross-thread merging, AIO should perform significantly better

That would be great 💯. My specific question, though, is: do you plan to make it stack? For cases like:

SELECT * FROM table FINAL WHERE somefilters LIMIT 1 BY some_prefix_of_pk

a good usecase to get first/last entry in time log data
Or stuff like

SELECT count(events) FROM table FINAL WHERE somefilters GROUP BY some_prefix_of_pk

In those cases it makes a lot of sense to have GROUP BY operate on the same ranges as FINAL? Though that is actually not totally trivial, because the ranges are computed for the full primary key, not the prefix, so some aggregations are not local unless only the prefix is used to compute the ranges.

@nickitat
Member Author

> If you have a server with a high core count but heavy load, it makes sense to run with max_final_threads=16 to process many requests in parallel. But if there is some imbalance in data distribution, one server might have twice as much data and could benefit from max_final_threads=32 only there. So a setting like 'max_ranges_size' instead would achieve that, with max_final_threads as an upper bound?

FINAL won't run faster if you set max_final_threads to a value higher than the number of cores, because each FINAL thread fully utilizes its core (assuming data is distributed evenly), and in practice it works as if it won't let any other thread execute anything until it finishes. Memory consumption also won't decrease if you increase the number of final threads.

> In those cases it makes a lot of sense to have GROUP BY operate on the same ranges as FINAL

Yep, AIO should be able to use the splitting regardless of the presence of FINAL in the query.

src/QueryPipeline/printPipeline.h Show resolved Hide resolved
src/Processors/QueryPlan/PartsSplitter.cpp Show resolved Hide resolved
src/Processors/QueryPlan/PartsSplitter.cpp Show resolved Hide resolved
src/Processors/QueryPlan/PartsSplitter.cpp Show resolved Hide resolved
src/Processors/QueryPlan/PartsSplitter.cpp Show resolved Hide resolved
src/Processors/QueryPlan/PartsSplitter.cpp Show resolved Hide resolved
@nickitat
Member Author

@rschu1ze Robert, thank you for the review. I will make the changes in a separate PR, to let this one land in the release.

@nickitat nickitat merged commit c8afeaf into ClickHouse:master Jun 15, 2022
tavplubix added a commit that referenced this pull request Jun 15, 2022
@tavplubix
Member

@nickitat, you forgot to take a look at failed tests before merging

nickitat added a commit to nickitat/ClickHouse that referenced this pull request Jun 15, 2022
@tavplubix tavplubix mentioned this pull request Jun 16, 2022
amosbird pushed a commit to amosbird/ClickHouse that referenced this pull request Jun 22, 2022
nickitat added a commit that referenced this pull request Jun 22, 2022
* Revert "Revert "More parallel execution for queries with `FINAL` (#36396)""

This reverts commit 5bfb152.

* fix tests

* fix review suggestions

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Labels
can be tested Allows running workflows for external contributors pr-performance Pull request with some performance improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Speeding up Final by 500% by splitting query into UNION ALL of non overlapping PK ranges
7 participants