Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some fixes for parallel replicas #48433

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,20 @@ InterpreterSelectQuery::InterpreterSelectQuery(
context->setSetting("parallel_replicas_custom_key", String{""});
}

/// Try to execute query without parallel replicas if we find that there is a FINAL modifier there.
bool is_query_with_final = false;
if (query_info.table_expression_modifiers)
is_query_with_final = query_info.table_expression_modifiers->hasFinal();
else if (query_info.query)
is_query_with_final = query_info.query->as<ASTSelectQuery &>().final();
Comment on lines +466 to +470
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadFromMergeTree::isFinal(const SelectQueryInfo & query_info)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copied from there, yes 😜


if (is_query_with_final && (!settings.parallel_replicas_custom_key.value.empty() || settings.allow_experimental_parallel_reading_from_replicas))
{
LOG_WARNING(log, "FINAL modifier is supported with parallel replicas. Will try to execute the query without using them.");
context->setSetting("allow_experimental_parallel_reading_from_replicas", false);
context->setSetting("parallel_replicas_custom_key", String{""});
}

/// Rewrite JOINs
if (!has_input && joined_tables.tablesCount() > 1)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ namespace ErrorCodes
extern const int INDEX_NOT_USED;
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS;
extern const int SUPPORT_IS_DISABLED;
}

static MergeTreeReaderSettings getMergeTreeReaderSettings(
Expand Down Expand Up @@ -1539,7 +1540,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
if (final)
{
if (is_parallel_reading_from_replicas)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Final modifier is not supported with parallel replicas");
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "FINAL modifier is not supported with parallel replicas");

if (output_each_partition_through_separate_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimisation isn't supposed to be used for queries with final");
Expand Down
3 changes: 1 addition & 2 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6939,8 +6939,7 @@ QueryProcessingStage::Enum MergeTreeData::getQueryProcessingStage(
if (query_context->getClientInfo().collaborate_with_initiator)
return QueryProcessingStage::Enum::FetchColumns;

if (query_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas
&& !query_context->getClientInfo().collaborate_with_initiator
if (query_context->canUseParallelReplicasOnInitiator()
&& to_stage >= QueryProcessingStage::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE IF NOT EXISTS t_02708(x DateTime) ENGINE = MergeTree ORDER BY tuple();
SELECT count() FROM t_02708 SETTINGS allow_experimental_parallel_reading_from_replicas=1;
DROP TABLE t_02708;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1 1 2020-01-01 00:00:00
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP TABLE IF EXISTS t_02709;
CREATE TABLE t_02709 (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
INSERT INTO t_02709 VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01');
SELECT * FROM t_02709 FINAL ORDER BY key SETTINGS max_parallel_replicas=3, allow_experimental_parallel_reading_from_replicas=1, use_hedged_requests=0, cluster_for_parallel_replicas='parallel_replicas';
DROP TABLE t_02709;