Optimize cluster node list change but DirectoryMonitor can't sense it #42826

Merged
9 changes: 9 additions & 0 deletions src/Interpreters/Context.cpp
@@ -353,6 +353,7 @@ struct ContextSharedPart : boost::noncopyable
std::shared_ptr<Clusters> clusters TSA_GUARDED_BY(clusters_mutex);
ConfigurationPtr clusters_config TSA_GUARDED_BY(clusters_mutex); /// Stores updated configs
std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;

/// No lock required for async_insert_queue modified only during initialization
std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
@@ -3430,6 +3431,14 @@ void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_dis
shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, getMacros(), config_name);
else
shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);

++shared->clusters_version;
}

size_t Context::getClustersVersion() const
{
std::lock_guard lock(shared->clusters_mutex);
return shared->clusters_version;
}


1 change: 1 addition & 0 deletions src/Interpreters/Context.h
@@ -1020,6 +1020,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, bool enable_discovery = false, const String & config_name = "remote_servers");
size_t getClustersVersion() const;

void startClusterDiscovery();

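As a note for reviewers, here is a minimal sketch of how a consumer can use the new clusters version counter to detect that the remote_servers config was reloaded; it mirrors the change to StorageDistributed::getDirectoryQueue() later in this diff. The names CachedClusterState, refreshIfClusterChanged and resolve_addresses are illustrative, not part of the PR.

#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <functional>

/// Illustrative only; StorageDistributed below applies the same pattern to its
/// per-directory ClusterNodeData.
struct CachedClusterState
{
    size_t cached_clusters_version = 0;       /// last observed Context::getClustersVersion()
    DB::Cluster::Addresses cached_addresses;  /// addresses resolved against that version
};

/// Re-resolve the cached addresses only when the cluster config has been reloaded
/// since the last call. `resolve_addresses` is a hypothetical callback.
void refreshIfClusterChanged(
    CachedClusterState & state,
    const DB::ContextPtr & context,
    const std::function<DB::Cluster::Addresses()> & resolve_addresses)
{
    const size_t current_version = context->getClustersVersion();
    if (state.cached_clusters_version < current_version)
    {
        state.cached_addresses = resolve_addresses();
        state.cached_clusters_version = current_version;
    }
}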
55 changes: 6 additions & 49 deletions src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp
@@ -7,7 +7,6 @@
#include <Formats/NativeReader.h>
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ConnectionTimeouts.h>
@@ -55,7 +54,7 @@ namespace
{

template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
ConnectionPoolPtrs createPoolsForAddresses(const Cluster::Addresses & addresses, PoolFactory && factory, Poco::Logger * log)
{
ConnectionPoolPtrs pools;

@@ -76,30 +75,8 @@ ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory
}
};

for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);
if (address.shard_index && dirname.ends_with("_all_replicas"))
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}

const auto & shard_info = shards_info[address.shard_index - 1];
size_t replicas = shard_info.per_replica_pools.size();

for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
{
address.replica_index = static_cast<UInt32>(replica_index);
make_connection(address);
}
}
else
make_connection(address);
}
for (const auto & address : addresses)
make_connection(address);

return pools;
}
@@ -254,34 +231,14 @@ void DistributedAsyncInsertDirectoryQueue::run()
}


ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::string & name, const StorageDistributed & storage)
ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage)
{
const auto pool_factory = [&storage, &name] (const Cluster::Address & address) -> ConnectionPoolPtr
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();

/// Check new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index starts from 1).
if (address.shard_index != 0)
{
if (!address.replica_index)
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"Wrong replica_index={} ({})", address.replica_index, name);

if (address.shard_index > shards_info.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"No shard with shard_index={} ({})", address.shard_index, name);

const auto & shard_info = shards_info[address.shard_index - 1];
if (address.replica_index > shard_info.per_replica_pools.size())
throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
"No shard with replica_index={} ({})", address.replica_index, name);

return shard_info.per_replica_pools[address.replica_index - 1];
}

/// Existing connections pool have a higher priority.
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
@@ -318,7 +275,7 @@ ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::st
address.secure);
};

auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);
auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);

const auto settings = storage.getContext()->getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h
@@ -4,6 +4,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Cluster.h>
#include <Disks/IDisk.h>
#include <atomic>
#include <mutex>
@@ -56,7 +57,7 @@ class DistributedAsyncInsertDirectoryQueue

~DistributedAsyncInsertDirectoryQueue();

static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
static ConnectionPoolPtr createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage);

void updatePath(const std::string & new_relative_path);

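To illustrate the new createPool() contract (the real call site is StorageDistributed::getDirectoryQueue() in the next file): the caller resolves the queue's directory name into concrete addresses first and passes those in, rather than passing the raw name string. In this hypothetical snippet, `storage` and `dir_name` are illustrative placeholders.

/// Hypothetical call site; `storage` is a StorageDistributed and `dir_name` is a
/// directory-queue name such as "shard1_all_replicas".
DB::Cluster::Addresses addresses = storage.parseAddresses(dir_name);
auto pool = DB::DistributedAsyncInsertDirectoryQueue::createPool(addresses, storage);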
55 changes: 53 additions & 2 deletions src/Storages/StorageDistributed.cpp
@@ -1371,9 +1371,13 @@ DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(con

std::lock_guard lock(cluster_nodes_mutex);
auto & node_data = cluster_nodes_data[key];
if (!node_data.directory_queue)
/// If the cluster's node list has changed, the DistributedAsyncInsertDirectoryQueue needs to be recreated
if (!node_data.directory_queue
|| (node_data.clusters_version < getContext()->getClustersVersion() && node_data.addresses != parseAddresses(name)))
{
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(name, *this);
node_data.addresses = parseAddresses(name);
node_data.clusters_version = getContext()->getClustersVersion();
node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(node_data.addresses, *this);
node_data.directory_queue = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
*this, disk, relative_data_path + name,
node_data.connection_pool,
@@ -1393,6 +1397,53 @@ std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::ge
return statuses;
}

Cluster::Addresses StorageDistributed::parseAddresses(const std::string & name) const
{
Cluster::Addresses addresses;

const auto & cluster = getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();

for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
const std::string & dirname = boost::copy_range<std::string>(*it);
Cluster::Address address = Cluster::Address::fromFullString(dirname);

/// Check the new format shard{shard_index}_replica{replica_index}
/// (shard_index and replica_index start from 1).
if (address.shard_index)
{
if (address.shard_index > shards_info.size())
{
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
continue;
}

const auto & replicas_addresses = shards_addresses[address.shard_index - 1];
size_t replicas = replicas_addresses.size();

if (dirname.ends_with("_all_replicas"))
{
for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
addresses.push_back(replicas_addresses[replica_index - 1]);
continue;
}

if (address.replica_index > replicas)
{
LOG_ERROR(log, "No shard with replica_index={} ({})", address.replica_index, name);
continue;
}

addresses.push_back(replicas_addresses[address.replica_index - 1]);
}
else
addresses.push_back(address);
}
return addresses;
}

std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
{
UInt64 total_bytes = 0;
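For reference, the directory names that parseAddresses() above receives take roughly the following forms; the examples are illustrative, and the strings are produced by the senders and parsed with Cluster::Address::fromFullString.

/// "shard1_replica1"                  -> new format: replica 1 of shard 1 in the current cluster
/// "shard2_all_replicas"              -> all replicas of shard 2 (internal_replication clusters)
/// "shard1_replica1,shard2_replica1"  -> comma-separated list handled by the split loop above
/// a name without a shard_index       -> old-format full address string, pushed back as-is (else branch)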
4 changes: 4 additions & 0 deletions src/Storages/StorageDistributed.h
@@ -176,6 +176,8 @@ class StorageDistributed final : public IStorage, WithContext
/// Used for the INSERT into Distributed in case of distributed_foreground_insert==1, from DistributedSink.
DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);

/// Parse the addresses corresponding to the directory name of the directory queue
Cluster::Addresses parseAddresses(const std::string & name) const;

/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)
@@ -270,6 +272,8 @@ class StorageDistributed final : public IStorage, WithContext
{
std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_queue;
ConnectionPoolPtr connection_pool;
Cluster::Addresses addresses;
size_t clusters_version = 0;
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;
@@ -0,0 +1,34 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_replica_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_replica_cluster>
</remote_servers>
</clickhouse>