Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 43666: Add skip_unavailable_shards as a setting for Distributed table. #57218

Merged
merged 5 commits into from Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Expand Up @@ -185,7 +185,7 @@ class IColumn;
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \
Expand Down
14 changes: 12 additions & 2 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
Expand Up @@ -21,6 +21,7 @@
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Distributed/DistributedSettings.h>


namespace DB
Expand All @@ -40,7 +41,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
const Settings & settings,
const StorageID & main_table,
ASTPtr additional_filter_ast,
Poco::Logger * log)
Poco::Logger * log,
const DistributedSettings * distributed_settings)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
Expand Down Expand Up @@ -100,6 +102,12 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
}
}

if (!settings.skip_unavailable_shards.changed && distributed_settings)
{
new_settings.skip_unavailable_shards = distributed_settings->skip_unavailable_shards.value;
new_settings.skip_unavailable_shards.changed = true;
}

if (settings.offset)
{
new_settings.offset = 0;
Expand Down Expand Up @@ -193,6 +201,7 @@ void executeQuery(
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator)
{
const Settings & settings = context->getSettingsRef();
Expand All @@ -204,7 +213,8 @@ void executeQuery(
SelectStreamFactory::Shards remote_shards;

auto cluster = query_info.getCluster();
auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log);
auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log,
&distributed_settings);
if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas
&& context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value
!= new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value)
Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/ClusterProxy/executeQuery.h
Expand Up @@ -8,6 +8,7 @@ namespace DB
{

struct Settings;
struct DistributedSettings;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct SelectQueryInfo;
Expand Down Expand Up @@ -42,7 +43,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
const Settings & settings,
const StorageID & main_table,
ASTPtr additional_filter_ast = nullptr,
Poco::Logger * log = nullptr);
Poco::Logger * log = nullptr,
const DistributedSettings * distributed_settings = nullptr);

using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
Expand All @@ -62,6 +64,7 @@ void executeQuery(
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator = {});


Expand Down
2 changes: 2 additions & 0 deletions src/Storages/Distributed/DistributedSettings.h
Expand Up @@ -17,6 +17,8 @@ class ASTStorage;
#define LIST_OF_DISTRIBUTED_SETTINGS(M, ALIAS) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for background INSERT, i.e. distributed_foreground_insert=false)", 0) \
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for background INSERT only) after all part operations (writes, renames, etc.).", 0) \
/** This is the distributed version of the skip_unavailable_shards setting available in src/Core/Settings.h */ \
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \
/** Inserts settings. */ \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, the query will be delayed. 0 - do not delay.", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageDistributed.cpp
Expand Up @@ -921,6 +921,7 @@ void StorageDistributed::read(
sharding_key_expr,
sharding_key_column_name,
query_info.cluster,
distributed_settings,
additional_shard_filter_generator);

/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
Expand Down
@@ -0,0 +1 @@
1234 abcd 1
@@ -0,0 +1,28 @@
-- Tags: shard, no-fasttest

DROP TABLE IF EXISTS table_02916;
DROP TABLE IF EXISTS table_02916_distributed;

CREATE TABLE table_02916
(
`ID` UInt32,
`Name` String
)
ENGINE = MergeTree
ORDER BY ID;

INSERT INTO table_02916 VALUES (1234, 'abcd');

CREATE TABLE table_02916_distributed
(
`ID` UInt32,
`Name` String
)
ENGINE = Distributed(test_unavailable_shard, currentDatabase(), table_02916, rand())
SETTINGS skip_unavailable_shards = 1;

SELECT *, _shard_num FROM table_02916_distributed;
SELECT *, _shard_num FROM table_02916_distributed SETTINGS skip_unavailable_shards=0; -- { serverError ALL_CONNECTION_TRIES_FAILED }

DROP TABLE table_02916_distributed;
DROP TABLE table_02916;