From e547db0a8c7028fbddfd5224c5ce2a89ac05c009 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Fri, 24 Nov 2023 21:08:18 -0500 Subject: [PATCH 1/5] Issue 43666: Add skip_unavailable_shards as a setting for Distributed table. This setting, when enabled (disabled by default), allows ClickHouse to silently skip unavailable shards of a Distributed table during a query execution, instead of throwing an exception to the client. --- src/Core/Settings.h | 2 +- .../ClusterProxy/executeQuery.cpp | 14 ++++++++-- src/Interpreters/ClusterProxy/executeQuery.h | 5 +++- .../Distributed/DistributedSettings.h | 2 ++ src/Storages/StorageDistributed.cpp | 1 + ...tributed_skip_unavailable_shards.reference | 1 + ...16_distributed_skip_unavailable_shards.sql | 28 +++++++++++++++++++ 7 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference create mode 100644 tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7750bd4a0929..d5ce26e4d05b 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -185,7 +185,7 @@ class IColumn; M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \ M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \ \ - M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \ + M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \ \ M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \ M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \ diff --git a/src/Interpreters/ClusterProxy/executeQuery.cpp b/src/Interpreters/ClusterProxy/executeQuery.cpp index 8a2f7e3205a2..492af6d0aa99 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.cpp +++ b/src/Interpreters/ClusterProxy/executeQuery.cpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB @@ -40,7 +41,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, const Settings & settings, const StorageID & main_table, ASTPtr additional_filter_ast, - Poco::Logger * log) + Poco::Logger * log, + const DistributedSettings * distributed_settings) { Settings new_settings = settings; new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time); @@ -100,6 +102,12 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, } } + if (!settings.skip_unavailable_shards.changed && distributed_settings) + { + new_settings.skip_unavailable_shards = distributed_settings->skip_unavailable_shards.value; + new_settings.skip_unavailable_shards.changed = true; + } + if (settings.offset) { new_settings.offset = 0; @@ -193,6 +201,7 @@ void executeQuery( const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const ClusterPtr & not_optimized_cluster, + const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator) { const Settings & settings = context->getSettingsRef(); @@ -204,7 +213,8 @@ void executeQuery( SelectStreamFactory::Shards remote_shards; auto cluster = query_info.getCluster(); - auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log); + auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log, + &distributed_settings); if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas && context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value != new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value) diff --git a/src/Interpreters/ClusterProxy/executeQuery.h b/src/Interpreters/ClusterProxy/executeQuery.h index 7ffaa3ae62c3..79df6065ff20 100644 --- a/src/Interpreters/ClusterProxy/executeQuery.h +++ b/src/Interpreters/ClusterProxy/executeQuery.h @@ -8,6 +8,7 @@ namespace DB { struct Settings; +struct DistributedSettings; class Cluster; using ClusterPtr = std::shared_ptr; struct SelectQueryInfo; @@ -42,7 +43,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, const Settings & settings, const StorageID & main_table, ASTPtr additional_filter_ast = nullptr, - Poco::Logger * log = nullptr); + Poco::Logger * log = nullptr, + const DistributedSettings * distributed_settings = nullptr); using AdditionalShardFilterGenerator = std::function; /// Execute a distributed query, creating a query plan, from which the query pipeline can be built. @@ -62,6 +64,7 @@ void executeQuery( const ExpressionActionsPtr & sharding_key_expr, const std::string & sharding_key_column_name, const ClusterPtr & not_optimized_cluster, + const DistributedSettings & distributed_settings, AdditionalShardFilterGenerator shard_filter_generator = {}); diff --git a/src/Storages/Distributed/DistributedSettings.h b/src/Storages/Distributed/DistributedSettings.h index 8f05d43e3f03..a326e6310dc1 100644 --- a/src/Storages/Distributed/DistributedSettings.h +++ b/src/Storages/Distributed/DistributedSettings.h @@ -17,6 +17,8 @@ class ASTStorage; #define LIST_OF_DISTRIBUTED_SETTINGS(M, ALIAS) \ M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for background INSERT, i.e. distributed_foreground_insert=false)", 0) \ M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for background INSERT only) after all part operations (writes, renames, etc.).", 0) \ + /** This is the distributed version of the skip_unavailable_shards setting available in src/Core/Settings.h */ \ + M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \ /** Inserts settings. */ \ M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw.", 0) \ M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, the query will be delayed. 0 - do not delay.", 0) \ diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 5e351d1a85e1..57e8ebc0a1f4 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -921,6 +921,7 @@ void StorageDistributed::read( sharding_key_expr, sharding_key_column_name, query_info.cluster, + distributed_settings, additional_shard_filter_generator); /// This is a bug, it is possible only when there is no shards to query, and this is handled earlier. diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference new file mode 100644 index 000000000000..77fc99a2f2fe --- /dev/null +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference @@ -0,0 +1 @@ +1234 abcd 1 diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql new file mode 100644 index 000000000000..9441985cad94 --- /dev/null +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql @@ -0,0 +1,28 @@ +-- Tags: shard, no-fasttest + +DROP TABLE IF EXISTS table_02916; +DROP TABLE IF EXISTS table_02916_distributed; + +CREATE TABLE table_02916 +( + `ID` UInt32, + `Name` String +) +ENGINE = MergeTree +ORDER BY ID; + +INSERT INTO table_02916 VALUES (1234, 'abcd'); + +CREATE TABLE table_02916_distributed +( + `ID` UInt32, + `Name` String +) +ENGINE = Distributed(test_unavailable_shard, currentDatabase(), table_02916, rand()) +SETTINGS skip_unavailable_shards = 1; + +SELECT *, _shard_num FROM table_02916_distributed; +--SELECT *, _shard_num FROM table_02916_distributed SETTINGS skip_unavailable_shards=0; -- { serverError ALL_CONNECTION_TRIES_FAILED } + +DROP TABLE table_02916_distributed; +DROP TABLE table_02916; From d4e8c0558302b2a006cac17976558e2617ddd982 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 10 Dec 2023 21:16:03 +0300 Subject: [PATCH 2/5] Update 02916_distributed_skip_unavailable_shards.sql --- .../0_stateless/02916_distributed_skip_unavailable_shards.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql index 9441985cad94..ac57ecadb591 100644 --- a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql @@ -22,7 +22,7 @@ ENGINE = Distributed(test_unavailable_shard, currentDatabase(), table_02916, ran SETTINGS skip_unavailable_shards = 1; SELECT *, _shard_num FROM table_02916_distributed; ---SELECT *, _shard_num FROM table_02916_distributed SETTINGS skip_unavailable_shards=0; -- { serverError ALL_CONNECTION_TRIES_FAILED } +SELECT *, _shard_num FROM table_02916_distributed SETTINGS skip_unavailable_shards=0; -- { serverError ALL_CONNECTION_TRIES_FAILED } DROP TABLE table_02916_distributed; DROP TABLE table_02916; From ec05335180693877420072e4a14bb810b5f95f86 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 10 Dec 2023 23:34:35 +0300 Subject: [PATCH 3/5] Update 02916_distributed_skip_unavailable_shards.sql --- .../0_stateless/02916_distributed_skip_unavailable_shards.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql index ac57ecadb591..e2a1f1be9d81 100644 --- a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql @@ -1,5 +1,7 @@ -- Tags: shard, no-fasttest +SET prefer_localhost_replica = 0; -- Always do network communication to check if the shard is unavailable. + DROP TABLE IF EXISTS table_02916; DROP TABLE IF EXISTS table_02916_distributed; From 229a044d75a6d27c0df86bdf477184a78d93ca93 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Sun, 10 Dec 2023 23:59:51 -0500 Subject: [PATCH 4/5] Update 02916_distributed_skip_unavailable_shards.reference. --- .../02916_distributed_skip_unavailable_shards.reference | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference index 77fc99a2f2fe..15430c784fc4 100644 --- a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference @@ -1 +1,2 @@ 1234 abcd 1 +1234 abcd 1 From bae58febf3b2573a745243ae5ed016aae13b3d49 Mon Sep 17 00:00:00 2001 From: Gagan Goel Date: Tue, 12 Dec 2023 17:28:35 -0500 Subject: [PATCH 5/5] Update 02916_distributed_skip_unavailable_shards.sql/.reference files. --- .../02916_distributed_skip_unavailable_shards.reference | 1 - .../0_stateless/02916_distributed_skip_unavailable_shards.sql | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference index 15430c784fc4..77fc99a2f2fe 100644 --- a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.reference @@ -1,2 +1 @@ 1234 abcd 1 -1234 abcd 1 diff --git a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql index e2a1f1be9d81..48a1294982db 100644 --- a/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql +++ b/tests/queries/0_stateless/02916_distributed_skip_unavailable_shards.sql @@ -1,7 +1,5 @@ -- Tags: shard, no-fasttest -SET prefer_localhost_replica = 0; -- Always do network communication to check if the shard is unavailable. - DROP TABLE IF EXISTS table_02916; DROP TABLE IF EXISTS table_02916_distributed; @@ -23,8 +21,8 @@ CREATE TABLE table_02916_distributed ENGINE = Distributed(test_unavailable_shard, currentDatabase(), table_02916, rand()) SETTINGS skip_unavailable_shards = 1; +SET send_logs_level='fatal'; SELECT *, _shard_num FROM table_02916_distributed; -SELECT *, _shard_num FROM table_02916_distributed SETTINGS skip_unavailable_shards=0; -- { serverError ALL_CONNECTION_TRIES_FAILED } DROP TABLE table_02916_distributed; DROP TABLE table_02916;