Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable parallel replicas JOIN with CTE (not analyzer) #59239

Merged
merged 3 commits into from Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/Interpreters/ActionsVisitor.cpp
Expand Up @@ -1414,10 +1414,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
set_key = right_in_operand->getTreeHash(/*ignore_aliases=*/ true);

if (auto set = data.prepared_sets->findSubquery(set_key))
{
set->markAsINSubquery();
return set;
}

FutureSetFromSubqueryPtr external_table_set;

Expand Down Expand Up @@ -1464,7 +1461,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
}

return data.prepared_sets->addFromSubquery(
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef(), /*in_subquery=*/true);
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef());
}
else
{
Expand Down
61 changes: 51 additions & 10 deletions src/Interpreters/GlobalSubqueriesVisitor.h
Expand Up @@ -32,6 +32,7 @@ namespace ErrorCodes
{
extern const int WRONG_GLOBAL_SUBQUERY;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}

class GlobalSubqueriesMatcher
Expand Down Expand Up @@ -200,23 +201,33 @@ class GlobalSubqueriesMatcher
}

private:
static bool shouldBeExecutedGlobally(const Data & data)
/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
/// For parallel replicas we reinterpret JOIN as GLOBAL JOIN as a way to broadcast data
const bool prefer_global = settings.prefer_global_in_and_join;
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
return settings.prefer_global_in_and_join || enable_parallel_processing_of_joins;
}


/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
if ((shouldBeExecutedGlobally(data)
if (((prefer_global || enable_parallel_processing_of_joins)
&& (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn"))
|| func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn")
{
ASTPtr & ast = func.arguments->children[1];
if (enable_parallel_processing_of_joins)
{
/// We don't enable parallel replicas for IN (subquery)
if (ast->as<ASTSubquery>())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "IN with subquery is not supported with parallel replicas");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
}
}

/// Literal or function can use regular IN.
/// NOTE: We don't support passing table functions to IN.
Expand All @@ -241,9 +252,39 @@ class GlobalSubqueriesMatcher
/// GLOBAL JOIN
static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
const bool prefer_global = settings.prefer_global_in_and_join;
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();

if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || prefer_global
|| enable_parallel_processing_of_joins))
{
if (enable_parallel_processing_of_joins)
{
/// For parallel replicas we currently only support JOIN with subqueries
/// Note that tableA join tableB is previously converted into tableA JOIN (Select * FROM tableB) so that's ok
/// We don't support WITH cte as (subquery) Select table JOIN cte because we don't do conversion in AST
bool is_subquery = false;
if (const auto * ast_table_expr = table_elem.table_expression->as<ASTTableExpression>())
is_subquery = ast_table_expr->subquery->as<ASTSubquery>() != nullptr
&& ast_table_expr->subquery->as<ASTSubquery>()->cte_name.empty();
else if (table_elem.table_expression->as<ASTSubquery>())
is_subquery = true;

if (!is_subquery)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "JOIN with parallel replicas is only supported with subqueries");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOIN with parallel replicas is only supported with subqueries");
}
}

Names required_columns;

/// Fill required columns for GLOBAL JOIN.
Expand Down
33 changes: 1 addition & 32 deletions src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -864,38 +864,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
ASTSelectQuery & query = getSelectQuery();

/// While only_analyze we don't know anything about parts, so any decision about how many parallel replicas to use would be wrong
if (!storage || !context->canUseParallelReplicasOnInitiator())
return false;

/// check if IN operator with subquery is present in the query
/// if so, disable parallel replicas
if (query_analyzer->getPreparedSets()->hasSubqueries())
{
bool in_subqueries = false;
const auto & sets = query_analyzer->getPreparedSets();
const auto subqueries = sets->getSubqueries();
for (const auto & subquery : subqueries)
{
if (subquery->isINSubquery())
{
in_subqueries = true;
break;
}
}

if (in_subqueries)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");

context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{0});
LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery");
return true;
}
}

if (options.only_analyze)
if (!storage || options.only_analyze || !context->canUseParallelReplicasOnInitiator())
return false;

if (getTrivialCount(0).has_value())
Expand Down
27 changes: 4 additions & 23 deletions src/Interpreters/PreparedSets.cpp
Expand Up @@ -98,12 +98,8 @@ FutureSetFromSubquery::FutureSetFromSubquery(
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings,
bool in_subquery_)
: external_table(std::move(external_table_))
, external_table_set(std::move(external_table_set_))
, source(std::move(source_))
, in_subquery(in_subquery_)
const Settings & settings)
: external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
{
set_and_key = std::make_shared<SetAndKey>();
set_and_key->key = std::move(key);
Expand Down Expand Up @@ -281,16 +277,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
std::unique_ptr<QueryPlan> source,
StoragePtr external_table,
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings,
bool in_subquery)
const Settings & settings)
{
auto from_subquery = std::make_shared<FutureSetFromSubquery>(
toString(key, {}),
std::move(source),
std::move(external_table),
std::move(external_table_set),
settings,
in_subquery);
toString(key, {}), std::move(source), std::move(external_table), std::move(external_table_set), settings);

auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);

Expand Down Expand Up @@ -340,15 +330,6 @@ std::shared_ptr<FutureSetFromSubquery> PreparedSets::findSubquery(const Hash & k
return it->second;
}

void PreparedSets::markAsINSubquery(const Hash & key)
{
auto it = sets_from_subqueries.find(key);
if (it == sets_from_subqueries.end())
return;

it->second->markAsINSubquery();
}

std::shared_ptr<FutureSetFromStorage> PreparedSets::findStorage(const Hash & key) const
{
auto it = sets_from_storage.find(key);
Expand Down
14 changes: 2 additions & 12 deletions src/Interpreters/PreparedSets.h
Expand Up @@ -101,8 +101,7 @@ class FutureSetFromSubquery final : public FutureSet
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings,
bool in_subquery_);
const Settings & settings);

FutureSetFromSubquery(
String key,
Expand All @@ -118,8 +117,6 @@ class FutureSetFromSubquery final : public FutureSet

QueryTreeNodePtr detachQueryTree() { return std::move(query_tree); }
void setQueryPlan(std::unique_ptr<QueryPlan> source_);
void markAsINSubquery() { in_subquery = true; }
bool isINSubquery() const { return in_subquery; }

private:
SetAndKeyPtr set_and_key;
Expand All @@ -128,11 +125,6 @@ class FutureSetFromSubquery final : public FutureSet

std::unique_ptr<QueryPlan> source;
QueryTreeNodePtr query_tree;
bool in_subquery = false; // subquery used in IN operator
// the flag can be removed after enabling new analyzer and removing interpreter
// or after enabling support IN operator with subqueries in parallel replicas
// Note: it's necessary with interpreter since prepared sets used also for GLOBAL JOINs,
// with new analyzer it's not a case
};

using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;
Expand Down Expand Up @@ -160,8 +152,7 @@ class PreparedSets
std::unique_ptr<QueryPlan> source,
StoragePtr external_table,
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings,
bool in_subquery = false);
const Settings & settings);

FutureSetFromSubqueryPtr addFromSubquery(
const Hash & key,
Expand All @@ -171,7 +162,6 @@ class PreparedSets
FutureSetFromTuplePtr findTuple(const Hash & key, const DataTypes & types) const;
FutureSetFromStoragePtr findStorage(const Hash & key) const;
FutureSetFromSubqueryPtr findSubquery(const Hash & key) const;
void markAsINSubquery(const Hash & key);

using Subqueries = std::vector<FutureSetFromSubqueryPtr>;
Subqueries getSubqueries() const;
Expand Down
@@ -0,0 +1,2 @@
990000
990000
23 changes: 23 additions & 0 deletions tests/queries/0_stateless/02972_parallel_replicas_cte.sql
@@ -0,0 +1,23 @@
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;

CREATE TABLE pr_1 (`a` UInt32) ENGINE = MergeTree ORDER BY a PARTITION BY a % 10 AS
SELECT 10 * intDiv(number, 10) + 1 FROM numbers(1_000_000);

CREATE TABLE pr_2 (`a` UInt32) ENGINE = MergeTree ORDER BY a AS
SELECT * FROM numbers(1_000_000);

WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a;

WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;

-- Testing that it is disabled for allow_experimental_analyzer=0. With analyzer it will be supported (with correct result)
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }

DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;