Skip to content

Commit

Permalink
Merge pull request #57218 from tntnatbry/issue-43666
Browse files Browse the repository at this point in the history
Issue 43666: Add skip_unavailable_shards as a setting for Distributed table.
  • Loading branch information
alexey-milovidov committed Dec 18, 2023
2 parents f503ea2 + bae58fe commit 58396c5
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Expand Up @@ -186,7 +186,7 @@ class IColumn;
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
M(UInt64, parallel_replicas_min_number_of_rows_per_replica, 0, "Limit the number of replicas used in a query to (estimated rows to read / min_number_of_rows_per_replica). The max is still limited by 'max_parallel_replicas'", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard; if set to 1 - SELECT is executed on each shard; if set to 2 - SELECT and INSERT are executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \
Expand Down
14 changes: 12 additions & 2 deletions src/Interpreters/ClusterProxy/executeQuery.cpp
Expand Up @@ -21,6 +21,7 @@
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/Distributed/DistributedSettings.h>


namespace DB
Expand All @@ -41,7 +42,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
const Settings & settings,
const StorageID & main_table,
ASTPtr additional_filter_ast,
Poco::Logger * log)
Poco::Logger * log,
const DistributedSettings * distributed_settings)
{
Settings new_settings = settings;
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.max_execution_time);
Expand Down Expand Up @@ -101,6 +103,12 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
}
}

if (!settings.skip_unavailable_shards.changed && distributed_settings)
{
new_settings.skip_unavailable_shards = distributed_settings->skip_unavailable_shards.value;
new_settings.skip_unavailable_shards.changed = true;
}

if (settings.offset)
{
new_settings.offset = 0;
Expand Down Expand Up @@ -194,6 +202,7 @@ void executeQuery(
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator)
{
const Settings & settings = context->getSettingsRef();
Expand All @@ -205,7 +214,8 @@ void executeQuery(
SelectStreamFactory::Shards remote_shards;

auto cluster = query_info.getCluster();
auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log);
auto new_context = updateSettingsForCluster(*cluster, context, settings, main_table, query_info.additional_filter_ast, log,
&distributed_settings);
if (context->getSettingsRef().allow_experimental_parallel_reading_from_replicas
&& context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value
!= new_context->getSettingsRef().allow_experimental_parallel_reading_from_replicas.value)
Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/ClusterProxy/executeQuery.h
Expand Up @@ -8,6 +8,7 @@ namespace DB
{

struct Settings;
struct DistributedSettings;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
struct SelectQueryInfo;
Expand Down Expand Up @@ -42,7 +43,8 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster,
const Settings & settings,
const StorageID & main_table,
ASTPtr additional_filter_ast = nullptr,
Poco::Logger * log = nullptr);
Poco::Logger * log = nullptr,
const DistributedSettings * distributed_settings = nullptr);

using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
/// Execute a distributed query, creating a query plan, from which the query pipeline can be built.
Expand All @@ -62,6 +64,7 @@ void executeQuery(
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
const DistributedSettings & distributed_settings,
AdditionalShardFilterGenerator shard_filter_generator = {});


Expand Down
2 changes: 2 additions & 0 deletions src/Storages/Distributed/DistributedSettings.h
Expand Up @@ -17,6 +17,8 @@ class ASTStorage;
#define LIST_OF_DISTRIBUTED_SETTINGS(M, ALIAS) \
M(Bool, fsync_after_insert, false, "Do fsync for every inserted. Will decreases performance of inserts (only for background INSERT, i.e. distributed_foreground_insert=false)", 0) \
M(Bool, fsync_directories, false, "Do fsync for temporary directory (that is used for background INSERT only) after all part operations (writes, renames, etc.).", 0) \
/** Table-level counterpart of the skip_unavailable_shards query setting from src/Core/Settings.h; it is used only when the query-level setting has not been explicitly changed. */ \
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard.", 0) \
/** Inserts settings. */ \
M(UInt64, bytes_to_throw_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw.", 0) \
M(UInt64, bytes_to_delay_insert, 0, "If more than this number of compressed bytes will be pending for background INSERT, the query will be delayed. 0 - do not delay.", 0) \
Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageDistributed.cpp
Expand Up @@ -921,6 +921,7 @@ void StorageDistributed::read(
sharding_key_expr,
sharding_key_column_name,
query_info.cluster,
distributed_settings,
additional_shard_filter_generator);

/// This is a bug: it is possible only when there are no shards to query, and that case is handled earlier.
Expand Down
@@ -0,0 +1 @@
1234 abcd 1
@@ -0,0 +1,28 @@
-- Tags: shard, no-fasttest

-- Checks that skip_unavailable_shards can be given as a table-level setting of a
-- Distributed table (SETTINGS clause of CREATE TABLE) and that a SELECT through
-- that table then succeeds without setting skip_unavailable_shards in the session.

-- Start from a clean state in case an earlier run left the tables behind.
DROP TABLE IF EXISTS table_02916;
DROP TABLE IF EXISTS table_02916_distributed;

-- Local table that the Distributed table reads from.
CREATE TABLE table_02916
(
`ID` UInt32,
`Name` String
)
ENGINE = MergeTree
ORDER BY ID;

-- Single known row; it is the only data the final SELECT can return.
INSERT INTO table_02916 VALUES (1234, 'abcd');

-- Distributed table over the `test_unavailable_shard` cluster.
-- NOTE(review): presumably this cluster is defined in the test server config with
-- one reachable shard and one unreachable shard -- confirm against the config.
-- skip_unavailable_shards is set here, on the table, not on the query.
CREATE TABLE table_02916_distributed
(
`ID` UInt32,
`Name` String
)
ENGINE = Distributed(test_unavailable_shard, currentDatabase(), table_02916, rand())
SETTINGS skip_unavailable_shards = 1;

-- Only send fatal-level server logs to the client.
-- NOTE(review): presumably this keeps messages about the skipped shard out of the
-- test output -- confirm.
SET send_logs_level='fatal';
-- _shard_num is selected so the output also shows which shard produced the row;
-- the reference output for this test is the single row `1234 abcd 1`.
SELECT *, _shard_num FROM table_02916_distributed;

-- Clean up: drop the Distributed table first, then the local table it points to.
DROP TABLE table_02916_distributed;
DROP TABLE table_02916;

0 comments on commit 58396c5

Please sign in to comment.