Skip to content

Commit

Permalink
Disable parallel replicas JOIN with CTE (not analyzer)
Browse files Browse the repository at this point in the history
  • Loading branch information
Algunenano committed Jan 26, 2024
1 parent 10d4f1f commit 176d8be
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 81 deletions.
5 changes: 1 addition & 4 deletions src/Interpreters/ActionsVisitor.cpp
Expand Up @@ -1414,10 +1414,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
set_key = right_in_operand->getTreeHash(/*ignore_aliases=*/ true);

if (auto set = data.prepared_sets->findSubquery(set_key))
{
set->markAsINSubquery();
return set;
}

FutureSetFromSubqueryPtr external_table_set;

Expand Down Expand Up @@ -1464,7 +1461,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
}

return data.prepared_sets->addFromSubquery(
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef(), /*in_subquery=*/true);
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef());
}
else
{
Expand Down
61 changes: 51 additions & 10 deletions src/Interpreters/GlobalSubqueriesVisitor.h
Expand Up @@ -32,6 +32,7 @@ namespace ErrorCodes
{
extern const int WRONG_GLOBAL_SUBQUERY;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}

class GlobalSubqueriesMatcher
Expand Down Expand Up @@ -200,23 +201,33 @@ class GlobalSubqueriesMatcher
}

private:
static bool shouldBeExecutedGlobally(const Data & data)
/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
/// For parallel replicas we reinterpret JOIN as GLOBAL JOIN as a way to broadcast data
const bool prefer_global = settings.prefer_global_in_and_join;
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
return settings.prefer_global_in_and_join || enable_parallel_processing_of_joins;
}


/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
if ((shouldBeExecutedGlobally(data)
if (((prefer_global || enable_parallel_processing_of_joins)
&& (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn"))
|| func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn")
{
ASTPtr & ast = func.arguments->children[1];
if (enable_parallel_processing_of_joins)
{
/// We don't enable parallel replicas for IN (subquery)
if (ast->as<ASTSubquery>())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "IN with subquery is not supported with parallel replicas");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
}
}

/// Literal or function can use regular IN.
/// NOTE: We don't support passing table functions to IN.
Expand All @@ -241,9 +252,39 @@ class GlobalSubqueriesMatcher
/// GLOBAL JOIN
static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
const bool prefer_global = settings.prefer_global_in_and_join;
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();

if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || prefer_global
|| enable_parallel_processing_of_joins))
{
if (enable_parallel_processing_of_joins)
{
/// With parallel replicas we currently only support JOIN with subqueries.
/// Note that `tableA JOIN tableB` has already been converted into `tableA JOIN (SELECT * FROM tableB)` by this point, so that case is fine.
/// We don't support `WITH cte AS (subquery) SELECT ... FROM table JOIN cte`, because no such conversion is done in the AST for CTEs.
bool is_subquery = false;
if (const auto * ast_table_expr = table_elem.table_expression->as<ASTTableExpression>())
is_subquery = ast_table_expr->subquery->as<ASTSubquery>() != nullptr
&& ast_table_expr->subquery->as<ASTSubquery>()->cte_name.empty();
else if (table_elem.table_expression->as<ASTSubquery>())
is_subquery = true;

if (!is_subquery)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "JOIN with parallel replicas is only supported with subqueries");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOIN with parallel replicas is only supported with subqueries");
}
}

Names required_columns;

/// Fill required columns for GLOBAL JOIN.
Expand Down
33 changes: 1 addition & 32 deletions src/Interpreters/InterpreterSelectQuery.cpp
Expand Up @@ -864,38 +864,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
ASTSelectQuery & query = getSelectQuery();

/// With only_analyze we don't know anything about parts, so any decision about how many parallel replicas to use would be wrong
if (!storage || !context->canUseParallelReplicasOnInitiator())
return false;

/// check if IN operator with subquery is present in the query
/// if so, disable parallel replicas
if (query_analyzer->getPreparedSets()->hasSubqueries())
{
bool in_subqueries = false;
const auto & sets = query_analyzer->getPreparedSets();
const auto subqueries = sets->getSubqueries();
for (const auto & subquery : subqueries)
{
if (subquery->isINSubquery())
{
in_subqueries = true;
break;
}
}

if (in_subqueries)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");

context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{0});
LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery");
return true;
}
}

if (options.only_analyze)
if (!storage || options.only_analyze || !context->canUseParallelReplicasOnInitiator())
return false;

if (getTrivialCount(0).has_value())
Expand Down
27 changes: 4 additions & 23 deletions src/Interpreters/PreparedSets.cpp
Expand Up @@ -98,12 +98,8 @@ FutureSetFromSubquery::FutureSetFromSubquery(
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings,
bool in_subquery_)
: external_table(std::move(external_table_))
, external_table_set(std::move(external_table_set_))
, source(std::move(source_))
, in_subquery(in_subquery_)
const Settings & settings)
: external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
{
set_and_key = std::make_shared<SetAndKey>();
set_and_key->key = std::move(key);
Expand Down Expand Up @@ -281,16 +277,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
std::unique_ptr<QueryPlan> source,
StoragePtr external_table,
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings,
bool in_subquery)
const Settings & settings)
{
auto from_subquery = std::make_shared<FutureSetFromSubquery>(
toString(key, {}),
std::move(source),
std::move(external_table),
std::move(external_table_set),
settings,
in_subquery);
toString(key, {}), std::move(source), std::move(external_table), std::move(external_table_set), settings);

auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);

Expand Down Expand Up @@ -340,15 +330,6 @@ std::shared_ptr<FutureSetFromSubquery> PreparedSets::findSubquery(const Hash & k
return it->second;
}

void PreparedSets::markAsINSubquery(const Hash & key)
{
auto it = sets_from_subqueries.find(key);
if (it == sets_from_subqueries.end())
return;

it->second->markAsINSubquery();
}

std::shared_ptr<FutureSetFromStorage> PreparedSets::findStorage(const Hash & key) const
{
auto it = sets_from_storage.find(key);
Expand Down
14 changes: 2 additions & 12 deletions src/Interpreters/PreparedSets.h
Expand Up @@ -101,8 +101,7 @@ class FutureSetFromSubquery final : public FutureSet
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings,
bool in_subquery_);
const Settings & settings);

FutureSetFromSubquery(
String key,
Expand All @@ -118,8 +117,6 @@ class FutureSetFromSubquery final : public FutureSet

QueryTreeNodePtr detachQueryTree() { return std::move(query_tree); }
void setQueryPlan(std::unique_ptr<QueryPlan> source_);
void markAsINSubquery() { in_subquery = true; }
bool isINSubquery() const { return in_subquery; }

private:
SetAndKeyPtr set_and_key;
Expand All @@ -128,11 +125,6 @@ class FutureSetFromSubquery final : public FutureSet

std::unique_ptr<QueryPlan> source;
QueryTreeNodePtr query_tree;
bool in_subquery = false; // subquery used in IN operator
// the flag can be removed after enabling new analyzer and removing interpreter
// or after enabling support IN operator with subqueries in parallel replicas
// Note: it's necessary with interpreter since prepared sets used also for GLOBAL JOINs,
// with new analyzer it's not a case
};

using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;
Expand Down Expand Up @@ -160,8 +152,7 @@ class PreparedSets
std::unique_ptr<QueryPlan> source,
StoragePtr external_table,
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings,
bool in_subquery = false);
const Settings & settings);

FutureSetFromSubqueryPtr addFromSubquery(
const Hash & key,
Expand All @@ -171,7 +162,6 @@ class PreparedSets
FutureSetFromTuplePtr findTuple(const Hash & key, const DataTypes & types) const;
FutureSetFromStoragePtr findStorage(const Hash & key) const;
FutureSetFromSubqueryPtr findSubquery(const Hash & key) const;
void markAsINSubquery(const Hash & key);

using Subqueries = std::vector<FutureSetFromSubqueryPtr>;
Subqueries getSubqueries() const;
Expand Down
@@ -0,0 +1,2 @@
990000
990000
23 changes: 23 additions & 0 deletions tests/queries/0_stateless/02972_parallel_replicas_cte.sql
@@ -0,0 +1,23 @@
-- Test: JOIN against a CTE when parallel replicas are requested.
-- Start from a clean state in case an earlier run left the tables behind.
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;

-- pr_1: 1,000,000 rows; every value has the form 10 * k + 1 (k = 0 .. 99999) and occurs 10 times.
CREATE TABLE pr_1 (`a` UInt32) ENGINE = MergeTree ORDER BY a PARTITION BY a % 10 AS
SELECT 10 * intDiv(number, 10) + 1 FROM numbers(1_000_000);

-- pr_2: the distinct values 0 .. 999999, one row each.
CREATE TABLE pr_2 (`a` UInt32) ENGINE = MergeTree ORDER BY a AS
SELECT * FROM numbers(1_000_000);

-- Baseline: no parallel-replica settings are given in the query itself.
-- The CTE keeps the pr_1 rows with a >= 10000 (99,000 distinct values x 10 duplicates), and each of them
-- matches exactly one pr_2 row, so the expected count is 990000.
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a;

-- Same query with allow_experimental_parallel_reading_from_replicas = 1.
-- It is still expected to succeed and return the same count as the baseline (990000).
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;

-- Testing that it is disabled for allow_experimental_analyzer=0. With analyzer it will be supported (with correct result)
-- Here allow_experimental_parallel_reading_from_replicas = 2, and the query must fail with SUPPORT_IS_DISABLED
-- instead of returning a result.
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }

-- Clean up the tables created by this test.
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;

0 comments on commit 176d8be

Please sign in to comment.