Optimize cluster node list change but DirectoryMonitor can't sense it #42826

Merged
9 changes: 9 additions & 0 deletions src/Interpreters/Context.cpp
@@ -353,6 +353,7 @@ struct ContextSharedPart : boost::noncopyable
    std::shared_ptr<Clusters> clusters TSA_GUARDED_BY(clusters_mutex);
    ConfigurationPtr clusters_config TSA_GUARDED_BY(clusters_mutex); /// Stores updated configs
    std::unique_ptr<ClusterDiscovery> cluster_discovery TSA_GUARDED_BY(clusters_mutex);
    size_t clusters_version TSA_GUARDED_BY(clusters_mutex) = 0;

    /// No lock required for async_insert_queue modified only during initialization
    std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
@@ -3430,6 +3431,14 @@ void Context::setClustersConfig(const ConfigurationPtr & config, bool enable_dis
        shared->clusters = std::make_shared<Clusters>(*shared->clusters_config, settings, getMacros(), config_name);
    else
        shared->clusters->updateClusters(*shared->clusters_config, settings, config_name, old_clusters_config);

    ++shared->clusters_version;
}

size_t Context::getClustersVersion() const
{
    std::lock_guard lock(shared->clusters_mutex);
    return shared->clusters_version;
}


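A note on the new counter: clusters_version increases monotonically, under clusters_mutex, every time setClustersConfig() applies a configuration, so consumers can detect cluster reloads with a cheap integer comparison instead of diffing cluster objects. A minimal self-contained sketch of the pattern (the SharedState and ClusterConsumer types and the polling loop are illustrative stand-ins, not part of this PR):

#include <cstddef>
#include <mutex>

// Simplified stand-in for ContextSharedPart: a counter bumped under the same
// mutex that guards the cluster objects, as in setClustersConfig() above.
struct SharedState
{
    std::mutex clusters_mutex;
    size_t clusters_version = 0;

    size_t getClustersVersion()
    {
        std::lock_guard lock(clusters_mutex);
        return clusters_version;
    }

    void reloadClusters()
    {
        std::lock_guard lock(clusters_mutex);
        // ... rebuild / update cluster objects here ...
        ++clusters_version;
    }
};

// Hypothetical consumer: caches the last version it acted on and rebuilds its
// derived state (e.g. connection pools) only when the version has moved on.
struct ClusterConsumer
{
    size_t seen_version = 0;

    void poll(SharedState & shared)
    {
        if (size_t current = shared.getClustersVersion(); seen_version < current)
        {
            seen_version = current;
            // ... recreate pools / directory queues derived from the cluster ...
        }
    }
};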
1 change: 1 addition & 0 deletions src/Interpreters/Context.h
Expand Up @@ -1020,6 +1020,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
    std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
    std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
    void setClustersConfig(const ConfigurationPtr & config, bool enable_discovery = false, const String & config_name = "remote_servers");
    size_t getClustersVersion() const;

    void startClusterDiscovery();

41 changes: 9 additions & 32 deletions src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.cpp
@@ -7,7 +7,6 @@
#include <Formats/NativeReader.h>
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ConnectionTimeouts.h>
@@ -55,7 +54,7 @@ namespace
{

template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
ConnectionPoolPtrs createPoolsForAddresses(const Cluster::Addresses & addresses, PoolFactory && factory, Poco::Logger * log)
{
    ConnectionPoolPtrs pools;

@@ -76,30 +75,8 @@ ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory
        }
    };

    for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
    {
        const std::string & dirname = boost::copy_range<std::string>(*it);
        Cluster::Address address = Cluster::Address::fromFullString(dirname);
        if (address.shard_index && dirname.ends_with("_all_replicas"))
        {
            if (address.shard_index > shards_info.size())
            {
                LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
                continue;
            }

            const auto & shard_info = shards_info[address.shard_index - 1];
            size_t replicas = shard_info.per_replica_pools.size();

            for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
            {
                address.replica_index = static_cast<UInt32>(replica_index);
                make_connection(address);
            }
        }
        else
            make_connection(address);
    }
    for (const auto & address : addresses)
        make_connection(address);

    return pools;
}
@@ -254,9 +231,9 @@ void DistributedAsyncInsertDirectoryQueue::run()
}


ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::string & name, const StorageDistributed & storage)
ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage)
{
    const auto pool_factory = [&storage, &name] (const Cluster::Address & address) -> ConnectionPoolPtr
    const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
    {
        const auto & cluster = storage.getCluster();
        const auto & shards_info = cluster->getShardsInfo();
@@ -268,16 +245,16 @@ ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::st
        {
            if (!address.replica_index)
                throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
                    "Wrong replica_index={} ({})", address.replica_index, name);
                    "Wrong replica_index={}", address.replica_index);

            if (address.shard_index > shards_info.size())
                throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
                    "No shard with shard_index={} ({})", address.shard_index, name);
                    "No shard with shard_index={}", address.shard_index);

            const auto & shard_info = shards_info[address.shard_index - 1];
            if (address.replica_index > shard_info.per_replica_pools.size())
                throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
                    "No replica with replica_index={}", address.replica_index);

            return shard_info.per_replica_pools[address.replica_index - 1];
        }
Expand Down Expand Up @@ -318,7 +295,7 @@ ConnectionPoolPtr DistributedAsyncInsertDirectoryQueue::createPool(const std::st
            address.secure);
    };

    auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);
    auto pools = createPoolsForAddresses(addresses, pool_factory, storage.log);

    const auto settings = storage.getContext()->getSettings();
    return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools,
src/Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h
@@ -4,6 +4,7 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <IO/ReadBufferFromFile.h>
#include <Interpreters/Cluster.h>
#include <Disks/IDisk.h>
#include <atomic>
#include <mutex>
@@ -56,7 +57,7 @@ class DistributedAsyncInsertDirectoryQueue

    ~DistributedAsyncInsertDirectoryQueue();

    static ConnectionPoolPtr createPool(const std::string & name, const StorageDistributed & storage);
    static ConnectionPoolPtr createPool(const Cluster::Addresses & addresses, const StorageDistributed & storage);

    void updatePath(const std::string & new_relative_path);

63 changes: 61 additions & 2 deletions src/Storages/StorageDistributed.cpp
@@ -1371,9 +1371,13 @@ DistributedAsyncInsertDirectoryQueue & StorageDistributed::getDirectoryQueue(con

    std::lock_guard lock(cluster_nodes_mutex);
    auto & node_data = cluster_nodes_data[key];
    if (!node_data.directory_queue)
    /// If the cluster node list has changed, the DistributedAsyncInsertDirectoryQueue must be recreated.
    if (!node_data.directory_queue
        || (node_data.clusters_version < getContext()->getClustersVersion() && node_data.addresses != parseAddresses(name)))
    {
        node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(name, *this);
        node_data.addresses = parseAddresses(name);
        node_data.clusters_version = getContext()->getClustersVersion();
        node_data.connection_pool = DistributedAsyncInsertDirectoryQueue::createPool(node_data.addresses, *this);
        node_data.directory_queue = std::make_unique<DistributedAsyncInsertDirectoryQueue>(
            *this, disk, relative_data_path + name,
            node_data.connection_pool,
@@ -1393,6 +1397,61 @@ std::vector<DistributedAsyncInsertDirectoryQueue::Status> StorageDistributed::ge
    return statuses;
}

Cluster::Addresses StorageDistributed::parseAddresses(const std::string & name) const
{
    Cluster::Addresses addresses;

    const auto & cluster = getCluster();
    const auto & shards_info = cluster->getShardsInfo();
    const auto & shards_addresses = cluster->getShardsAddresses();

    for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
    {
        const std::string & dirname = boost::copy_range<std::string>(*it);
        Cluster::Address address = Cluster::Address::fromFullString(dirname);

        /// Check new format shard{shard_index}_replica{replica_index}
        /// (shard_index and replica_index start from 1).
        if (address.shard_index)
        {
            if (address.shard_index > shards_info.size())
            {
                LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
                continue;
            }

            const auto & replicas_addresses = shards_addresses[address.shard_index - 1];
            size_t replicas = replicas_addresses.size();

            if (dirname.ends_with("_all_replicas"))
            {
                for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
                {
                    Cluster::Address replica_address = replicas_addresses[replica_index - 1];
                    replica_address.shard_index = address.shard_index;
                    replica_address.replica_index = static_cast<UInt32>(replica_index);
                    addresses.emplace_back(std::move(replica_address));
                }
                continue;
            }

            if (address.replica_index > replicas)
            {
                LOG_ERROR(log, "No replica with replica_index={} ({})", address.replica_index, name);
                continue;
            }

            Cluster::Address replica_address = replicas_addresses[address.replica_index - 1];
            replica_address.shard_index = address.shard_index;
            replica_address.replica_index = address.replica_index;
            addresses.emplace_back(std::move(replica_address));
        }
        else
            addresses.push_back(address);
    }
    return addresses;
}
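For orientation: the directory names parsed here are how pending asynchronous INSERT batches encode their destination, either the legacy user[:password]@host:port#database form or the newer shard{N}_replica{M} form (with a shard{N}_all_replicas variant), and several names can be joined by commas. A rough, self-contained model of that grammar follows; it is a simplification, and ParsedName/parseDirname are hypothetical names — in the real code Cluster::Address::fromFullString does the parsing and legacy names carry full endpoint information:

#include <cstdio>
#include <string>
#include <vector>

// Simplified stand-in for Cluster::Address: only the fields this sketch needs.
struct ParsedName
{
    unsigned shard_index = 0;   // from "shard{N}_...", stays 0 for legacy names
    unsigned replica_index = 0; // from "..._replica{M}", stays 0 for "_all_replicas"
    bool all_replicas = false;
};

// Split a queue directory name on ',' and classify each part, mimicking the
// split/parse loop in StorageDistributed::parseAddresses above.
std::vector<ParsedName> parseDirname(const std::string & name)
{
    std::vector<ParsedName> result;
    size_t pos = 0;
    while (true)
    {
        size_t comma = name.find(',', pos);
        std::string part = name.substr(pos, comma == std::string::npos ? std::string::npos : comma - pos);

        ParsedName parsed;
        /// For "shardN_all_replicas", sscanf stops after N, leaving replica_index == 0.
        if (std::sscanf(part.c_str(), "shard%u_replica%u", &parsed.shard_index, &parsed.replica_index) >= 1)
            parsed.all_replicas = part.ends_with("_all_replicas");
        result.push_back(parsed);

        if (comma == std::string::npos)
            break;
        pos = comma + 1;
    }
    return result;
}

// parseDirname("shard2_all_replicas") yields one entry that the caller expands
// to every replica of shard 2; parseDirname("shard1_replica1,shard1_replica2")
// yields two concrete endpoints; a legacy "user@host:9000#db" name yields
// shard_index == 0 and is used verbatim.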

std::optional<UInt64> StorageDistributed::totalBytes(const Settings &) const
{
    UInt64 total_bytes = 0;
4 changes: 4 additions & 0 deletions src/Storages/StorageDistributed.h
@@ -176,6 +176,8 @@ class StorageDistributed final : public IStorage, WithContext
    /// Used for the INSERT into Distributed in case of distributed_foreground_insert==1, from DistributedSink.
    DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);

    /// Parse the addresses corresponding to the directory name of the directory queue.
    Cluster::Addresses parseAddresses(const std::string & name) const;

    /// Return list of metrics for all created monitors
    /// (note that monitors are created lazily, i.e. until at least one INSERT executed)
@@ -270,6 +272,8 @@ class StorageDistributed final : public IStorage, WithContext
    {
        std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_queue;
        ConnectionPoolPtr connection_pool;
        Cluster::Addresses addresses;
        size_t clusters_version = 0;
    };
    std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
    mutable std::mutex cluster_nodes_mutex;
@@ -0,0 +1,18 @@
<clickhouse>
    <remote_servers>
        <test_cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</clickhouse>
@@ -0,0 +1,123 @@
import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)


node1 = cluster.add_instance(
"node1",
main_configs=["configs/remote_servers.xml"],
)

node2 = cluster.add_instance(
"node2",
main_configs=["configs/remote_servers.xml"],
)

node3 = cluster.add_instance(
"node3",
main_configs=["configs/remote_servers.xml"],
)

config1 = """<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>"""

config2 = """<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>
"""


@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for _, node in cluster.instances.items():
node.query(
f"""
create table dist_local (c1 Int32, c2 String) engine=MergeTree() order by c1;
create table dist (c1 Int32, c2 String) engine=Distributed(test_cluster, currentDatabase(), dist_local, c1);
"""
)
yield cluster
finally:
cluster.shutdown()


def test_distributed_async_insert(started_cluster):
    node1.query("insert into dist select number,'A' from system.numbers limit 10;")
    node1.query("system flush distributed dist;")

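    # The sharding key is c1 (= number 0..9), so the 2 shards get 5 rows each.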
    assert int(node3.query("select count() from dist_local where c2 = 'A'")) == 5
    assert int(node1.query("select count() from dist_local where c2 = 'A'")) == 5

    # Add node2
    node1.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
    node1.query("SYSTEM RELOAD CONFIG;")

    node2.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
    node2.query("SYSTEM RELOAD CONFIG;")

    node3.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config2)
    node3.query("SYSTEM RELOAD CONFIG;")

    node1.query("insert into dist select number,'B' from system.numbers limit 12;")
    node1.query("system flush distributed dist;")

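    # After the reload the queue targets 3 shards: 12 rows -> 4 per shard, node2 included.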
    assert int(node1.query("select count() from dist_local where c2 = 'B'")) == 4
    assert int(node2.query("select count() from dist_local where c2 = 'B'")) == 4
    assert int(node3.query("select count() from dist_local where c2 = 'B'")) == 4

    # Delete node2
    node1.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
    node1.query("SYSTEM RELOAD CONFIG;")

    node2.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
    node2.query("SYSTEM RELOAD CONFIG;")

    node3.replace_config("/etc/clickhouse-server/config.d/remote_servers.xml", config1)
    node3.query("SYSTEM RELOAD CONFIG;")

    node1.query("insert into dist select number,'C' from system.numbers limit 10;")
    node1.query("system flush distributed dist;")

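    # Back to 2 shards: node2 receives nothing new; the 10 rows split 5/5 between node1 and node3.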
    assert int(node1.query("select count() from dist_local where c2 = 'C'")) == 5
    assert int(node2.query("select count() from dist_local where c2 = 'C'")) == 0
    assert int(node3.query("select count() from dist_local where c2 = 'C'")) == 5