From 932327aa46d739da169edb9cdf6e035f00cd0304 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Fri, 24 Nov 2023 21:08:18 -0500 Subject: [PATCH] Issue 43666: Add skip_unavailable_shards as a setting for Distributed table. This setting, when enabled (disabled by default), allows ClickHouse to silently skip unavailable shards of a Distributed table during a query execution, instead of throwing an exception to the client. --- src/Core/Settings.h | 2 +- .../ClusterProxy/executeQuery.cpp | 14 ++++++++-- src/Interpreters/ClusterProxy/executeQuery.h | 5 +++- .../Distributed/DistributedSettings.h | 2 ++ src/Storages/StorageDistributed.cpp | 3 +- ...tributed_skip_unavailable_shards.reference | 2 ++ ...16_distributed_skip_unavailable_shards.sql | 28 +++++++++++++++++++ 7 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference create mode 100644 tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 3b90a3e068b8..f00c6993906d 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -179,7 +179,7 @@ class IColumn; M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \ M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \ \ - M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \ + M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \ \ M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \ M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \ diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 41235d107cd5..30cb48b1a409 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB @@ -40,7 +41,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, const Settings & settings, const StorageID & main_table, ASTPtr additional_filter_ast, - Poco::Logger * log) + Poco::Logger * log, + const DistributedSettings * distributed_settings) { Settings new_settings = settings; new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time); @@ -100,6 +102,12 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, } } + if (!settings.skip_unavailable_shards.changed && distributed_settings) + { + new_settings.skip_unavailable_shards = distributed_settings->skip_unavailable_shards.value; + new_settings.skip_unavailable_shards.changed = true; + } + if (settings.offset) { new_settings.offset = 0; @@ -182,6 +190,7 @@ void executeQuery( const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const ClusterPtr & not_optimized_cluster, + const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator) { const Settings & settings = context->getSettingsRef(); @@ -193,7 +202,8 @@ void executeQuery( SelectStreamFactory::Shards remote_shards; auto cluster = query_info.getCluster(); - auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log); + auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log, + &distributed_settings); if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas && context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value != new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value) diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index bdca75be31db..a8a6ec7b2134 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -8,6 +8,7 @@ namespace DB { struct Settings; +struct DistributedSettings; class Cluster; using ClusterPtr = std::shared_ptr; struct SelectQueryInfo; @@ -42,7 +43,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, const Settings & settings, const StorageID & main_table, ASTPtr additional_filter_ast = nullptr, - Poco::Logger * log = nullptr); + Poco::Logger * log = nullptr, + const DistributedSettings * distributed_settings = nullptr); using AdditionalShardFilterGenerator = std::function; /// Execute a distributed query, creating a query plan, from which the query pipeline can be built. @@ -59,6 +61,7 @@ void executeQuery( const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const ClusterPtr & not_optimized_cluster, + const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator = {}); diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h index 8f05d43e3f03..a326e6310dc1 100644 --- a/src/Storages/Distributed/DistributedSettings.h +++ b/src/Storages/Distributed/DistributedSettings.h @@ -17,6 +17,8 @@ class ASTStorage; #define LIST_OF_DISTRIBUTED_SETTINGS(M, ALIAS) \ M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for background INSERT, i.e. distributed_foreground_insert=false)", 0) \ M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for background INSERT only) after all part operations (writes, renames, etc.).", 0) \ + /** This is the distributed version of the skip_unavailable_shards setting available in src/Core/Settings.h */ \ + M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \ /** Inserts settings. */ \ M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw.", 0) \ M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, the query will be delayed. 0 - do not delay.", 0) \ diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 7e989eb9ab2a..d55b5f9d4528 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -910,7 +910,8 @@ void StorageDistributed::read( select_stream_factory, log, modified_query_ast, local_context, query_info, sharding_key_expr, sharding_key_column_name, - query_info.cluster, additional_shard_filter_generator); + query_info.cluster, distributed_settings, + additional_shard_filter_generator); /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. if (!query_plan.isInitialized()) diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference new file mode 100644 index 000000000000..15430c784fc4 --- /dev/null +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference @@ -0,0 +1,2 @@ +1234 abcd 1 +1234 abcd 1 diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql new file mode 100644 index 000000000000..8b8e159a45e6 --- /dev/null +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql @@ -0,0 +1,28 @@ +-- Tags: shard, long + +DROP TABLE IF EXISTS table_02916; +DROP TABLE IF EXISTS table_02916_distributed; + +CREATE TABLE table_02916 +( + `ID` UInt32, + `Name` String +) +ENGINE = MergeTree +ORDER BY ID; + +INSERT INTO table_02916 VALUES (1234, 'abcd'); + +CREATE TABLE table_02916_distributed +( + `ID` UInt32, + `Name` String +) +ENGINE = Distributed(test_unavailable_shard, currentDatabase(), table_02916, rand()) +SETTINGS skip_unavailable_shards = 1; + +SELECT *, _shard_num FROM table_02916_distributed; +SELECT *, _shard_num FROM table_02916_distributed SETTINGS skip_unavailable_shards=0; -- { serverError ALL_CONNECTION_TRIES_FAILED } + +DROP TABLE table_02916_distributed; +DROP TABLE table_02916;