Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #46785 to 23.2: Fix combined PREWHERE column accumulated from multiple steps #46828

Merged
merged 1 commit into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 7 additions & 10 deletions src/Storages/MergeTree/MergeTreeRangeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar

read_result.checkInternalConsistency();

if (!read_result.can_return_prewhere_column_without_filtering)
if (!read_result.can_return_prewhere_column_without_filtering && last_reader_in_chain)
{
if (!read_result.filterWasApplied())
{
Expand Down Expand Up @@ -1380,17 +1380,14 @@ void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & r
current_step_filter = result.columns[prewhere_column_pos];
}

/// In case when we are returning prewhere column the caller expects it to serve as a final filter:
/// it must contain 0s not only from the current step but also from all the previous steps.
/// One way to achieve this is to apply the final_filter if we know that the final_filter was not applied at
/// several previous steps but was accumulated instead.
result.can_return_prewhere_column_without_filtering = result.filterWasApplied();

if (prewhere_info->remove_column)
result.columns.erase(result.columns.begin() + prewhere_column_pos);
else
{
/// In case when we are not removing prewhere column the caller expects it to serve as a final filter:
/// it must contain 0s not only from the current step but also from all the previous steps.
/// One way to achieve this is to apply the final_filter if we know that the final _filter was not applied at
/// several previous steps but was accumulated instead.
result.can_return_prewhere_column_without_filtering =
(!result.final_filter.present() || result.final_filter.countBytesInFilter() == result.num_rows);
}

FilterWithCachedCount current_filter(current_step_filter);

Expand Down
131 changes: 131 additions & 0 deletions tests/queries/0_stateless/02473_multistep_split_prewhere.python
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#!/usr/bin/env python3
import requests
import os
import sys

CURDIR = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(0, os.path.join(CURDIR, 'helpers'))

from pure_http_client import ClickHouseClient


class Tester:
'''
- Creates test table with multiple integer columns
- Runs read queries with multiple range conditions on different columns in PREWHERE and check that the result is correct
'''
def __init__(self, session, url, index_granularity, total_rows):
self.session = session
self.url = url
self.index_granularity = index_granularity
self.total_rows = total_rows
self.reported_errors = set()
self.repro_queries = []

def report_error(self):
print('Repro steps:', '\n\n\t'.join(self.repro_queries))
exit(1)

def query(self, query_text, include_in_repro_steps = True, expected_data = None):
self.repro_queries.append(query_text)
resp = self.session.post(self.url, data=query_text)
if resp.status_code != 200:
# Group similar errors
error = resp.text[0:40]
if error not in self.reported_errors:
self.reported_errors.add(error)
print('Code:', resp.status_code)
print('Result:', resp.text)
self.report_error()

result = resp.text
# Check that the result is as expected
if ((not expected_data is None) and (int(result) != len(expected_data))):
print('Expected {} rows, got {}'.format(len(expected_data), result))
print('Expected data:' + str(expected_data))
self.report_error()

if not include_in_repro_steps:
self.repro_queries.pop()


def check_data(self, all_data, c_range_start, c_range_end, d_range_start, d_range_end):
for to_select in ['count()', 'sum(e)']: # Test reading with and without column with default value
self.query('SELECT {} FROM tab_02473;'.format(to_select), False, all_data)

delta = 10
for b_range_start in [0, delta]:
for b_range_end in [self.total_rows - delta]: #, self.total_rows]:
expected = all_data[
(all_data.a == 0) &
(all_data.b > b_range_start) &
(all_data.b <= b_range_end)]
self.query('SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} WHERE a == 0;'.format(
to_select, b_range_start, b_range_end), False, expected)

expected = all_data[
(all_data.a == 0) &
(all_data.b > b_range_start) &
(all_data.b <= b_range_end) &
(all_data.c > c_range_start) &
(all_data.c <= c_range_end)]
self.query('SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} AND c > {} AND c <= {} WHERE a == 0;'.format(
to_select, b_range_start, b_range_end, c_range_start, c_range_end), False, expected)

expected = all_data[
(all_data.a == 0) &
(all_data.b > b_range_start) &
(all_data.b <= b_range_end) &
(all_data.c > c_range_start) &
(all_data.c <= c_range_end) &
(all_data.d > d_range_start) &
(all_data.d <= d_range_end)]
self.query('SELECT {} from tab_02473 PREWHERE b > {} AND b <= {} AND c > {} AND c <= {} AND d > {} AND d <= {} WHERE a == 0;'.format(
to_select, b_range_start, b_range_end, c_range_start, c_range_end, d_range_start, d_range_end), False, expected)


def run_test(self, c_range_start, c_range_end, d_range_start, d_range_end):
self.repro_queries = []

self.query('''
CREATE TABLE tab_02473 (a Int8, b Int32, c Int32, d Int32, PRIMARY KEY (a))
ENGINE = MergeTree() ORDER BY (a, b)
SETTINGS min_bytes_for_wide_part = 0, index_granularity = {};'''.format(self.index_granularity))

self.query('INSERT INTO tab_02473 select 0, number+1, number+1, number+1 FROM numbers({});'.format(self.total_rows))

client = ClickHouseClient()
all_data = client.query_return_df("SELECT a, b, c, d, 1 as e FROM tab_02473 FORMAT TabSeparatedWithNames;")

self.query('OPTIMIZE TABLE tab_02473 FINAL SETTINGS mutations_sync=2;')

# After all data has been written add a column with default value
self.query('ALTER TABLE tab_02473 ADD COLUMN e Int64 DEFAULT 1;')

self.check_data(all_data, c_range_start, c_range_end, d_range_start, d_range_end)

self.query('DROP TABLE tab_02473;')



def main():
# Enable multiple prewhere read steps
url = os.environ['CLICKHOUSE_URL'] + '&enable_multiple_prewhere_read_steps=1&move_all_conditions_to_prewhere=0&max_threads=1'

default_index_granularity = 10;
total_rows = 8 * default_index_granularity
step = default_index_granularity
session = requests.Session()
for index_granularity in [default_index_granularity-1, default_index_granularity]:
tester = Tester(session, url, index_granularity, total_rows)
# Test combinations of ranges of columns c and d
for c_range_start in range(0, total_rows, int(2.3 * step)):
for c_range_end in range(c_range_start + 3 * step, total_rows, int(2.1 * step)):
for d_range_start in range(int(0.5 * step), total_rows, int(2.7 * step)):
for d_range_end in range(d_range_start + 3 * step, total_rows, int(2.2 * step)):
tester.run_test(c_range_start, c_range_end, d_range_start, d_range_end)


if __name__ == "__main__":
main()

Empty file.
11 changes: 11 additions & 0 deletions tests/queries/0_stateless/02473_multistep_split_prewhere.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/usr/bin/env bash
# Tags: long

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# We should have correct env vars from shell_config.sh to run this test

python3 "$CURDIR"/02473_multistep_split_prewhere.python

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
DROP TABLE IF EXISTS t_02559;

CREATE TABLE t_02559 (
key UInt64,
value Array(String))
ENGINE = MergeTree
ORDER BY key
SETTINGS index_granularity=400, min_bytes_for_wide_part=0;

INSERT INTO t_02559 SELECT number,
if (number < 100 OR number > 1000,
[toString(number)],
emptyArrayString())
FROM numbers(2000);

SET enable_multiple_prewhere_read_steps=1, move_all_conditions_to_prewhere=1;

SELECT * FROM t_02559
WHERE (key < 5 OR key > 500)
AND NOT has(value, toString(key))
AND length(value) == 1
LIMIT 10
SETTINGS max_block_size = 81,
max_threads = 1;

DROP TABLE IF EXISTS t_02559;