Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 15 additions & 51 deletions cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/cloud.pb.h>

#include <mutex>
#include <regex>
#include <sstream>

Expand Down Expand Up @@ -111,9 +112,10 @@ int ResourceManager::init() {
key0.push_back('\x00'); // Update to next smallest key for iteration
} while (it->more());

std::unique_lock l(mtx_);
for (auto& [inst_id, inst] : instances) {
for (auto& c : inst.clusters()) {
add_cluster_to_index(inst_id, c);
add_cluster_to_index_no_lock(inst_id, c);
}
}

Expand Down Expand Up @@ -444,7 +446,7 @@ std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::
return std::make_pair(cast_as<ErrCategory::COMMIT>(err), msg);
}

add_cluster_to_index(instance_id, cluster.cluster);
refresh_instance(instance_id, instance);
Copy link
Contributor

@gavinchou gavinchou Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there (the original impl. also) may be a bug that before L499 is called there could be another MS update the instancePB in KV, however the possibility is very low and seems will never happen:

  1. call to L449 is very fast, in microsecs
  2. the remote MS update will trigger a refresh_signal to all other instance, which make all MS eventually consistent

CC @deardeng


return std::make_pair(MetaServiceCode::OK, "");
}
Expand Down Expand Up @@ -575,7 +577,7 @@ std::pair<MetaServiceCode, std::string> ResourceManager::drop_cluster(
return std::make_pair(cast_as<ErrCategory::COMMIT>(err), msg);
}

remove_cluster_from_index(instance_id, to_del);
refresh_instance(instance_id, new_instance);

return std::make_pair(MetaServiceCode::OK, "");
}
Expand Down Expand Up @@ -710,18 +712,11 @@ std::string ResourceManager::update_cluster(
LOG(INFO) << "update cluster instance_id=" << instance_id
<< " instance json=" << proto_to_json(instance);

update_cluster_to_index(instance_id, original, now);
refresh_instance(instance_id, instance);

return msg;
}

void ResourceManager::update_cluster_to_index(const std::string& instance_id,
const ClusterPB& original, const ClusterPB& now) {
std::lock_guard l(mtx_);
remove_cluster_from_index_no_lock(instance_id, original);
add_cluster_to_index_no_lock(instance_id, now);
}

void ResourceManager::add_cluster_to_index_no_lock(const std::string& instance_id,
const ClusterPB& c) {
auto type = c.has_type() ? c.type() : -1;
Expand All @@ -745,39 +740,6 @@ void ResourceManager::add_cluster_to_index_no_lock(const std::string& instance_i
}
}

void ResourceManager::add_cluster_to_index(const std::string& instance_id, const ClusterPB& c) {
std::lock_guard l(mtx_);
add_cluster_to_index_no_lock(instance_id, c);
}

void ResourceManager::remove_cluster_from_index_no_lock(const std::string& instance_id,
const ClusterPB& c) {
std::string cluster_name = c.cluster_name();
std::string cluster_id = c.cluster_id();
int cnt = 0;
for (auto it = node_info_.begin(); it != node_info_.end();) {
auto& [_, n] = *it;
if (n.instance_id != instance_id || n.cluster_id != cluster_id ||
n.cluster_name != cluster_name) {
++it;
continue;
}
++cnt;
LOG(INFO) << "remove node from index, instance_id=" << instance_id
<< " role=" << static_cast<int>(n.role) << " cluster_name=" << n.cluster_name
<< " cluster_id=" << n.cluster_id << " node_info=" << proto_to_json(n.node_info);
it = node_info_.erase(it);
}
LOG(INFO) << cnt << " nodes removed from index, cluster_id=" << cluster_id
<< " cluster_name=" << cluster_name << " instance_id=" << instance_id;
}

void ResourceManager::remove_cluster_from_index(const std::string& instance_id,
const ClusterPB& c) {
std::lock_guard l(mtx_);
remove_cluster_from_index_no_lock(instance_id, c);
}

std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_ptr<Transaction> txn,
const std::string& instance_id,
InstanceInfoPB* inst_pb) {
Expand Down Expand Up @@ -1209,9 +1171,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id,
return msg;
}

for (auto& it : change_from_to_clusters) {
update_cluster_to_index(instance_id, it.first, it.second);
}
refresh_instance(instance_id, instance);

return "";
}
Expand Down Expand Up @@ -1243,9 +1203,15 @@ std::pair<MetaServiceCode, std::string> ResourceManager::refresh_instance(
msg = m0;
return ret0;
}
std::vector<ClusterInfo> clusters;
clusters.reserve(instance.clusters_size());

refresh_instance(instance_id, instance);
LOG(INFO) << "finish refreshing instance, instance_id=" << instance_id << " seq=" << seq;

return ret0;
}

void ResourceManager::refresh_instance(const std::string& instance_id,
const InstanceInfoPB& instance) {
std::lock_guard l(mtx_);
for (auto i = node_info_.begin(); i != node_info_.end();) {
if (i->second.instance_id != instance_id) {
Expand All @@ -1257,8 +1223,6 @@ std::pair<MetaServiceCode, std::string> ResourceManager::refresh_instance(
for (int i = 0; i < instance.clusters_size(); ++i) {
add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
}
LOG(INFO) << "finish refreshing instance, instance_id=" << instance_id << " seq=" << seq;
return ret0;
}

} // namespace doris::cloud
20 changes: 9 additions & 11 deletions cloud/src/resource-manager/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,20 +163,18 @@ class ResourceManager {
virtual std::pair<MetaServiceCode, std::string> refresh_instance(
const std::string& instance_id);

private:
void add_cluster_to_index(const std::string& instance_id, const ClusterPB& cluster);

void remove_cluster_from_index(const std::string& instance_id, const ClusterPB& cluster);

void update_cluster_to_index(const std::string& instance_id, const ClusterPB& original,
const ClusterPB& now);

void remove_cluster_from_index_no_lock(const std::string& instance_id,
const ClusterPB& cluster);
/**
* Refreshes the cache of given instance from provided InstanceInfoPB. This process
* removes the instance in cache and then replaces it with provided instance state.
*
* @param instance_id instance to manipulate
* @param instance the instance info to refresh from
*/
virtual void refresh_instance(const std::string& instance_id, const InstanceInfoPB& instance);

private:
void add_cluster_to_index_no_lock(const std::string& instance_id, const ClusterPB& cluster);

private:
std::shared_mutex mtx_;
// cloud_unique_id -> NodeInfo
std::multimap<std::string, NodeInfo> node_info_;
Expand Down
Loading