Skip to content

Commit

Permalink
Merge pull request #50721 from ClickHouse/global-join-too-many-columns
Browse files Browse the repository at this point in the history
Do not read all the columns from right GLOBAL JOIN table.
  • Loading branch information
KochetovNicolai committed Jun 9, 2023
2 parents d4602f7 + 350becb commit c73a346
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 15 deletions.
11 changes: 2 additions & 9 deletions src/Interpreters/ExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global, b
if (do_global)
{
GlobalSubqueriesVisitor::Data subqueries_data(
getContext(), subquery_depth, isRemoteStorage(), is_explain, external_tables, prepared_sets, has_global_subqueries);
getContext(), subquery_depth, isRemoteStorage(), is_explain, external_tables, prepared_sets, has_global_subqueries, syntax->analyzed_join.get());
GlobalSubqueriesVisitor(subqueries_data).visit(query);
}
}
Expand Down Expand Up @@ -1056,13 +1056,6 @@ JoinPtr SelectQueryExpressionAnalyzer::appendJoin(
return join;
}

static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
{
ASTPtr expression_list = analyzed_join.rightKeysList();
auto syntax_result = TreeRewriter(context).analyze(expression_list, analyzed_join.columnsFromJoinedTable());
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}

std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block);


Expand Down Expand Up @@ -1144,7 +1137,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
SelectQueryOptions query_options)
{
/// Actions which need to be calculated on joined block.
auto joined_block_actions = createJoinedBlockActions(context, analyzed_join);
auto joined_block_actions = analyzed_join.createJoinedBlockActions(context);
NamesWithAliases required_columns_with_aliases = analyzed_join.getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());

Expand Down
28 changes: 23 additions & 5 deletions src/Interpreters/GlobalSubqueriesVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/TableJoin.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
Expand Down Expand Up @@ -43,6 +44,7 @@ class GlobalSubqueriesMatcher
TemporaryTablesMapping & external_tables;
PreparedSetsPtr prepared_sets;
bool & has_global_subqueries;
TableJoin * table_join;

Data(
ContextPtr context_,
Expand All @@ -51,18 +53,20 @@ class GlobalSubqueriesMatcher
bool is_explain_,
TemporaryTablesMapping & tables,
PreparedSetsPtr prepared_sets_,
bool & has_global_subqueries_)
bool & has_global_subqueries_,
TableJoin * table_join_)
: WithContext(context_)
, subquery_depth(subquery_depth_)
, is_remote(is_remote_)
, is_explain(is_explain_)
, external_tables(tables)
, prepared_sets(prepared_sets_)
, has_global_subqueries(has_global_subqueries_)
, table_join(table_join_)
{
}

void addExternalStorage(ASTPtr & ast, bool set_alias = false)
void addExternalStorage(ASTPtr & ast, const Names & required_columns, bool set_alias = false)
{
/// With nondistributed queries, creating temporary tables does not make sense.
if (!is_remote)
Expand Down Expand Up @@ -145,7 +149,7 @@ class GlobalSubqueriesMatcher
if (external_tables.contains(external_table_name))
return;

auto interpreter = interpretSubquery(subquery_or_table_name, getContext(), subquery_depth, {});
auto interpreter = interpretSubquery(subquery_or_table_name, getContext(), subquery_depth, required_columns);

Block sample = interpreter->getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
Expand Down Expand Up @@ -238,7 +242,7 @@ class GlobalSubqueriesMatcher
return;
}

data.addExternalStorage(ast);
data.addExternalStorage(ast, {});
data.has_global_subqueries = true;
}
}
Expand All @@ -249,7 +253,21 @@ class GlobalSubqueriesMatcher
if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
{
data.addExternalStorage(table_elem.table_expression, true);
Names required_columns;

/// Fill required columns for GLOBAL JOIN.
/// This code is partial copy-paste from ExpressionAnalyzer.
if (data.table_join)
{
auto joined_block_actions = data.table_join->createJoinedBlockActions(data.getContext());
NamesWithAliases required_columns_with_aliases = data.table_join->getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());

for (auto & pr : required_columns_with_aliases)
required_columns.push_back(pr.first);
}

data.addExternalStorage(table_elem.table_expression, required_columns, true);
data.has_global_subqueries = true;
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/Interpreters/TableJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
#include <Dictionaries/DictionaryStructure.h>

#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>

#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
Expand Down Expand Up @@ -760,4 +762,11 @@ bool TableJoin::allowParallelHashJoin() const
return true;
}

ActionsDAGPtr TableJoin::createJoinedBlockActions(ContextPtr context) const
{
ASTPtr expression_list = rightKeysList();
auto syntax_result = TreeRewriter(context).analyze(expression_list, columnsFromJoinedTable());
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}

}
2 changes: 2 additions & 0 deletions src/Interpreters/TableJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class TableJoin
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }

ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;

bool isEnabledAlgorithm(JoinAlgorithm val) const
{
/// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `key`, `value1`, `value2` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_11888098645495698704_17868075224240210014` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
40
14 changes: 14 additions & 0 deletions tests/queries/0_stateless/02785_global_join_too_many_columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
drop table if exists local;
drop table if exists distr;

create table local (a UInt64, b UInt64, c UInt64, d UInt64, e UInt64, f UInt64, g UInt64, h UInt64) engine = Log;
create table distr as local engine = Distributed('test_cluster_two_shards', currentDatabase(), local);

insert into local (a) select number from numbers(10);

set max_columns_to_read=1;
select count() from distr as l global all left join distr as r on l.a = r.a;

drop table if exists local;
drop table if exists distr;

0 comments on commit c73a346

Please sign in to comment.