Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #50721 to 23.5: Do not read all the columns from right GLOBAL JOIN table. #50794

Merged
merged 1 commit into from Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 2 additions & 9 deletions src/Interpreters/ExpressionAnalyzer.cpp
Expand Up @@ -444,7 +444,7 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global, b
if (do_global)
{
GlobalSubqueriesVisitor::Data subqueries_data(
getContext(), subquery_depth, isRemoteStorage(), is_explain, external_tables, prepared_sets, has_global_subqueries);
getContext(), subquery_depth, isRemoteStorage(), is_explain, external_tables, prepared_sets, has_global_subqueries, syntax->analyzed_join.get());
GlobalSubqueriesVisitor(subqueries_data).visit(query);
}
}
Expand Down Expand Up @@ -1056,13 +1056,6 @@ JoinPtr SelectQueryExpressionAnalyzer::appendJoin(
return join;
}

static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
{
ASTPtr expression_list = analyzed_join.rightKeysList();
auto syntax_result = TreeRewriter(context).analyze(expression_list, analyzed_join.columnsFromJoinedTable());
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}

std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block);


Expand Down Expand Up @@ -1144,7 +1137,7 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
SelectQueryOptions query_options)
{
/// Actions which need to be calculated on joined block.
auto joined_block_actions = createJoinedBlockActions(context, analyzed_join);
auto joined_block_actions = analyzed_join.createJoinedBlockActions(context);
NamesWithAliases required_columns_with_aliases = analyzed_join.getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());

Expand Down
28 changes: 23 additions & 5 deletions src/Interpreters/GlobalSubqueriesVisitor.h
Expand Up @@ -9,6 +9,7 @@
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/TableJoin.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
Expand Down Expand Up @@ -43,6 +44,7 @@ class GlobalSubqueriesMatcher
TemporaryTablesMapping & external_tables;
PreparedSetsPtr prepared_sets;
bool & has_global_subqueries;
TableJoin * table_join;

Data(
ContextPtr context_,
Expand All @@ -51,18 +53,20 @@ class GlobalSubqueriesMatcher
bool is_explain_,
TemporaryTablesMapping & tables,
PreparedSetsPtr prepared_sets_,
bool & has_global_subqueries_)
bool & has_global_subqueries_,
TableJoin * table_join_)
: WithContext(context_)
, subquery_depth(subquery_depth_)
, is_remote(is_remote_)
, is_explain(is_explain_)
, external_tables(tables)
, prepared_sets(prepared_sets_)
, has_global_subqueries(has_global_subqueries_)
, table_join(table_join_)
{
}

void addExternalStorage(ASTPtr & ast, bool set_alias = false)
void addExternalStorage(ASTPtr & ast, const Names & required_columns, bool set_alias = false)
{
/// With nondistributed queries, creating temporary tables does not make sense.
if (!is_remote)
Expand Down Expand Up @@ -145,7 +149,7 @@ class GlobalSubqueriesMatcher
if (external_tables.contains(external_table_name))
return;

auto interpreter = interpretSubquery(subquery_or_table_name, getContext(), subquery_depth, {});
auto interpreter = interpretSubquery(subquery_or_table_name, getContext(), subquery_depth, required_columns);

Block sample = interpreter->getSampleBlock();
NamesAndTypesList columns = sample.getNamesAndTypesList();
Expand Down Expand Up @@ -238,7 +242,7 @@ class GlobalSubqueriesMatcher
return;
}

data.addExternalStorage(ast);
data.addExternalStorage(ast, {});
data.has_global_subqueries = true;
}
}
Expand All @@ -249,7 +253,21 @@ class GlobalSubqueriesMatcher
if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
{
data.addExternalStorage(table_elem.table_expression, true);
Names required_columns;

/// Fill required columns for GLOBAL JOIN.
/// This code is partial copy-paste from ExpressionAnalyzer.
if (data.table_join)
{
auto joined_block_actions = data.table_join->createJoinedBlockActions(data.getContext());
NamesWithAliases required_columns_with_aliases = data.table_join->getRequiredColumns(
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());

for (auto & pr : required_columns_with_aliases)
required_columns.push_back(pr.first);
}

data.addExternalStorage(table_elem.table_expression, required_columns, true);
data.has_global_subqueries = true;
}
}
Expand Down
9 changes: 9 additions & 0 deletions src/Interpreters/TableJoin.cpp
Expand Up @@ -14,6 +14,8 @@
#include <Dictionaries/DictionaryStructure.h>

#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>

#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
Expand Down Expand Up @@ -760,4 +762,11 @@ bool TableJoin::allowParallelHashJoin() const
return true;
}

ActionsDAGPtr TableJoin::createJoinedBlockActions(ContextPtr context) const
{
ASTPtr expression_list = rightKeysList();
auto syntax_result = TreeRewriter(context).analyze(expression_list, columnsFromJoinedTable());
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}

}
2 changes: 2 additions & 0 deletions src/Interpreters/TableJoin.h
Expand Up @@ -217,6 +217,8 @@ class TableJoin
const SizeLimits & sizeLimits() const { return size_limits; }
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }

ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;

bool isEnabledAlgorithm(JoinAlgorithm val) const
{
/// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
Expand Down
Expand Up @@ -39,6 +39,6 @@ U c 10
UlI+1 10
bX?}ix [ Ny]2 G 10
t<iT X48q:Z]t0 10
0 3 SELECT `key`, `value1`, `value2`, toUInt64(min(`time`)) AS `start_ts` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `key`, `value1`, `value2` FROM `default`.`join_inner_table` PREWHERE (`id` = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (`number` > toUInt64(\'1610517366120\')) GROUP BY `key`, `value1`, `value2`
0 3 SELECT `value1`, `value2`, count() AS `count` FROM `default`.`join_outer_table` ALL INNER JOIN `_data_11888098645495698704_17868075224240210014` USING (`key`) GROUP BY `key`, `value1`, `value2`
1 1 -- Parallel full query\nSELECT\n value1,\n value2,\n avg(count) AS avg\nFROM\n (\n SELECT\n key,\n value1,\n value2,\n count() AS count\n FROM join_outer_table\n INNER JOIN\n (\n SELECT\n key,\n value1,\n value2,\n toUInt64(min(time)) AS start_ts\n FROM join_inner_table\n PREWHERE (id = \'833c9e22-c245-4eb5-8745-117a9a1f26b1\') AND (number > toUInt64(\'1610517366120\'))\n GROUP BY key, value1, value2\n ) USING (key)\n GROUP BY key, value1, value2\n )\nGROUP BY value1, value2\nORDER BY value1, value2\nSETTINGS allow_experimental_parallel_reading_from_replicas = 1;
@@ -0,0 +1 @@
40
14 changes: 14 additions & 0 deletions tests/queries/0_stateless/02785_global_join_too_many_columns.sql
@@ -0,0 +1,14 @@
drop table if exists local;
drop table if exists distr;

create table local (a UInt64, b UInt64, c UInt64, d UInt64, e UInt64, f UInt64, g UInt64, h UInt64) engine = Log;
create table distr as local engine = Distributed('test_cluster_two_shards', currentDatabase(), local);

insert into local (a) select number from numbers(10);

set max_columns_to_read=1;
select count() from distr as l global all left join distr as r on l.a = r.a;

drop table if exists local;
drop table if exists distr;