Skip to content

Commit

Permalink
Make DirectoryMonitor handle cluster node list change (#42826)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyuankai committed Dec 8, 2023
1 parent bb2b716 commit 7b0f8d4
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 52 deletions.
9 changes: 9 additions & 0 deletions src/Interpreters/Context.cpp
Expand Up @@ -367,6 +367,7 @@ struct ContextSharedPart : boost::noncopyable
std::shared_ptr<Clusters> clusters TSA_GUARDED_BY(clusters_mutex);
ConfigurationPtr clusters_config TSA_GUARDED_BY(clusters_mutex); /// Stores updated configs
std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;

/// No lock required for async_insert_queue modified only during initialization
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
Expand Down Expand Up @@ -3523,6 +3524,14 @@ void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_dis
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, getMacros(), config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);

++shared->clusters_version;
}

size_t Context::getClustersVersion() const
{
std::lock_guard lock(shared->clusters_mutex);
return shared->clusters_version;
}


Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Context.h
Expand Up @@ -1023,6 +1023,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, bool enable_discovery = false, const String & config_name = "remote_servers");
size_t getClustersVersion() const;

void startClusterDiscovery();

Expand Down
55 changes: 6 additions & 49 deletions src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp
Expand Up @@ -7,7 +7,6 @@
#include <Formats/NativeReader.h>
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ConnectionTimeouts.h>
Expand Down Expand Up @@ -55,7 +54,7 @@ namespace
{

template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
ConnectionPoolPtrs createPoolsForAddresses(const Cluster::Addresses & addresses, PoolFactory && factory, Poco::Logger * log)
{
ConnectionPoolPtrs pools;

Expand All @@ -76,30 +75,8 @@ ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory
}
};

for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);
if (address.shard_index && dirname.ends_with("_all_replicas"))
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}

const auto & shard_info = shards_info[address.shard_index - 1];
size_t replicas = shard_info.per_replica_pools.size();

for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
{
address.replica_index = static_cast<UInt32>(replica_index);
make_connection(address);
}
}
else
make_connection(address);
}
for (const auto & address : addresses)
make_connection(address);

return pools;
}
Expand Down Expand Up @@ -254,34 +231,14 @@ void DistributedAsyncInsertDirectoryQueue::run()
}


ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::string & name, const StorageDistributed & storage)
ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage)
{
const auto pool_factory = [&storage, &name] (const Cluster::Address & address) -> ConnectionPoolPtr
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();

/// Check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1).
if (address.shard_index != 0)
{
if (!address.replica_index)
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"Wrong replica_index={} ({})", address.replica_index, name);

if (address.shard_index > shards_info.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"No shard with shard_index={} ({})", address.shard_index, name);

const auto & shard_info = shards_info[address.shard_index - 1];
if (address.replica_index > shard_info.per_replica_pools.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"No shard with replica_index={} ({})", address.replica_index, name);

return shard_info.per_replica_pools[address.replica_index - 1];
}

/// Existing connections pool have a higher priority.
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
Expand Down Expand Up @@ -318,7 +275,7 @@ ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::st
address.secure);
};

auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);
auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);

const auto settings = storage.getContext()->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
Expand Down
Expand Up @@ -4,6 +4,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Cluster.h>
#include <Disks/IDisk.h>
#include <atomic>
#include <mutex>
Expand Down Expand Up @@ -56,7 +57,7 @@ class DistributedAsyncInsertDirectoryQueue

~DistributedAsyncInsertDirectoryQueue();

static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
static ConnectionPoolPtr createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage);

void updatePath(const std::string & new_relative_path);

Expand Down
55 changes: 53 additions & 2 deletions src/Storages/StorageDistributed.cpp
Expand Up @@ -1379,9 +1379,13 @@ DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(con

std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_queue)
/// If the node changes, you need to recreate the DistributedAsyncInsertDirectoryQueue
if (!node_data.directory_queue
|| (node_data.clusters_version < getContext()->getClustersVersion() && node_data.addresses != parseAddresses(name)))
{
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(name, *this);
node_data.addresses = parseAddresses(name);
node_data.clusters_version = getContext()->getClustersVersion();
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(node_data.addresses, *this);
node_data.directory_queue = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
*this, disk, relative_data_path + name,
node_data.connection_pool,
Expand All @@ -1401,6 +1405,53 @@ std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::ge
return statuses;
}

Cluster::Addresses StorageDistributed::parseAddresses(const std::string & name) const
{
Cluster::Addresses addresses;

const auto & cluster = getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();

for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);

/// Check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1).
if (address.shard_index)
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}

const auto & replicas_addresses = shards_addresses[address.shard_index - 1];
size_t replicas = replicas_addresses.size();

if (dirname.ends_with("_all_replicas"))
{
for (const auto & replica_address : replicas_addresses)
addresses.push_back(replica_address);
continue;
}

if (address.replica_index > replicas)
{
LOG_ERROR(log, "No shard with replica_index={} ({})", address.replica_index, name);
continue;
}

addresses.push_back(replicas_addresses[address.replica_index - 1]);
}
else
addresses.push_back(address);
}
return addresses;
}

std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
{
UInt64 total_bytes = 0;
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/StorageDistributed.h
Expand Up @@ -176,6 +176,8 @@ class StorageDistributed final : public IStorage, WithContext
/// Used for the INSERT into Distributed in case of distributed_foreground_insert==1, from DistributedSink.
DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);

/// Parse the address corresponding to the directory name of the directory queue
Cluster::Addresses parseAddresses(const std::string & name) const;

/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)
Expand Down Expand Up @@ -270,6 +272,8 @@ class StorageDistributed final : public IStorage, WithContext
{
std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_queue;
ConnectionPoolPtr connection_pool;
Cluster::Addresses addresses;
size_t clusters_version;
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;
Expand Down
Empty file.
@@ -0,0 +1,38 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster_with_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_with_replication>
</remote_servers>
</clickhouse>

0 comments on commit 7b0f8d4

Please sign in to comment.