Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-285]Replace C/C++ pthread's mutex to std::mutex #210

Merged
merged 1 commit into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <algorithm>
#include <list>
#include <map>
#include <mutex>
#include <string>
#include <vector>

Expand All @@ -34,6 +35,7 @@
namespace tubemq {

using std::map;
using std::mutex;
using std::string;
using std::vector;

Expand Down Expand Up @@ -94,12 +96,12 @@ class FlowCtrlRuleHandler {
const string& flowctrl_info);
bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const;
int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const;
void GetFilterCtrlItem(FlowCtrlItem& result) const;
void GetFlowCtrlInfo(string& flowctrl_info) const;
int32_t GetMinZeroCnt() const { return this->min_zero_cnt_.Get(); }
int32_t GetQryPriorityId() const { return this->qrypriority_id_.Get(); }
void SetQryPriorityId(int32_t qrypriority_id) { this->qrypriority_id_.Set(qrypriority_id); }
int64_t GetFlowCtrlId() const { return this->flowctrl_id_.Get(); }
const FlowCtrlItem& GetFilterCtrlItem() const { return this->filter_ctrl_item_; }
const string& GetFlowCtrlInfo() const { return this->flowctrl_info_; }

private:
void initialStatisData();
Expand All @@ -124,17 +126,17 @@ class FlowCtrlRuleHandler {
int32_t& value);

private:
mutable mutex config_lock_;
string flowctrl_info_;
FlowCtrlItem filter_ctrl_item_;
map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
int64_t last_update_time_;
AtomicLong flowctrl_id_;
AtomicInteger qrypriority_id_;
string flowctrl_info_;
AtomicInteger min_zero_cnt_;
AtomicLong min_datadlt_limt_;
AtomicInteger datalimit_start_time_;
AtomicInteger datalimit_end_time_;
FlowCtrlItem filter_ctrl_item_;
map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
pthread_rwlock_t configrw_lock_;
int64_t last_update_time_;
};

} // namespace tubemq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
#ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
#define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_

#include <pthread.h>
#include <stdint.h>

#include <condition_variable>
#include <list>
#include <map>
#include <mutex>
#include <set>
#include <string>
#include <tuple>
Expand All @@ -41,9 +42,11 @@

namespace tubemq {

using std::condition_variable;
using std::map;
using std::set;
using std::list;
using std::mutex;
using std::string;
using std::tuple;

Expand Down Expand Up @@ -110,7 +113,7 @@ class RmtDataCacheCsm {

private:
// timer executor
ExecutorPtr executor_;
ExecutorPool executor_;
//
string consumer_id_;
string group_name_;
Expand All @@ -120,7 +123,7 @@ class RmtDataCacheCsm {
AtomicBoolean under_groupctrl_;
AtomicLong last_checktime_;
// meta info
pthread_rwlock_t meta_rw_lock_;
mutable mutex meta_lock_;
// partiton allocated map
map<string, PartitionExt> partitions_;
// topic partiton map
Expand All @@ -129,25 +132,23 @@ class RmtDataCacheCsm {
map<NodeInfo, set<string> > broker_partition_;
map<string, SubscribeInfo> part_subinfo_;
// for idle partitions occupy
pthread_mutex_t part_mutex_;
// for partiton idle map
list<string> index_partitions_;
// for partition used map
map<string, int64_t> partition_useds_;
// for partiton timer map
map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
// data book
pthread_mutex_t data_book_mutex_;
mutable mutex data_book_mutex_;
// for partition offset cache
map<string, int64_t> partition_offset_;
// for partiton register booked
map<string, bool> part_reg_booked_;

// event
pthread_mutex_t event_read_mutex_;
pthread_cond_t event_read_cond_;
mutable mutex event_read_mutex_;
condition_variable event_read_cond_;
list<ConsumerEvent> rebalance_events_;
pthread_mutex_t event_write_mutex_;
mutable mutex event_write_mutex_;
list<ConsumerEvent> rebalance_results_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ class ConsumerConfig : public BaseConfig {
const map<string, set<string> >& subscribed_topic_and_filter_map,
const string& session_key, uint32_t source_count, bool is_select_big,
const map<string, int64_t>& part_offset_map);
bool IsBoundConsume() { return is_bound_consume_; }
const string& GetSessionKey() const { return session_key_; }
const uint32_t GetSourceCount() const { return source_count_; }
bool IsSelectBig() { return is_select_big_; }
const map<string, int64_t>& GetPartOffsetInfo() const { return part_offset_map_; }
const string& GetGroupName() const;
const map<string, set<string> >& GetSubTopicAndFilterMap() const;
void SetConsumePosition(ConsumePosition consume_from_where);
Expand Down
71 changes: 45 additions & 26 deletions tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
namespace tubemq {

using std::stringstream;
using std::lock_guard;

FlowCtrlResult::FlowCtrlResult() {
this->datasize_limit_ = tb_config::kMaxIntValue;
Expand Down Expand Up @@ -171,10 +172,11 @@ FlowCtrlRuleHandler::FlowCtrlRuleHandler() {
this->datalimit_start_time_.Set(2500);
this->datalimit_end_time_.Set(tb_config::kInvalidValue);
this->last_update_time_ = Utils::GetCurrentTimeMillis();
pthread_rwlock_init(&configrw_lock_, NULL);
}

FlowCtrlRuleHandler::~FlowCtrlRuleHandler() { pthread_rwlock_destroy(&configrw_lock_); }
FlowCtrlRuleHandler::~FlowCtrlRuleHandler() {
//
}

void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id,
int64_t flowctrl_id, const string& flowctrl_info) {
Expand All @@ -186,7 +188,7 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio
if (flowctrl_info.length() > 0) {
parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map);
}
pthread_rwlock_wrlock(&this->configrw_lock_);
lock_guard<mutex> lck(config_lock_);
this->flowctrl_id_.Set(flowctrl_id);
this->qrypriority_id_.Set(qrypriority_id);
clearStatisData();
Expand All @@ -199,7 +201,6 @@ void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qryprio
initialStatisData();
}
this->last_update_time_ = Utils::GetCurrentTimeMillis();
pthread_rwlock_unlock(&this->configrw_lock_);
if (is_default) {
LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
flowctrl_id);
Expand Down Expand Up @@ -268,51 +269,69 @@ void FlowCtrlRuleHandler::clearStatisData() {
bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
FlowCtrlResult& flowctrl_result) const {
struct tm utc_tm;
bool result = false;
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;
// get current data limit
time_t cur_time = time(NULL);

gmtime_r(&cur_time, &utc_tm);
int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min;
if ((last_datadlt < this->min_datadlt_limt_.Get()) ||
(curr_time < this->datalimit_start_time_.Get()) ||
(curr_time > this->datalimit_end_time_.Get())) {
if ((last_datadlt < this->min_datadlt_limt_.Get())
|| (curr_time < this->datalimit_start_time_.Get())
|| (curr_time > this->datalimit_end_time_.Get())) {
return false;
}
// search total flowctrl rule
lock_guard<mutex> lck(config_lock_);
it_map = this->flowctrl_rules_.find(0);
if (it_map == this->flowctrl_rules_.end()) {
return false;
}
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
return true;
if (it_map != this->flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin();it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
result = true;
break;
}
}
}
return false;
return result;
}

int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
int32_t received_limit) const {
int32_t rule_val = -2;
int32_t limit_data = received_limit;
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;

// check min zero count
if (msg_zero_cnt < this->min_zero_cnt_.Get()) {
return received_limit;
return limit_data;
}
// search rule allow value
lock_guard<mutex> lck(config_lock_);
it_map = this->flowctrl_rules_.find(1);
if (it_map == this->flowctrl_rules_.end()) {
return received_limit;
}
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
rule_val = it_vec->GetFreLimit(msg_zero_cnt);
if (rule_val >= 0) {
return rule_val;
if (it_map != this->flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
limit_data = it_vec->GetFreLimit(msg_zero_cnt);
if (limit_data >= 0) {
break;
}
}
}
return received_limit;
return limit_data;
}

void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const {
result.Clear();
lock_guard<mutex> lck(config_lock_);
result = this->filter_ctrl_item_;
}

void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const {
flowctrl_info.clear();
lock_guard<mutex> lck(config_lock_);
flowctrl_info = this->flowctrl_info_;
}



bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) {
if (o1.GetStartTime() >= o2.GetStartTime()) {
return true;
Expand Down
4 changes: 2 additions & 2 deletions tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_ha
this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
}
}
this->cur_freqctrl_ = group_flowctrl_handler.GetFilterCtrlItem();
group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
if (this->cur_freqctrl_.GetFreqMsLimit() < 0) {
this->cur_freqctrl_ = def_flowctrl_handler.GetFilterCtrlItem();
def_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
}
curr_time = Utils::GetCurrentTimeMillis();
}
Expand Down
Loading