Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreading after window functions #50771

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ae28b0d
feat: Preserve number of streams after evaluation the window function…
frinkr Jun 9, 2023
af18f0c
Merge remote-tracking branch 'my/master' into feat-preserve-num-strea…
frinkr Jun 9, 2023
5901866
fix style
frinkr Jun 11, 2023
040ed74
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jun 11, 2023
e89cb68
fix style
frinkr Jun 12, 2023
9b709f2
Merge remote-tracking branch 'refs/remotes/my/feat-preserve-num-strea…
frinkr Jun 12, 2023
3e14c9d
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jun 12, 2023
ef256dd
fix style
frinkr Jun 12, 2023
a6fe651
setting query_plan_preserve_num_streams_after_window_functions defaul…
frinkr Jun 14, 2023
d1c714a
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jun 14, 2023
db845b0
fix tests by SETTINGS query_plan_preserve_num_streams_after_window_fu…
frinkr Jun 14, 2023
2415933
fix test references
frinkr Jun 15, 2023
3ffabc1
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Jun 15, 2023
e2786b2
Resize the streams after the last window function, to keep the order …
frinkr Jun 19, 2023
a928650
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jul 31, 2023
963d813
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Jul 31, 2023
f0c5679
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Jul 31, 2023
5584408
feat: Preserve number of streams after evaluation the window function…
frinkr Jun 9, 2023
f10b071
fix style
frinkr Jun 11, 2023
e2bf27f
fix style
frinkr Jun 12, 2023
f165e6c
fix style
frinkr Jun 12, 2023
973fd7d
setting query_plan_preserve_num_streams_after_window_functions defaul…
frinkr Jun 14, 2023
776c533
fix tests by SETTINGS query_plan_preserve_num_streams_after_window_fu…
frinkr Jun 14, 2023
bbad83f
fix test references
frinkr Jun 15, 2023
3cdeff7
Resize the streams after the last window function, to keep the order …
frinkr Jun 19, 2023
67eff20
Merge branch 'feat-preserve-num-streams-window-function' of github.co…
frinkr Jul 31, 2023
fb48070
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Sep 26, 2023
7d20fb0
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Sep 27, 2023
6614045
add perf test
frinkr Sep 27, 2023
a5c138c
perf: change the dataset from 50M to 5M
frinkr Sep 28, 2023
8f3f23b
Merge branch 'master' into feat-preserve-num-streams-window-function
nickitat Oct 11, 2023
3b4e4d2
rename query_plan_preserve_num_streams_after_window_functions -> quer…
frinkr Oct 13, 2023
4bf1ec9
update test reference
frinkr Oct 13, 2023
c4dfe41
Merge branch 'master' into feat-preserve-num-streams-window-function
nickitat Oct 16, 2023
83a24e8
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Oct 17, 2023
ed029fa
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Oct 25, 2023
ac07b34
fix clang-tidy
frinkr Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/Settings.h
Expand Up @@ -684,6 +684,7 @@ class IColumn;
M(Bool, query_plan_aggregation_in_order, true, "Use query plan for aggregation-in-order optimisation", 0) \
M(Bool, query_plan_remove_redundant_sorting, true, "Remove redundant sorting in query plan. For example, sorting steps related to ORDER BY clauses in subqueries", 0) \
M(Bool, query_plan_remove_redundant_distinct, true, "Remove redundant Distinct step in query plan", 0) \
M(Bool, query_plan_enable_multithreading_after_window_functions, true, "Enable multithreading after evaluating window functions to allow parallel stream processing", 0) \
M(UInt64, regexp_max_matches_per_row, 1000, "Max matches of any single regexp per row, used to safeguard 'extractAllGroupsHorizontal' against consuming too much memory with greedy RE.", 0) \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -2936,7 +2936,11 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
query_plan.addStep(std::move(sorting_step));
}

auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window, window.window_functions);
// Fan out streams only for the last window to preserve the ordering between windows,
// and WindowTransform works on single stream anyway.
const bool streams_fan_out = settings.query_plan_enable_multithreading_after_window_functions && ((i + 1) == windows_sorted.size());

auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window, window.window_functions, streams_fan_out);
window_step->setStepDescription("Window step for window '" + window.window_name + "'");

query_plan.addStep(std::move(window_step));
Expand Down
6 changes: 5 additions & 1 deletion src/Planner/Planner.cpp
Expand Up @@ -905,8 +905,12 @@ void addWindowSteps(QueryPlan & query_plan,
query_plan.addStep(std::move(sorting_step));
}

// Fan out streams only for the last window to preserve the ordering between windows,
// and WindowTransform works on single stream anyway.
const bool streams_fan_out = settings.query_plan_enable_multithreading_after_window_functions && ((i + 1) == window_descriptions_size);

auto window_step
= std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions);
= std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions, streams_fan_out);
window_step->setStepDescription("Window step for window '" + window_description.window_name + "'");
query_plan.addStep(std::move(window_step));
}
Expand Down
17 changes: 13 additions & 4 deletions src/Processors/QueryPlan/WindowStep.cpp
Expand Up @@ -10,14 +10,14 @@
namespace DB
{

static ITransformingStep::Traits getTraits()
static ITransformingStep::Traits getTraits(bool preserves_sorting)
{
return ITransformingStep::Traits
{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
.preserves_sorting = preserves_sorting,
},
{
.preserves_number_of_rows = true
Expand Down Expand Up @@ -46,10 +46,12 @@ static Block addWindowFunctionResultColumns(const Block & block,
WindowStep::WindowStep(
const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_)
: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits())
const std::vector<WindowFunctionDescription> & window_functions_,
bool streams_fan_out_)
: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits(!streams_fan_out_))
, window_description(window_description_)
, window_functions(window_functions_)
, streams_fan_out(streams_fan_out_)
{
// We don't remove any columns, only add, so probably we don't have to update
// the output DataStream::distinct_columns.
Expand All @@ -60,6 +62,8 @@ WindowStep::WindowStep(

void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto num_threads = pipeline.getNumThreads();

// This resize is needed for cases such as `over ()` when we don't have a
// sort node, and the input might have multiple streams. The sort node would
// have resized it.
Expand All @@ -72,6 +76,11 @@ void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
input_streams.front().header, output_stream->header, window_description, window_functions);
});

if (streams_fan_out)
{
pipeline.resize(num_threads);
}

assertBlocksHaveEqualStructure(pipeline.getHeader(), output_stream->header,
"WindowStep transform for '" + window_description.window_name + "'");
}
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/QueryPlan/WindowStep.h
Expand Up @@ -16,7 +16,8 @@ class WindowStep : public ITransformingStep
public:
explicit WindowStep(const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_);
const std::vector<WindowFunctionDescription> & window_functions_,
bool streams_fan_out_);

String getName() const override { return "Window"; }

Expand All @@ -32,6 +33,7 @@ class WindowStep : public ITransformingStep

WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;
bool streams_fan_out;
};

}
69 changes: 69 additions & 0 deletions tests/performance/window_functions_downstream_multithreading.xml
@@ -0,0 +1,69 @@
<test>
<create_query>
CREATE TABLE
window_test(id Int64, value Int64, partition Int64, msg String)
Engine=MergeTree
ORDER BY id
</create_query>

<fill_query>
INSERT INTO window_test
SELECT number, rand(1) % 500, number % 3000, randomPrintableASCII(2) FROM numbers(5000000)
</fill_query>

<query>
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame1,
MAX(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame2,
sipHash64(frame1),
sipHash64(frame2)
FROM window_test
</query>

<query>
SELECT id AS key,
sipHash64(sum(frame)) AS value
FROM (
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test)
GROUP BY key
ORDER BY key, value
</query>

<query>
SELECT id % 100000 AS key,
sipHash64(sum(frame)) AS value
FROM (
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test)
GROUP BY key
ORDER BY key, value
</query>

<query>
WITH 'xxxxyyyyxxxxyyyyxxxxyyyyxxxxyyyy' AS cipherKey
SELECT id,
AVG(value) OVER (ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame,
toString(frame) AS str,
encrypt('aes-256-ofb', str, cipherKey) AS enc,
decrypt('aes-256-ofb', str, cipherKey) AS dec
FROM window_test
</query>

<query>
SELECT id,
AVG(value) OVER (PARTITION by partition ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test
ORDER BY id
</query>

<query>
SELECT DISTINCT AVG(value) OVER (PARTITION by partition ORDER BY id ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS frame
FROM window_test
ORDER BY frame
</query>

<drop_query>DROP TABLE IF EXISTS window_test</drop_query>
</test>
@@ -1,18 +1,18 @@
-- { echo }
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) as x from (select * from remote('127.0.0.{1,2}', system, one)) order by x;
1
2
select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select row_number() over (order by dummy) as x from remote('127.0.0.{1,2}', system, one) order by x;
1
2
select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () as x from remote('127.0.0.{1,2}', system, one) order by x;
1
1
drop table if exists t_01568;
create table t_01568 engine Memory as
select intDiv(number, 3) p, modulo(number, 3) o, number
from numbers(9);
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;
3 2
3 2
3 2
Expand All @@ -22,7 +22,7 @@ select sum(number) over w, max(number) over w from t_01568 window w as (partitio
21 8
21 8
21 8
select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
6 2
6 2
6 2
Expand All @@ -41,23 +41,23 @@ select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t
42 8
42 8
42 8
select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
6 2
24 5
42 8
-- window functions + aggregation w/shards
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3);
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x;
[[0,3,6,0,3,6]]
[[0,3,6,0,3,6],[1,4,7,1,4,7]]
[[0,3,6,0,3,6],[1,4,7,1,4,7],[2,5,8,2,5,8]]
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=1;
[[0,3,6]]
[[0,3,6],[1,4,7]]
[[0,3,6],[1,4,7],[2,5,8]]
[[0,3,6]]
[[0,3,6],[1,4,7]]
[[0,3,6],[1,4,7],[2,5,8]]
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=2; -- { serverError 48 }
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=2; -- { serverError 48 }
-- proper ORDER BY w/window functions
select p, o, count() over (partition by p)
from remote('127.0.0.{1,2}', '', t_01568)
Expand Down
18 changes: 9 additions & 9 deletions tests/queries/0_stateless/01568_window_functions_distributed.sql
@@ -1,28 +1,28 @@
-- Tags: distributed

-- { echo }
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) as x from (select * from remote('127.0.0.{1,2}', system, one)) order by x;

select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select row_number() over (order by dummy) as x from remote('127.0.0.{1,2}', system, one) order by x;

select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () as x from remote('127.0.0.{1,2}', system, one) order by x;

drop table if exists t_01568;

create table t_01568 engine Memory as
select intDiv(number, 3) p, modulo(number, 3) o, number
from numbers(9);

select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;

select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;

select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;

-- window functions + aggregation w/shards
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3);
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=2; -- { serverError 48 }
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=2; -- { serverError 48 }

-- proper ORDER BY w/window functions
select p, o, count() over (partition by p)
Expand Down
3 changes: 2 additions & 1 deletion tests/queries/0_stateless/01571_window_functions.reference
Expand Up @@ -19,7 +19,8 @@ system stop merges order_by_const;
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO order_by_const(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT row_number() OVER (order by 1, a) FROM order_by_const;
-- output 1 sorted stream
SELECT row_number() OVER (order by 1, a) FROM order_by_const SETTINGS query_plan_enable_multithreading_after_window_functions=0;
1
2
3
Expand Down
4 changes: 3 additions & 1 deletion tests/queries/0_stateless/01571_window_functions.sql
Expand Up @@ -20,7 +20,9 @@ system stop merges order_by_const;
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO order_by_const(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT row_number() OVER (order by 1, a) FROM order_by_const;

-- output 1 sorted stream
SELECT row_number() OVER (order by 1, a) FROM order_by_const SETTINGS query_plan_enable_multithreading_after_window_functions=0;

drop table order_by_const;

Expand Down