diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h new file mode 100644 index 00000000000..b5fccb52134 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#ifndef INLONG_SDK_PROXY_INFO_H +#define INLONG_SDK_PROXY_INFO_H +namespace inlong { + +class ProxyInfo { +private: + std::string proxy_str_id_; + std::string ip_; + int32_t port_; + +public: + ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port) + : proxy_str_id_(proxy_str_id), ip_(ip), port_(port) {} + ProxyInfo(){}; + std::string ip() const { return ip_; } + int32_t port() const { return port_; } +}; +using ProxyInfoVec = std::vector; +} // namespace inlong +#endif // INLONG_SDK_PROXY_INFO_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc new file mode 100644 index 00000000000..5548a441ba6 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proxy_manager.h" +#include "../utils/capi_constant.h" +#include "../utils/logger.h" +#include "../utils/utils.h" +#include "api_code.h" +#include +#include + +namespace inlong { +ProxyManager *ProxyManager::instance_ = new ProxyManager(); +ProxyManager::~ProxyManager() { + if (update_conf_thread_.joinable()) { + update_conf_thread_.join(); + } + + exit_flag_ = true; + std::unique_lock con_lck(cond_mutex_); + update_flag_ = true; + con_lck.unlock(); + cond_.notify_one(); +} +void ProxyManager::Init() { + timeout_ = SdkConfig::getInstance()->bus_URL_timeout_; + if (__sync_bool_compare_and_swap(&inited_, false, true)) { + update_conf_thread_ = std::thread(&ProxyManager::Update, this); + } +} + +void ProxyManager::Update() { + while (true) { + std::unique_lock con_lck(cond_mutex_); + if (cond_.wait_for(con_lck, + std::chrono::minutes( + SdkConfig::getInstance()->bus_update_interval_), + [this]() { return update_flag_; })) { + if (exit_flag_) + break; + update_flag_ = false; + con_lck.unlock(); + DoUpdate(); + } else { + DoUpdate(); + } + } + LOG_INFO("proxylist DoUpdate thread exit"); +} +void ProxyManager::DoUpdate() { + update_mutex_.try_lock(); + LOG_INFO("start ProxyManager DoUpdate."); + + if (groupid_2_cluster_map_.empty()) { + LOG_INFO("empty groupid, no need to DoUpdate buslist"); + update_mutex_.unlock(); + return; + } + + { + unique_read_lock rdlck(bid_2_cluster_rwmutex_); + for (auto &groupid2cluster : groupid_2_cluster_map_) { + std::string url; + if (SdkConfig::getInstance().enable_proxy_URL_from_cluster_) + url = SdkConfig::getInstance().proxy_cluster_URL_; + else { + url = SdkConfig::getInstance().proxy_URL_ + "/" + groupid2cluster.first; + } + std::string post_data = "ip=" + SdkConfig::getInstance().ser_ip_ + + "&version=" + constants::kTDBusCAPIVersion + + "&protocolType=" + constants::kProtocolType; + LOG_WARN("get inlong_group_id:%s proxy cfg url:%s, post_data:%s", + groupid2cluster.first.c_str(), url.c_str(), post_data.c_str()); + + std::string meta_data; + int32_t ret; + std::string urlByDNS; + for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) { + ret = Utils::requestUrl(url, urlByDNS, meta_data, timeout_); + if (!ret) { + break; + } + } + ProxyInfoVec proxyInfoVec; + ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec); + if (ret != SdkCode::kSuccess) { + LOG_ERROR("failed to parse groupid:%s json proxy list " + << groupid2cluster.first.c_str()); + continue; + } + if (!proxyInfoVec.empty()) { + unique_write_lock wtlck(groupid_2_proxy_map_rwmutex_); + groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec; + LOG_INFO("groupid:" << groupid2cluster.first << " success update " + << proxyInfoVec.size() << " proxy-ip."); + } + } + } + update_mutex_.unlock(); + LOG_INFO("finish ProxyManager DoUpdate."); +} + +int32_t ProxyManager::ParseAndGet(const std::string &groupid, + const std::string &meta_data, + ProxyInfoVec &proxy_info_vec) { + rapidjson::Document doc; + if (doc.Parse(meta_data.c_str()).HasParseError()) { + LOG_ERROR("failed to parse meta_data, error" << doc.GetParseError() << ":" + << doc.GetErrorOffset()); + return SdkCode::kErrorParseJson; + } + + if (doc.HasMember("size") && doc["size"].IsInt() && !doc["size"].IsNull() && + doc["size"].GetInt() > 0) { + const rapidjson::Value &obj = doc["size"]; + } else { + LOG_ERROR("can't find groupid:%s buslist from meta_data" + << groupid.c_str()); + return SdkCode::kErrorParseJson; + } + + if (doc.HasMember("cluster_id") && doc["cluster_id"].IsInt() && + !doc["cluster_id"].IsNull()) { + const rapidjson::Value &obj = doc["cluster_id"]; + } else { + LOG_ERROR("cluster_id of groupid:%s is not found or not a integer" + << groupid.c_str()); + return SdkCode::kErrorParseJson; + } + + if (doc.HasMember("address") && !doc["address"].IsNull()) // v2版本 + { + const rapidjson::Value &hostlist = doc["address"]; + for (auto &info : hostlist.GetArray()) { + std::string id, ip; + int32_t port; + if (info.HasMember("host") && !info["host"].IsNull()) + ip = info["host"].GetString(); + else { + LOG_ERROR("this host info is null"); + continue; + } + if (info.HasMember("port") && !info["port"].IsNull()) { + if (info["port"].IsString()) + port = std::stoi(info["port"].GetString()); + if (info["port"].IsInt()) + port = info["port"].GetInt(); + } + + else { + LOG_ERROR("this port info is null or negative"); + continue; + } + if (info.HasMember("id") && !info["id"].IsNull()) { + if (info["id"].IsString()) + id = info["id"].GetString(); + if (info["id"].IsInt()) + id = std::to_string(info["id"].GetInt()); + } else { + LOG_ERROR("there is no id info of groupid"); + continue; + } + proxy_info_vec.emplace_back(id, ip, port); + } + } else { + LOG_ERROR("there is no any host info of groupid:%s" << groupid.c_str()); + return SdkCode::kErrorParseJson; + } + return SdkCode::kSuccess; +} + +int32_t ProxyManager::GetBusByBid(const std::string &groupid, + BusInfoVec &proxy_info_vec) { + unique_read_lock rdlck(groupid_2_proxy_map_rwmutex_); + auto it = groupid_2_proxy_map_.find(groupid); + if (it == groupid_2_proxy_map_.end()) { + LOG_ERROR("GetProxyByGroupid failed . Groupid " << groupid); + return SdkCode::kFailGetBusConf; + } + proxy_info_vec = it->second; + return SdkCode::kSuccess; +} + +int32_t ProxyManager::CheckBidConf(const std::string &groupid, bool is_inited) { + { + unique_read_lock rdlck(groupid_2_cluster_rwmutex_); + auto it = groupid_2_cluster_map_.find(groupid); + if (it != groupid_2_cluster_map_.end()) { + return SdkCode::kSuccess; + } + } + + { + unique_write_lock wtlck(groupid_2_cluster_rwmutex_); + groupid_2_cluster_map_.emplace(groupid, -1); + } + + LOG_INFO("CheckProxyConf groupid:" << groupid << ",isInited :" << is_inited); + if (is_inited) { + std::unique_lock con_lck(cond_mutex_); + update_flag_ = true; + con_lck.unlock(); + cond_.notify_one(); + } else { + DoUpdate(); + } + return SdkCode::kSuccess; +} + +bool ProxyManager::IsBusExist(const std::string &groupid) { + unique_read_lock rdlck(groupid_2_proxy_map_rwmutex_); + auto it = groupid_2_proxy_map_.find(groupid); + if (it == groupid_2_proxy_map_.end()) { + return false; + } + return true; +} +} // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h new file mode 100644 index 00000000000..1c76f984577 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef INLONG_SDK_PROXY_MANAGER_H +#define INLONG_SDK_PROXY_MANAGER_H + +#include "../config//proxy_info.h" +#include "../utils/read_write_mutex.h" +#include +#include +#include + +namespace inlong { +class ProxyManager { +private: + static ProxyManager *instance_; + int32_t timeout_; + read_write_mutex groupid_2_cluster_rwmutex_; + read_write_mutex groupid_2_proxy_map_rwmutex_; + + std::unordered_map groupid_2_cluster_map_cluster_map_; + std::unordered_map groupid_2_proxy_map_; + bool update_flag_; + std::mutex cond_mutex_; + std::mutex update_mutex_; + + std::condition_variable cond_; + bool exit_flag_; + std::thread update_conf_thread_; + volatile bool inited_ = false; + + int32_t ParseAndGet(const std::string &groupid, const std::string &meta_data, + ProxyInfoVec &bus_info_vec); + +public: + ProxyManager(){}; + ~ProxyManager(); + static ProxyManager *GetInstance() { return instance_; } + int32_t CheckBidConf(const std::string &groupid, bool is_inited); + void Update(); + void DoUpdate(); + void Init(); + int32_t GetBusByBid(const std::string &groupid, ProxyInfoVec &proxy_info_vec); + bool IsBusExist(const std::string &groupid); +}; +} // namespace inlong + +#endif // INLONG_SDK_PROXY_MANAGER_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc index c5c484a2ee5..a5cbda53d1b 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/send_manager.cc @@ -19,7 +19,7 @@ #include "send_manager.h" #include "../utils/utils.h" -#include "bus_conf_manager.h" +#include "proxy_manager.h" namespace inlong { SendManager::SendManager() : send_group_idx_(0) { for (int32_t i = 0; i < SdkConfig::getInstance()->group_ids_.size(); i++) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h similarity index 100% rename from inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/capi_constant.h rename to inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/logger.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/logger.h similarity index 100% rename from inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/uitls/logger.h rename to inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/logger.h