Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WHERE clauses optimization leveraging Projection feature #63207

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

SuzyWangIBMer
Copy link
Contributor

@SuzyWangIBMer SuzyWangIBMer commented Apr 30, 2024

Refer to original PR #57216, porting code to analyzer passes to align with other ClickHouse optimization features.

This is a proposal to optimize select query by leveraging projection feature.
Since PROJECTION can effectively create a new primary key for the table, it can increase the searching speed if using properly.
This implementation inserts a new subquery to the original where condition, in order to use the projection feature in more general cases.
For example,
select * from table where secondary_key='-42';
will now become
select * from table where primary_key in (select primary_key from test_a where secondary_key='-42') and secondary_key='-42';
This implementation is tested and proved increasing query execution speed vastly.

Thanks to @UnamedRus 's suggestion, indexHint() is also added to further optimize the query. Now the query is going to be re-write to select * from table where indexHint(primary_key in (select primary_key from test_a where secondary_key='-42')) and secondary_key='-42';
It is proved that this will also optimize query execution speed. Tests results are attached below.

Pseudo Code

for each query (including subqueries):
   for each predicate in WHERE clause in the query:
	 1. check if it's a simple condition <column>=<value>;
	 2. check if <column> is NOT the primary key of the table;
	 3. check if <column> is the primary key of a projection that contains table primary keys;
	 4. if all condition met, rewrite the query as mentioned above and break

Restrictions

  1. Currently the re-write only works for WHERE condition with no subquery. (e.x Select * Where src in () will not be optimized).
  2. The re-write only support WHERE condition that is using =, !=, in, notIn function
  3. The re-write can work with Alias but not function wrapping around columns.
    For example,
SELECT
      user_account AS my_user,
      user_id
FROM events
WHERE my_user = 'admin'

can be re-written to

SELECT                                                     
     user_account AS my_user,                    
     user_id
 FROM events                                                                          
 WHERE (my_user = 'admin') AND indexHint( primary_key IN (
     SELECT primary_key 
     FROM events                                           
     WHERE (user_account AS my_user) = 'admin'
 )) 

However, query like the following will not be recognized, and will not be re-written.

SELECT
      If(user_id = 0, NULL, user_account) AS user_id AS my_user,
      user_id
FROM events
WHERE my_user = 'admin'

  1. Aggregation Function projection is not supported.
  2. This re-written is implemented on top of projection feature, hence anything that Projection feature currently does not support will not be supported with this optimization.

Test result

CREATE TABLE test_a
(
    `src` String,
    `dst` String,
    `other_cols` String,
    PROJECTION p1
    (
        SELECT
            src,
            dst
        ORDER BY dst
    )
)
ENGINE = MergeTree
ORDER BY src

/* Query using primary key (fastest) */

palmtops1.fyre.ibm.com :) select * from test_a where src='42';

SELECT *
FROM test_a
WHERE src = '42'

Query id: 9220dadb-bdbc-4fe5-bf70-b4c04b3e7ddd

┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘

1 row in set. Elapsed: 0.042 sec. Processed 16.38 thousand rows, 619.85 KB (388.58 thousand rows/s., 14.70 MB/s.)
Peak memory usage: 44.66 KiB.

/* Query does not using projection */

palmtops1.fyre.ibm.com :) select * from test_a where dst='-42';

SELECT *
FROM test_a
WHERE dst = '-42'

Query id: 7835b759-e894-4e34-b51b-f54fe3d1645c

┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘

1 row in set. Elapsed: 2.177 sec. Processed 100.00 million rows, 1.79 GB (45.94 million rows/s., 821.86 MB/s.)
Peak memory usage: 1021.77 KiB.

/* Optimized non-projection query now can leveraging projection feature */

palmtops1.fyre.ibm.com :) select * from test_a where src in (select src from test_a where dst='-42') and dst='-42';

SELECT *
FROM test_a
WHERE (src IN (
    SELECT src
    FROM test_a
    WHERE dst = '-42'
)) AND (dst = '-42')

Query id: 68ca1b70-940f-41d2-983b-4df7c06df5ae

┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘

1 row in set. Elapsed: 0.101 sec. Processed 32.77 thousand rows, 1.18 MB (325.38 thousand rows/s., 11.75 MB/s.)
Peak memory usage: 275.28 KiB

/* Optimized query with indexHint */

palmtops1.fyre.ibm.com :) select * from test_a where indexHint(src in (select src from test_a where dst='-42')) and dst='-42';

SELECT *
FROM test_a
WHERE indexHint(src IN (
    SELECT src
    FROM test_a
    WHERE dst = '-42'
)) AND (dst = '-42')

Query id: 365d4fae-93dc-48e5-b0c0-42dc2ba9904e

┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘

1 row in set. Elapsed: 0.094 sec. Processed 32.77 thousand rows, 1.18 MB (350.31 thousand rows/s., 12.65 MB/s.)
Peak memory usage: 223.92 KiB.

Query using primary key (fastest) : 0.042 sec, Peak memory usage: 44.66 KiB
Query does not using projection : 2.177 sec, Peak memory usage: 1021.77 KiB.
Optimized non-projection query : 0.101 sec, Peak memory usage: 275.28 KiB
Optimized query with indexHint : 0.094 sec, Peak memory usage: 223.92 KiB.

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Rewrite where clause in order to leverage PROJECTION feature.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/


Modify your CI run:

NOTE: If your merge the PR with modified CI you MUST KNOW what you are doing
NOTE: Checked options will be applied if set before CI RunConfig/PrepareRunConfig step

Include tests (required builds will be added automatically):

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Unit tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with Analyzer
  • Add your option here

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • Add your option here

Extra options:

  • do not test (only style check)
  • disable merge-commit (no merge from master before tests)
  • disable CI cache (job reuse)

Only specified batches in multi-batch jobs:

  • 1
  • 2
  • 3
  • 4

@thevar1able thevar1able added the can be tested Allows running workflows for external contributors label Apr 30, 2024
@robot-clickhouse-ci-2 robot-clickhouse-ci-2 added the pr-performance Pull request with some performance improvements label Apr 30, 2024
@robot-clickhouse-ci-2
Copy link
Contributor

robot-clickhouse-ci-2 commented Apr 30, 2024

This is an automated comment for commit cf38f59 with description of existing statuses. It's updated for the latest CI running

❌ Click here to open a full report in a separate page

Check nameDescriptionStatus
A SyncThere's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS⏳ pending
CI runningA meta-check that indicates the running CI. Normally, it's in success or pending state. The failed status indicates some problems with the PR⏳ pending
ClickHouse build checkBuilds ClickHouse in various configurations for use in further steps. You have to fix the builds that fail. Build logs often has enough information to fix the error, but you might have to reproduce the failure locally. The cmake options can be found in the build log, grepping for cmake. Use these options and follow the general build process❌ failure
Mergeable CheckChecks if all other necessary checks are successful❌ failure
Successful checks
Check nameDescriptionStatus
Docs checkBuilds and tests the documentation✅ success
Fast testNormally this is the first check that is ran for a PR. It builds ClickHouse and runs most of stateless functional tests, omitting some. If it fails, further checks are not started until it is fixed. Look at the report to see which tests fail, then reproduce the failure locally as described here✅ success
Integration testsThe integration tests report. In parenthesis the package type is given, and in square brackets are the optional part/total tests✅ success
PR CheckThere's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS✅ success
Stateful testsRuns stateful functional tests for ClickHouse binaries built in various configurations -- release, debug, with sanitizers, etc✅ success
Stateless testsRuns stateless functional tests for ClickHouse binaries built in various configurations -- release, debug, with sanitizers, etc✅ success
Style checkRuns a set of checks to keep the code style clean. If some of tests failed, see the related log from the report✅ success
Unit testsRuns the unit tests for different release types✅ success

@alexey-milovidov
Copy link
Member

Thank you! This looks interesting... I didn't check the code yet.
One question - how could it work better than the PREWHERE optimization?

According to my expectations, PREWHERE will scan the column for filtering and then read the remaining data from the positions that passed the filter.

@SuzyWangIBMer
Copy link
Contributor Author

Thank you! This looks interesting... I didn't check the code yet. One question - how could it work better than the PREWHERE optimization?

According to my expectations, PREWHERE will scan the column for filtering and then read the remaining data from the positions that passed the filter.

Hi Alexey, thanks for the response.

First, this is a add-on to PREWHERE not a replacement to PREWHERE.
If I do explain actions, I can still see PREWHERE kicks in.

EXPLAIN actions = 1
SELECT count()
FROM events
WHERE ipv4_addr_dst___value = '9.45.30.22'
SETTINGS optimize_project_query = 1

...
 Prewhere filter column: equals(__table1.ipv4_addr_dst___value, '9.45.30.22'_String)
...

Secondly, on top of PREWHERE, this optimization utilize the projection feature by including a sub select query select primary_key from table where key=value, this allows the query to use projection table. The code will not kick in if key is not a projection table primary key. Comparing the following example:

With optimize_project_query = 1, it filter down to Parts: 1/3, Granules: 1/1480. Processed 32.77 thousand rows
Where with this feature disabled, it was only get down to Parts: 3/3 , Granules: 1480/1480. Processed 12.00 million rows

EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE (user_account___user_id = 'malware') OR ((ipv4_addr_dst___value = '9.45.30.22') AND (original_time > (now() - toIntervalDay(1))))
SETTINGS ignore_data_skipping_indices = 'idx_user_account_user_id, idx_all_ioc', optimize_project_query = 1

Query id: 8626e0e4-a8f8-4de5-ada8-b5d889e96942

    ┌─explain───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
 1. │ CreatingSets (Create sets before main query execution)                                                                                        │
 2. │   Expression ((Project names + Projection))                                                                                                   │
 3. │     Aggregating                                                                                                                               │
 4. │       Expression (Before GROUP BY)                                                                                                            │
 5. │         Filter ((WHERE + Change column names to column identifiers))                                                                          │
 6. │           ReadFromMergeTree (default.events)                                                                                                  │
 7. │           Indexes:                                                                                                                            │
 8. │             MinMax                                                                                                                            │
 9. │               Condition: true                                                                                                                 │
10. │               Parts: 3/3                                                                                                                      │
11. │               Granules: 1480/1480                                                                                                             │
12. │             Partition                                                                                                                         │
13. │               Condition: true                                                                                                                 │
14. │               Parts: 3/3                                                                                                                      │
15. │               Granules: 1480/1480                                                                                                             │
16. │             PrimaryKey                                                                                                                        │
17. │               Keys:                                                                                                                           │
18. │                 original_time                                                                                                                 │
19. │               Condition: or(and((original_time in ('1715115754', +Inf)), (original_time in 1-element set)), (original_time in 1-element set)) │
20. │               Parts: 1/3                                                                                                                      │
21. │               Granules: 1/1480                                                                                                                │
    └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘


SELECT count()
FROM events
WHERE (user_account___user_id = 'malware') OR ((ipv4_addr_dst___value = '9.45.30.22') AND (original_time > (now() - toIntervalDay(1))))
SETTINGS ignore_data_skipping_indices = 'idx_user_account_user_id, idx_all_ioc', optimize_project_query = 1

Query id: 6230ae14-053e-4995-ab88-b19cf20b5729

   ┌─count()─┐
1. │       1 │
   └─────────┘

**1 row in set. Elapsed: 0.467 sec. Processed 32.77 thousand rows**, 689.30 KB (70.22 thousand rows/s., 1.48 MB/s.)
Peak memory usage: 171.15 KiB.
EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE (user_account___user_id = 'malware') OR ((ipv4_addr_dst___value = '9.45.30.22') AND (original_time > (now() - toIntervalDay(1))))
SETTINGS ignore_data_skipping_indices = 'idx_user_account_user_id, idx_all_ioc', optimize_project_query = 0

Query id: e0fcc0c2-b83c-4abc-9aed-9d92742e7836

    ┌─explain────────────────────────────────────────────────────────────┐
 1. │ Expression ((Project names + Projection))                          │
 2. │   Aggregating                                                      │
 3. │     Expression (Before GROUP BY)                                   │
 4. │       Filter ((WHERE + Change column names to column identifiers)) │
 5. │         ReadFromMergeTree (default.events)                         │
 6. │         Indexes:                                                   │
 7. │           MinMax                                                   │
 8. │             Condition: true                                        │
 9. │             Parts: 3/3                                             │
10. │             Granules: 1480/1480                                    │
11. │           Partition                                                │
12. │             Condition: true                                        │
13. │             Parts: 3/3                                             │
14. │             Granules: 1480/1480                                    │
15. │           PrimaryKey                                               │
16. │             Condition: true                                        │
17. │             Parts: 3/3                                             │
18. │             Granules: 1480/1480                                    │
    └────────────────────────────────────────────────────────────────────┘

SELECT count()
FROM events
WHERE (user_account___user_id = 'malware') OR ((ipv4_addr_dst___value = '9.45.30.22') AND (original_time > (now() - toIntervalDay(1))))
SETTINGS ignore_data_skipping_indices = 'idx_user_account_user_id, idx_all_ioc', optimize_project_query = 0

Query id: abc6e313-b5f8-4a38-b846-eb6e13653e6d

   ┌─count()─┐
1. │       1 │
   └─────────┘

**1 row in set. Elapsed: 0.882 sec. Processed 12.00 million rows**, 372.05 MB (13.60 million rows/s., 421.70 MB/s.)
Peak memory usage: 922.42 KiB.

@nickitat nickitat self-assigned this May 8, 2024
@UnamedRus
Copy link
Contributor

With optimize_project_query = 1, it filter down to Parts: 1/3, Granules: 1/1480. Processed 32.77 thousand rows
Where with this feature disabled, it was only get down to Parts: 3/3 , Granules: 1480/1480. Processed 12.00 million rows

Actually, i guess it make sense to include part name as well, because it will allow to filter out parts which doesn't have interesting rows for us.

select * from table where indexHint((_part, primary_key) in (select _part, primary_key from test_a where secondary_key='-42')) and secondary_key='-42';

@SuzyWangIBMer
Copy link
Contributor Author

With optimize_project_query = 1, it filter down to Parts: 1/3, Granules: 1/1480. Processed 32.77 thousand rows
Where with this feature disabled, it was only get down to Parts: 3/3 , Granules: 1480/1480. Processed 12.00 million rows

Actually, i guess it make sense to include part name as well, because it will allow to filter out parts which doesn't have interesting rows for us.

select * from table where indexHint((_part, primary_key) in (select _part, primary_key from test_a where secondary_key='-42')) and secondary_key='-42';

Hi @UnamedRus , I think we discussed this in the original thread. Do we still need to re-calculate projection table if we try to include _part and _part_offset?

@UnamedRus
Copy link
Contributor

Do we still need to re-calculate projection table if we try to include _part and _part_offset?

_part_offset was different topic, it was more like: what if we will materialize and store _part_offset column from part in projection.

But here i'm talking about "virtual" column _part, which just have meaning of current part being queried, it should be always up to date, because it's virtual

@SuzyWangIBMer
Copy link
Contributor Author

Actually, i guess it make sense to include part name as well, because it will allow to filter out parts which doesn't have interesting rows for us.

select * from table where indexHint((_part, primary_key) in (select _part, primary_key from test_a where secondary_key='-42')) and secondary_key='-42';

Can you elaborate on this? I dont see any differences by including _part into the query.

EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE ((user_account___user_id = 'malware') AND indexHint((event_type, original_time) IN (
    SELECT
        event_type,
        original_time
    FROM events
    WHERE (original_time > subtractHours(now(), 24)) AND (user_account___user_id = 'malware')
))) OR ((ipv4_addr_dst___value = '9.45.30.22') AND indexHint((event_type, original_time) IN (
    SELECT
        event_type,
        original_time
    FROM events
    WHERE (original_time > subtractHours(now(), 24)) AND (ipv4_addr_dst___value = '9.45.30.22')
)) AND (original_time > (now() - toIntervalDay(9))))
SETTINGS ignore_data_skipping_indices = 'idx_user_account_user_id, idx_all_ioc', optimize_project_query = 0

Query id: 2acdf344-1119-4829-970c-388c3507cc1e

    ┌─explain───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
 1. │ CreatingSets (Create sets before main query execution)                                                                                        │
 2. │   Expression ((Project names + Projection))                                                                                                   │
 3. │     Aggregating                                                                                                                               │
 4. │       Expression (Before GROUP BY)                                                                                                            │
 5. │         Filter ((WHERE + Change column names to column identifiers))                                                                          │
 6. │           ReadFromMergeTree (default.events)                                                                                                  │
 7. │           Indexes:                                                                                                                            │
 8. │             MinMax                                                                                                                            │
 9. │               Condition: true                                                                                                                 │
10. │               Parts: 7/7                                                                                                                      │
11. │               Granules: 1498/1498                                                                                                             │
12. │             Partition                                                                                                                         │
13. │               Condition: true                                                                                                                 │
14. │               Parts: 7/7                                                                                                                      │
15. │               Granules: 1498/1498                                                                                                             │
16. │             PrimaryKey                                                                                                                        │
17. │               Keys:                                                                                                                           │
18. │                 original_time                                                                                                                 │
19. │               Condition: or(and((original_time in ('1714502247', +Inf)), (original_time in 2-element set)), (original_time in 1-element set)) │
20. │               Parts: 2/7                                                                                                                      │
21. │               Granules: 2/1498                                                                                                                │
    └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

21 rows in set. Elapsed: 0.642 sec. Processed 54.60 thousand rows, 1.18 MB (85.04 thousand rows/s., 1.84 MB/s.)
Peak memory usage: 321.88 KiB.


EXPLAIN indexes = 1
SELECT count()
FROM events
WHERE ((user_account___user_id = 'malware') AND indexHint((_part, event_type, original_time) IN (
    SELECT
        _part,
        event_type,
        original_time
    FROM events
    WHERE (original_time > subtractHours(now(), 24)) AND (user_account___user_id = 'malware')
))) OR ((ipv4_addr_dst___value = '9.45.30.22') AND indexHint((_part, event_type, original_time) IN (
    SELECT
        _part,
        event_type,
        original_time
    FROM events
    WHERE (original_time > subtractHours(now(), 24)) AND (ipv4_addr_dst___value = '9.45.30.22')
)) AND (original_time > (now() - toIntervalDay(9))))
SETTINGS ignore_data_skipping_indices = 'idx_user_account_user_id, idx_all_ioc', optimize_project_query = 0

Query id: 406fb88b-b00e-4c58-906c-b37fadfc305e

    ┌─explain───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
 1. │ CreatingSets (Create sets before main query execution)                                                                                        │
 2. │   Expression ((Project names + Projection))                                                                                                   │
 3. │     Aggregating                                                                                                                               │
 4. │       Expression (Before GROUP BY)                                                                                                            │
 5. │         Filter ((WHERE + Change column names to column identifiers))                                                                          │
 6. │           ReadFromMergeTree (default.events)                                                                                                  │
 7. │           Indexes:                                                                                                                            │
 8. │             MinMax                                                                                                                            │
 9. │               Condition: true                                                                                                                 │
10. │               Parts: 7/7                                                                                                                      │
11. │               Granules: 1498/1498                                                                                                             │
12. │             Partition                                                                                                                         │
13. │               Condition: true                                                                                                                 │
14. │               Parts: 7/7                                                                                                                      │
15. │               Granules: 1498/1498                                                                                                             │
16. │             PrimaryKey                                                                                                                        │
17. │               Keys:                                                                                                                           │
18. │                 original_time                                                                                                                 │
19. │               Condition: or(and((original_time in ('1714502284', +Inf)), (original_time in 2-element set)), (original_time in 1-element set)) │
20. │               Parts: 2/7                                                                                                                      │
21. │               Granules: 2/1498                                                                                                                │
    └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

@UnamedRus
Copy link
Contributor

I dont see any differences by including _part into the query.

Difference could be visible event_type, original_time combination is not unique across parts

Imagine, if your order by is something like:

ORDER BY (tenant_id, event_id)

Because index is sparse, it does meant for

select * from table where indexHint((tenant_id, event_id) in (select tenant_id, event_id from test_a where secondary_key='-42')) and secondary_key='-42';

It will read at least 1 granule from each part. (it's still be win from perf improvement, but add unnecessary slowness)

@novikd novikd self-assigned this May 23, 2024
@novikd novikd added the analyzer Issues and pull-requests related to new analyzer label May 23, 2024
@novikd
Copy link
Member

novikd commented May 23, 2024

To begin with code review it requires to add a performance test.

@SuzyWangIBMer
Copy link
Contributor Author

To begin with code review it requires to add a performance test.

How to add performance test?

@novikd
Copy link
Member

novikd commented May 23, 2024

How to add performance test?

Please see the documentation: https://clickhouse.com/docs/en/development/tests#performance-tests

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
analyzer Issues and pull-requests related to new analyzer can be tested Allows running workflows for external contributors pr-performance Pull request with some performance improvements
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants