Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ bool SdkConfig::ParseConfig(const std::string &config_path) {
// Guaranteed to only parse the configuration file once
if (!__sync_bool_compare_and_swap(&parsed_, false, true)) {
LOG_INFO("ParseConfig has parsed .");
if (++instance_num_ > max_instance_) {
return false;
}
return true;
}

Expand Down Expand Up @@ -92,7 +95,7 @@ void SdkConfig::defaultInit() {
load_balance_interval_ = constants::kLoadBalanceInterval;
heart_beat_interval_ = constants::kHeartBeatInterval;
enable_balance_ = constants::kEnableBalance;
isolation_level_=constants::IsolationLevel::kLevelSecond;
isolation_level_ = constants::IsolationLevel::kLevelSecond;

// cache parameter
send_buf_size_ = constants::kSendBufSize;
Expand Down Expand Up @@ -132,6 +135,10 @@ void SdkConfig::defaultInit() {
enable_setaffinity_ = constants::kEnableSetAffinity;
mask_cpu_affinity_ = constants::kMaskCPUAffinity;
extend_field_ = constants::kExtendField;

need_auth_ = constants::kNeedAuth;
max_instance_ = constants::kMaxInstance;
instance_num_ = 1;
}

void SdkConfig::InitThreadParam(const rapidjson::Value &doc) {
Expand Down Expand Up @@ -212,6 +219,14 @@ void SdkConfig::InitCacheParam(const rapidjson::Value &doc) {
} else {
max_stream_id_num_ = constants::kMaxGroupIdNum;
}

// max_cache_num
if (doc.HasMember("max_cache_num") && doc["max_cache_num"].IsInt() && doc["max_cache_num"].GetInt() >= 0) {
const rapidjson::Value &obj = doc["max_cache_num"];
max_cache_num_ = obj.GetInt();
} else {
max_cache_num_ = constants::kMaxCacheNum;
}
}

void SdkConfig::InitZipParam(const rapidjson::Value &doc) {
Expand Down Expand Up @@ -431,9 +446,10 @@ void SdkConfig::InitAuthParm(const rapidjson::Value &doc) {
} else {
need_auth_ = constants::kNeedAuth;
LOG_INFO("need_auth is not expect, then use default:%s" << need_auth_
? "true"
: "false");
? "true"
: "false");
}

}
void SdkConfig::OthersParam(const rapidjson::Value &doc) {
// ser_ip
Expand Down Expand Up @@ -475,12 +491,20 @@ void SdkConfig::OthersParam(const rapidjson::Value &doc) {
} else {
extend_field_ = constants::kExtendField;
}

// instance num
if (doc.HasMember("max_instance") && doc["max_instance"].IsInt() && doc["max_instance"].GetInt() > 0) {
const rapidjson::Value &obj = doc["max_instance"];
max_instance_ = obj.GetInt();
} else {
max_instance_ = constants::kMaxInstance;
}
}

bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhost) {
bool SdkConfig::GetLocalIPV4Address(std::string &err_info, std::string &localhost) {
int32_t sockfd;
int32_t ip_num = 0;
char buf[1024] = {0};
char buf[1024] = {0};
struct ifreq *ifreq;
struct ifreq if_flag;
struct ifconf ifconf;
Expand All @@ -493,7 +517,7 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhos
}

ioctl(sockfd, SIOCGIFCONF, &ifconf);
ifreq = (struct ifreq *)buf;
ifreq = (struct ifreq *) buf;
ip_num = ifconf.ifc_len / sizeof(struct ifreq);
for (int32_t i = 0; i < ip_num; i++, ifreq++) {
if (ifreq->ifr_flags != AF_INET) {
Expand All @@ -511,11 +535,11 @@ bool SdkConfig::GetLocalIPV4Address(std::string& err_info, std::string& localhos
continue;
}

if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
if (!strncmp(inet_ntoa(((struct sockaddr_in *) &(ifreq->ifr_addr))->sin_addr),
"127.0.0.1", 7)) {
continue;
}
localhost = inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr);
localhost = inet_ntoa(((struct sockaddr_in *) &(ifreq->ifr_addr))->sin_addr);
close(sockfd);
err_info = "Ok";
return true;
Expand Down Expand Up @@ -545,8 +569,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("manager_cluster_url: " << manager_cluster_url_.c_str());
LOG_INFO(
"enable_manager_url_from_cluster: " << enable_manager_url_from_cluster_
? "true"
: "false");
? "true"
: "false");
LOG_INFO("manager_update_interval: minutes" << manager_update_interval_);
LOG_INFO("manager_url_timeout: " << manager_url_timeout_);
LOG_INFO("max_tcp_num: " << max_proxy_num_);
Expand All @@ -566,6 +590,8 @@ void SdkConfig::ShowClientConfig() {
LOG_INFO("max_group_id_num: " << max_group_id_num_);
LOG_INFO("max_stream_id_num: " << max_stream_id_num_);
LOG_INFO("isolation_level: " << isolation_level_);
LOG_INFO("max_instance: " << max_instance_);
LOG_INFO("max_cache_num: " << max_cache_num_);
}

} // namespace inlong
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class SdkConfig {
uint32_t send_buf_size_; // Send buf size, bid granularity
uint32_t max_group_id_num_; // Send buf size, bid granularity
uint32_t max_stream_id_num_; // Send buf size, bid granularity
uint32_t max_cache_num_;
uint32_t max_instance_;
uint32_t instance_num_;

// thread parameters
uint32_t per_groupid_thread_nums_; // Sending thread per groupid
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <queue>

#include "../config/sdk_conf.h"
#include "../utils/send_buffer.h"

#ifndef INLONG_BUFFER_MANAGER_H
#define INLONG_BUFFER_MANAGER_H

namespace inlong {
class BufferManager {
private:
std::queue<SendBufferPtrT> buffer_queue_;
mutable std::mutex mutex_;
uint32_t queue_limit_;
BufferManager() {
uint32_t data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
uint32_t buffer_num =
(SdkConfig::getInstance()->recv_buf_size_ / data_capacity_) *
SdkConfig::getInstance()->instance_num_;
queue_limit_ =
std::min(SdkConfig::getInstance()->max_cache_num_, buffer_num);
LOG_INFO("Data capacity:"
<< data_capacity_ << ", buffer num: " << buffer_num
<< ", instance num: " << SdkConfig::getInstance()->instance_num_
<< ", limit: " << queue_limit_ << " ,max cache num: "
<< SdkConfig::getInstance()->max_cache_num_);
for ( uint32_t index = 0; index < queue_limit_; index++) {
std::shared_ptr<SendBuffer> send_buffer =
std::make_shared<SendBuffer>(data_capacity_);
if (send_buffer == nullptr) {
LOG_INFO("Buffer manager is null");
continue;
}
AddSendBuffer(send_buffer);
}
}

public:
static BufferManager *GetInstance() {
static BufferManager instance;
return &instance;
}
SendBufferPtrT GetSendBuffer() {
std::lock_guard<std::mutex> lck(mutex_);
if (buffer_queue_.empty()) {
return nullptr;
}
SendBufferPtrT buf = buffer_queue_.front();
buffer_queue_.pop();
return buf;
}
void AddSendBuffer(const SendBufferPtrT &send_buffer) {
if (nullptr == send_buffer) {
return;
}
send_buffer->releaseBuf();
std::lock_guard<std::mutex> lck(mutex_);
if (buffer_queue_.size() > queue_limit_) {
return;
}
buffer_queue_.emplace(send_buffer);
}
};
} // namespace inlong
#endif // INLONG_BUFFER_MANAGER_H
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ static const int32_t kSendBufSize = 10240000;
static const int32_t kRecvBufSize = 10240000;
static const uint32_t kMaxGroupIdNum = 50;
static const uint32_t kMaxStreamIdNum = 100;
static const uint32_t kMaxCacheNum = 10;
static const uint32_t kMaxInstance = 30;

static const int32_t kDispatchIntervalZip = 8;
static const int32_t kDispatchIntervalSend = 10;
Expand Down