Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store clusters from ClusterDiscovery in separate map #48795

Merged
merged 4 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/Interpreters/ClusterDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,9 @@ bool ClusterDiscovery::updateCluster(ClusterInfo & cluster_info)
LOG_DEBUG(log, "Updating system.clusters record for '{}' with {} nodes", cluster_info.name, cluster_info.nodes_info.size());

auto cluster = makeCluster(cluster_info);
context->setCluster(cluster_info.name, cluster);

std::lock_guard lock(mutex);
cluster_impls[cluster_info.name] = cluster;
return true;
}

Expand Down Expand Up @@ -445,6 +447,21 @@ bool ClusterDiscovery::runMainThread(std::function<void()> up_to_date_callback)
return finished;
}

ClusterPtr ClusterDiscovery::getCluster(const String & cluster_name) const
{
std::lock_guard lock(mutex);
auto it = cluster_impls.find(cluster_name);
if (it == cluster_impls.end())
return nullptr;
return it->second;
}

std::unordered_map<String, ClusterPtr> ClusterDiscovery::getClusters() const
{
std::lock_guard lock(mutex);
return cluster_impls;
}

void ClusterDiscovery::shutdown()
{
LOG_DEBUG(log, "Shutting down");
Expand All @@ -456,7 +473,14 @@ void ClusterDiscovery::shutdown()

ClusterDiscovery::~ClusterDiscovery()
{
ClusterDiscovery::shutdown();
try
{
ClusterDiscovery::shutdown();
}
catch (...)
{
tryLogCurrentException(log, "Error on ClusterDiscovery shutdown");
}
}

bool ClusterDiscovery::NodeInfo::parse(const String & data, NodeInfo & result)
Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/ClusterDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class ClusterDiscovery

void start();

ClusterPtr getCluster(const String & cluster_name) const;
std::unordered_map<String, ClusterPtr> getClusters() const;

~ClusterDiscovery();

private:
Expand Down Expand Up @@ -124,6 +127,9 @@ class ClusterDiscovery
/// It prevents accessing to invalid object after ClusterDiscovery is destroyed.
std::shared_ptr<UpdateFlags> clusters_to_update;

mutable std::mutex mutex;
std::unordered_map<String, ClusterPtr> cluster_impls;

ThreadFromGlobalPool main_thread;

Poco::Logger * log;
Expand Down
36 changes: 29 additions & 7 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ struct ContextSharedPart : boost::noncopyable
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
std::shared_ptr<Clusters> clusters;
ConfigurationPtr clusters_config; /// Stores updated configs
mutable std::mutex clusters_mutex; /// Guards clusters and clusters_config
std::unique_ptr<ClusterDiscovery> cluster_discovery;
mutable std::mutex clusters_mutex; /// Guards clusters, clusters_config and cluster_discovery

std::shared_ptr<AsynchronousInsertQueue> async_insert_queue;
std::map<String, UInt16> server_ports;
Expand Down Expand Up @@ -2966,11 +2966,19 @@ std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) c

std::shared_ptr<Cluster> Context::tryGetCluster(const std::string & cluster_name) const
{
auto res = getClusters()->getCluster(cluster_name);
if (res)
return res;
if (!cluster_name.empty())
std::shared_ptr<Cluster> res = nullptr;

{
std::lock_guard lock(shared->clusters_mutex);
res = getClustersImpl(lock)->getCluster(cluster_name);

if (res == nullptr && shared->cluster_discovery)
res = shared->cluster_discovery->getCluster(cluster_name);
}

if (res == nullptr && !cluster_name.empty())
res = tryGetReplicatedDatabaseCluster(cluster_name);

return res;
}

Expand Down Expand Up @@ -3001,10 +3009,23 @@ void Context::reloadClusterConfig() const
}
}


std::shared_ptr<Clusters> Context::getClusters() const
std::map<String, ClusterPtr> Context::getClusters() const
{
std::lock_guard lock(shared->clusters_mutex);

auto clusters = getClustersImpl(lock)->getContainer();

if (shared->cluster_discovery)
antonio2368 marked this conversation as resolved.
Show resolved Hide resolved
{
const auto & cluster_discovery_map = shared->cluster_discovery->getClusters();
for (const auto & [name, cluster] : cluster_discovery_map)
clusters.emplace(name, cluster);
}
return clusters;
}

std::shared_ptr<Clusters> Context::getClustersImpl(std::lock_guard<std::mutex> & /* lock */) const
{
if (!shared->clusters)
{
const auto & config = shared->clusters_config ? *shared->clusters_config : getConfigRef();
Expand All @@ -3016,6 +3037,7 @@ std::shared_ptr<Clusters> Context::getClusters() const

void Context::startClusterDiscovery()
{
std::lock_guard lock(shared->clusters_mutex);
if (!shared->cluster_discovery)
return;
shared->cluster_discovery->start();
Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ class Context: public std::enable_shared_from_this<Context>
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);
DDLWorker & getDDLWorker() const;

std::shared_ptr<Clusters> getClusters() const;
std::map<String, std::shared_ptr<Cluster>> getClusters() const;
std::shared_ptr<Cluster> getCluster(const std::string & cluster_name) const;
std::shared_ptr<Cluster> tryGetCluster(const std::string & cluster_name) const;
void setClustersConfig(const ConfigurationPtr & config, bool enable_discovery = false, const String & config_name = "remote_servers");
Expand Down Expand Up @@ -1159,6 +1159,9 @@ class Context: public std::enable_shared_from_this<Context>

DisksMap getDisksMap(std::lock_guard<std::mutex> & lock) const;

/// Expect lock for shared->clusters_mutex
std::shared_ptr<Clusters> getClustersImpl(std::lock_guard<std::mutex> & lock) const;

/// Throttling
public:
ThrottlerPtr getReplicatedFetchesThrottler() const;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemClusters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ NamesAndTypesList StorageSystemClusters::getNamesAndTypes()

void StorageSystemClusters::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
for (const auto & name_and_cluster : context->getClusters()->getContainer())
for (const auto & name_and_cluster : context->getClusters())
writeCluster(res_columns, name_and_cluster);

const auto databases = DatabaseCatalog::instance().getDatabases();
Expand Down
27 changes: 27 additions & 0 deletions tests/integration/test_cluster_discovery/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ def test_cluster_discovery_startup_and_stop(start_cluster):
[nodes["node0"], nodes["node2"], nodes["node_observer"]], total_shards
)

# test ON CLUSTER query
nodes["node0"].query(
"CREATE TABLE tbl ON CLUSTER 'test_auto_cluster' (x UInt64) ENGINE = MergeTree ORDER BY x"
)
nodes["node0"].query("INSERT INTO tbl VALUES (1)")
nodes["node1"].query("INSERT INTO tbl VALUES (2)")

assert (
int(
nodes["node_observer"]
.query(
"SELECT sum(x) FROM clusterAllReplicas(test_auto_cluster, default.tbl)"
)
.strip()
)
== 3
)

# Query SYSTEM DROP DNS CACHE may reload cluster configuration
# check that it does not affect cluster discovery
nodes["node1"].query("SYSTEM DROP DNS CACHE")
nodes["node0"].query("SYSTEM DROP DNS CACHE")

check_shard_num(
[nodes["node0"], nodes["node2"], nodes["node_observer"]], total_shards
)

nodes["node1"].stop_clickhouse(kill=True)
check_nodes_count(
[nodes["node0"], nodes["node2"], nodes["node_observer"]], total_nodes - 1
Expand Down