Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreading after window functions #50771

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
ae28b0d
feat: Preserve number of streams after evaluation the window function…
frinkr Jun 9, 2023
af18f0c
Merge remote-tracking branch 'my/master' into feat-preserve-num-strea…
frinkr Jun 9, 2023
5901866
fix style
frinkr Jun 11, 2023
040ed74
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jun 11, 2023
e89cb68
fix style
frinkr Jun 12, 2023
9b709f2
Merge remote-tracking branch 'refs/remotes/my/feat-preserve-num-strea…
frinkr Jun 12, 2023
3e14c9d
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jun 12, 2023
ef256dd
fix style
frinkr Jun 12, 2023
a6fe651
setting query_plan_preserve_num_streams_after_window_functions defaul…
frinkr Jun 14, 2023
d1c714a
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jun 14, 2023
db845b0
fix tests by SETTINGS query_plan_preserve_num_streams_after_window_fu…
frinkr Jun 14, 2023
2415933
fix test references
frinkr Jun 15, 2023
3ffabc1
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Jun 15, 2023
e2786b2
Resize the streams after the last window function, to keep the order …
frinkr Jun 19, 2023
a928650
Merge branch 'master' into feat-preserve-num-streams-window-function
frinkr Jul 31, 2023
963d813
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Jul 31, 2023
f0c5679
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Jul 31, 2023
5584408
feat: Preserve number of streams after evaluation the window function…
frinkr Jun 9, 2023
f10b071
fix style
frinkr Jun 11, 2023
e2bf27f
fix style
frinkr Jun 12, 2023
f165e6c
fix style
frinkr Jun 12, 2023
973fd7d
setting query_plan_preserve_num_streams_after_window_functions defaul…
frinkr Jun 14, 2023
776c533
fix tests by SETTINGS query_plan_preserve_num_streams_after_window_fu…
frinkr Jun 14, 2023
bbad83f
fix test references
frinkr Jun 15, 2023
3cdeff7
Resize the streams after the last window function, to keep the order …
frinkr Jun 19, 2023
67eff20
Merge branch 'feat-preserve-num-streams-window-function' of github.co…
frinkr Jul 31, 2023
fb48070
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Sep 26, 2023
7d20fb0
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Sep 27, 2023
6614045
add perf test
frinkr Sep 27, 2023
a5c138c
perf: change the dataset from 50M to 5M
frinkr Sep 28, 2023
8f3f23b
Merge branch 'master' into feat-preserve-num-streams-window-function
nickitat Oct 11, 2023
3b4e4d2
rename query_plan_preserve_num_streams_after_window_functions -> quer…
frinkr Oct 13, 2023
4bf1ec9
update test reference
frinkr Oct 13, 2023
c4dfe41
Merge branch 'master' into feat-preserve-num-streams-window-function
nickitat Oct 16, 2023
83a24e8
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Oct 17, 2023
ed029fa
Merge branch 'ClickHouse:master' into feat-preserve-num-streams-windo…
frinkr Oct 25, 2023
ac07b34
fix clang-tidy
frinkr Oct 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ class IColumn;
M(Bool, query_plan_remove_redundant_sorting, true, "Remove redundant sorting in query plan. For example, sorting steps related to ORDER BY clauses in subqueries", 0) \
M(Bool, query_plan_remove_redundant_distinct, true, "Remove redundant Distinct step in query plan", 0) \
M(Bool, query_plan_optimize_projection, true, "Use query plan for aggregation-in-order optimisation", 0) \
M(Bool, query_plan_preserve_num_streams_after_window_functions, true, "Preserve the number of streams after evaluating window functions to allow parallel stream processing", 0) \
M(UInt64, regexp_max_matches_per_row, 1000, "Max matches of any single regexp per row, used to safeguard 'extractAllGroupsHorizontal' against consuming too much memory with greedy RE.", 0) \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
Expand Down
6 changes: 5 additions & 1 deletion src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2856,7 +2856,11 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
query_plan.addStep(std::move(sorting_step));
}

auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window, window.window_functions);
// Restore the number of streams only for the last window to preserve the ordering between windows,
// and WindowTransform works on single stream anyway.
const bool preserve_num_streams = settings.query_plan_preserve_num_streams_after_window_functions && ((i + 1) == windows_sorted.size());

auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window, window.window_functions, preserve_num_streams);
window_step->setStepDescription("Window step for window '" + window.window_name + "'");

query_plan.addStep(std::move(window_step));
Expand Down
2 changes: 1 addition & 1 deletion src/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ void addWindowSteps(QueryPlan & query_plan,
}

auto window_step
= std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions);
= std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), window_description, window_description.window_functions, settings.query_plan_preserve_num_streams_after_window_functions);
frinkr marked this conversation as resolved.
Show resolved Hide resolved
window_step->setStepDescription("Window step for window '" + window_description.window_name + "'");
query_plan.addStep(std::move(window_step));
}
Expand Down
17 changes: 13 additions & 4 deletions src/Processors/QueryPlan/WindowStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
namespace DB
{

static ITransformingStep::Traits getTraits()
static ITransformingStep::Traits getTraits(bool preserves_sorting)
{
return ITransformingStep::Traits
{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
.preserves_sorting = preserves_sorting,
},
{
.preserves_number_of_rows = true
Expand Down Expand Up @@ -46,10 +46,12 @@ static Block addWindowFunctionResultColumns(const Block & block,
WindowStep::WindowStep(
const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_)
: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits())
const std::vector<WindowFunctionDescription> & window_functions_,
bool preserve_num_streams_)
: ITransformingStep(input_stream_, addWindowFunctionResultColumns(input_stream_.header, window_functions_), getTraits(!preserve_num_streams_))
, window_description(window_description_)
, window_functions(window_functions_)
, preserve_num_streams(preserve_num_streams_)
{
// We don't remove any columns, only add, so probably we don't have to update
// the output DataStream::distinct_columns.
Expand All @@ -60,6 +62,8 @@ WindowStep::WindowStep(

void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto num_streams = pipeline.getNumThreads();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe getNumStreams()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getNumStreams() return 1 if there is a ORDER BY in the window,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, we don't preserve the number of stream, but rather always fan out after the WindowStep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then I think we should name the setting accordingly


// This resize is needed for cases such as `over ()` when we don't have a
// sort node, and the input might have multiple streams. The sort node would
// have resized it.
Expand All @@ -72,6 +76,11 @@ void WindowStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
input_streams.front().header, output_stream->header, window_description, window_functions);
});

if (preserve_num_streams)
frinkr marked this conversation as resolved.
Show resolved Hide resolved
{
pipeline.resize(num_streams);
}

assertBlocksHaveEqualStructure(pipeline.getHeader(), output_stream->header,
"WindowStep transform for '" + window_description.window_name + "'");
}
Expand Down
4 changes: 3 additions & 1 deletion src/Processors/QueryPlan/WindowStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class WindowStep : public ITransformingStep
public:
explicit WindowStep(const DataStream & input_stream_,
const WindowDescription & window_description_,
const std::vector<WindowFunctionDescription> & window_functions_);
const std::vector<WindowFunctionDescription> & window_functions_,
bool preserve_num_streams_);

String getName() const override { return "Window"; }

Expand All @@ -32,6 +33,7 @@ class WindowStep : public ITransformingStep

WindowDescription window_description;
std::vector<WindowFunctionDescription> window_functions;
bool preserve_num_streams;
};

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
-- { echo }
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) as x from (select * from remote('127.0.0.{1,2}', system, one)) order by x;
1
2
select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select row_number() over (order by dummy) as x from remote('127.0.0.{1,2}', system, one) order by x;
1
2
select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () as x from remote('127.0.0.{1,2}', system, one) order by x;
1
1
drop table if exists t_01568;
create table t_01568 engine Memory as
select intDiv(number, 3) p, modulo(number, 3) o, number
from numbers(9);
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;
3 2
3 2
3 2
Expand All @@ -22,7 +22,7 @@ select sum(number) over w, max(number) over w from t_01568 window w as (partitio
21 8
21 8
21 8
select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
6 2
6 2
6 2
Expand All @@ -41,23 +41,23 @@ select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t
42 8
42 8
42 8
select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;
6 2
24 5
42 8
-- window functions + aggregation w/shards
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3);
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x;
[[0,3,6,0,3,6]]
[[0,3,6,0,3,6],[1,4,7,1,4,7]]
[[0,3,6,0,3,6],[1,4,7,1,4,7],[2,5,8,2,5,8]]
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=1;
[[0,3,6]]
[[0,3,6],[1,4,7]]
[[0,3,6],[1,4,7],[2,5,8]]
[[0,3,6]]
[[0,3,6],[1,4,7]]
[[0,3,6],[1,4,7],[2,5,8]]
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=2; -- { serverError 48 }
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=2; -- { serverError 48 }
-- proper ORDER BY w/window functions
select p, o, count() over (partition by p)
from remote('127.0.0.{1,2}', '', t_01568)
Expand Down
18 changes: 9 additions & 9 deletions tests/queries/0_stateless/01568_window_functions_distributed.sql
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
-- Tags: distributed

-- { echo }
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) as x from (select * from remote('127.0.0.{1,2}', system, one)) order by x;

select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select row_number() over (order by dummy) as x from remote('127.0.0.{1,2}', system, one) order by x;

select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () as x from remote('127.0.0.{1,2}', system, one) order by x;

drop table if exists t_01568;

create table t_01568 engine Memory as
select intDiv(number, 3) p, modulo(number, 3) o, number
from numbers(9);

select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from t_01568 window w as (partition by p) order by x, y;

select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;

select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w as x, max(number) over w as y from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p) order by x, y;

-- window functions + aggregation w/shards
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3);
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) settings distributed_group_by_no_merge=2; -- { serverError 48 }
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=1;
select groupArray(groupArray(number)) over (rows unbounded preceding) as x from remote('127.0.0.{1,2}', '', t_01568) group by mod(number, 3) order by x settings distributed_group_by_no_merge=2; -- { serverError 48 }

-- proper ORDER BY w/window functions
select p, o, count() over (partition by p)
Expand Down
3 changes: 2 additions & 1 deletion tests/queries/0_stateless/01571_window_functions.reference
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ system stop merges order_by_const;
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO order_by_const(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT row_number() OVER (order by 1, a) FROM order_by_const;
-- output 1 sorted stream
SELECT row_number() OVER (order by 1, a) FROM order_by_const SETTINGS query_plan_preserve_num_streams_after_window_functions=0;
1
2
3
Expand Down
4 changes: 3 additions & 1 deletion tests/queries/0_stateless/01571_window_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ system stop merges order_by_const;
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 1, 101, 1), (1, 2, 102, 1), (1, 3, 103, 1), (1, 4, 104, 1);
INSERT INTO order_by_const(a, b, c, d) VALUES (1, 5, 104, 1), (1, 6, 105, 1), (2, 1, 106, 2), (2, 1, 107, 2);
INSERT INTO order_by_const(a, b, c, d) VALUES (2, 2, 107, 2), (2, 3, 108, 2), (2, 4, 109, 2);
SELECT row_number() OVER (order by 1, a) FROM order_by_const;

-- output 1 sorted stream
SELECT row_number() OVER (order by 1, a) FROM order_by_const SETTINGS query_plan_preserve_num_streams_after_window_functions=0;

drop table order_by_const;

Expand Down