Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a setting max_execution_time_leaf to limit the execution time on shard for distributed query #51823

Merged
merged 9 commits into from Nov 3, 2023
22 changes: 21 additions & 1 deletion docs/en/operations/settings/query-complexity.md
Expand Up @@ -163,7 +163,27 @@ If you set `timeout_before_checking_execution_speed `to 0, ClickHouse will use c

## timeout_overflow_mode {#timeout-overflow-mode}

What to do if the query is run longer than ‘max_execution_time’: ‘throw’ or ‘break’. By default, throw.
What to do if the query is run longer than `max_execution_time`: `throw` or `break`. By default, `throw`.

# max_execution_time_leaf

Similar semantic to `max_execution_time` but only apply on leaf node for distributed or remote queries.

For example, if we want to limit execution time on leaf node to `10s` but no limit on the initial node, instead of having `max_execution_time` in the nested subquery settings:

``` sql
SELECT count() FROM cluster(cluster, view(SELECT * FROM t SETTINGS max_execution_time = 10));
```

We can use `max_execution_time_leaf` as the query settings:

``` sql
SELECT count() FROM cluster(cluster, view(SELECT * FROM t)) SETTINGS max_execution_time_leaf = 10;
```

# timeout_overflow_mode_leaf

What to do when the query in leaf node run longer than `max_execution_time_leaf`: `throw` or `break`. By default, `throw`.

## min_execution_speed {#min-execution-speed}

Expand Down
16 changes: 9 additions & 7 deletions src/Core/Settings.h
Expand Up @@ -364,16 +364,16 @@ class IColumn;
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_read_leaf, 0, "Limit on read rows on the leaf nodes for distributed queries. Limit is applied for local reads only excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(UInt64, max_rows_to_read_leaf, 0, "Limit on read rows on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(UInt64, max_bytes_to_read_leaf, 0, "Limit on read bytes (after decompression) on the leaf nodes for distributed queries. Limit is applied for local reads only, excluding the final merge stage on the root node. Note, the setting is unstable with prefer_localhost_replica=1.", 0) \
M(OverflowMode, read_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY is generating more than specified number of rows (unique GROUP BY keys), the behavior will be determined by the 'group_by_overflow_mode' which by default is - throw an exception, but can be also switched to an approximate GROUP BY mode.", 0) \
M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY is generating more than the specified number of rows (unique GROUP BY keys), the behavior will be determined by the 'group_by_overflow_mode' which by default is - throw an exception, but can be also switched to an approximate GROUP BY mode.", 0) \
M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_group_by, 0, "If memory usage during GROUP BY operation is exceeding this threshold in bytes, activate the 'external aggregation' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
\
M(UInt64, max_rows_to_sort, 0, "If more than specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_rows_to_sort, 0, "If more than the specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than the specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \
Expand All @@ -384,8 +384,10 @@ class IColumn;
M(OverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
/* TODO: Check also when merging and finalizing aggregate functions. */ \
M(Seconds, max_execution_time, 0, "If query run time exceeded the specified number of seconds, the behavior will be determined by the 'timeout_overflow_mode' which by default is - throw an exception. Note that the timeout is checked and query can stop only in designated places during data processing. It currently cannot stop during merging of aggregation states or during query analysis, and the actual run time will be higher than the value of this setting.", 0) \
M(Seconds, max_execution_time, 0, "If query runtime exceeds the specified number of seconds, the behavior will be determined by the 'timeout_overflow_mode', which by default is - throw an exception. Note that the timeout is checked and query can stop only in designated places during data processing. It currently cannot stop during merging of aggregation states or during query analysis, and the actual run time will be higher than the value of this setting.", 0) \
M(OverflowMode, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
M(Seconds, max_execution_time_leaf, 0, "Similar semantic to max_execution_time but only apply on leaf node for distributed queries, the time out behavior will be determined by 'timeout_overflow_mode_leaf' which by default is - throw an exception", 0) \
M(OverflowMode, timeout_overflow_mode_leaf, OverflowMode::THROW, "What to do when the leaf limit is exceeded.", 0) \
canhld94 marked this conversation as resolved.
Show resolved Hide resolved
\
M(UInt64, min_execution_speed, 0, "Minimum number of execution rows per second.", 0) \
M(UInt64, max_execution_speed, 0, "Maximum number of execution rows per second.", 0) \
Expand All @@ -399,7 +401,7 @@ class IColumn;
\
M(UInt64, max_sessions_for_user, 0, "Maximum number of simultaneous sessions for a user.", 0) \
\
M(UInt64, max_subquery_depth, 100, "If a query has more than specified number of nested subqueries, throw an exception. This allows you to have a sanity check to protect the users of your cluster from going insane with their queries.", 0) \
M(UInt64, max_subquery_depth, 100, "If a query has more than the specified number of nested subqueries, throw an exception. This allows you to have a sanity check to protect the users of your cluster from going insane with their queries.", 0) \
M(UInt64, max_analyze_depth, 5000, "Maximum number of analyses performed by interpreter.", 0) \
M(UInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.", 0) \
M(UInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.", 0) \
Expand Down
13 changes: 8 additions & 5 deletions src/Interpreters/ClusterProxy/SelectStreamFactory.cpp
@@ -1,27 +1,29 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/Cluster.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <DataTypes/ObjectUtils.h>

#include <Client/IConnections.h>
#include <Common/logger_useful.h>
#include <Common/FailPoint.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>


namespace ProfileEvents
{
extern const Event DistributedConnectionMissingTable;
Expand Down Expand Up @@ -121,6 +123,7 @@ void SelectStreamFactory::createForShard(
if (it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);


auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(
Expand Down
8 changes: 8 additions & 0 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
Expand Up @@ -141,6 +141,14 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
new_settings.allow_experimental_parallel_reading_from_replicas = false;
}

if (settings.max_execution_time_leaf.value > 0)
{
/// Replace 'max_execution_time' of this sub-query with 'max_execution_time_leaf' and 'timeout_overflow_mode'
/// with 'timeout_overflow_mode_leaf'
new_settings.max_execution_time = settings.max_execution_time_leaf;
new_settings.timeout_overflow_mode = settings.timeout_overflow_mode_leaf;
}

auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;
Expand Down
Empty file.
4 changes: 4 additions & 0 deletions tests/queries/0_stateless/02786_max_execution_time_leaf.sql
@@ -0,0 +1,4 @@
-- Tags: no-fasttest
SELECT count() FROM cluster('test_cluster_two_shards', view( SELECT * FROM numbers(100000000000) )) SETTINGS max_execution_time_leaf = 1; -- { serverError 159 }
-- Can return partial result
SELECT count() FROM cluster('test_cluster_two_shards', view( SELECT * FROM numbers(100000000000) )) FORMAT Null SETTINGS max_execution_time_leaf = 1, timeout_overflow_mode_leaf = 'break';