Skip to content

Commit

Permalink
feat(hotspot): new algorithm of hotspot detection (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Smityz authored and neverchanje committed Mar 31, 2020
1 parent 8eae8a5 commit 19dae33
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 11 deletions.
24 changes: 21 additions & 3 deletions src/server/info_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ info_collector::info_collector()
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
_hotspot_detect_algorithm = dsn_config_get_value_string("pegasus.collector",
"hotspot_detect_algorithm",
"hotspot_algo_qps_variance",
"hotspot_detect_algorithm");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
Expand Down Expand Up @@ -148,6 +152,9 @@ void info_collector::on_app_stat()
// hotspot_calculator is to detect hotspots
hotspot_calculator *hotspot_calculator =
get_hotspot_calculator(app_rows.first, app_rows.second.size());
if (!hotspot_calculator) {
continue;
}
hotspot_calculator->aggregate(app_rows.second);
// new policy can be designed by strategy pattern in hotspot_partition_data.h
hotspot_calculator->start_alg();
Expand Down Expand Up @@ -285,9 +292,20 @@ hotspot_calculator *info_collector::get_hotspot_calculator(const std::string &ap
if (iter != _hotspot_calculator_store.end()) {
return iter->second;
}
hotspot_calculator *calculator_address = new hotspot_calculator(app_name, partition_num);
_hotspot_calculator_store[app_name] = calculator_address;
return calculator_address;
std::unique_ptr<hotspot_policy> policy;
if (_hotspot_detect_algorithm == "hotspot_algo_qps_variance") {
policy.reset(new hotspot_algo_qps_variance());
} else if (_hotspot_detect_algorithm == "hotspot_algo_qps_skew") {
policy.reset(new hotspot_algo_qps_skew());
} else {
dwarn("hotspot detection is disabled");
_hotspot_calculator_store[app_name] = nullptr;
return nullptr;
}
hotspot_calculator *calculator =
new hotspot_calculator(app_name, partition_num, std::move(policy));
_hotspot_calculator_store[app_name] = calculator;
return calculator;
}

} // namespace server
Expand Down
1 change: 1 addition & 0 deletions src/server/info_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class info_collector
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
std::string _hotspot_detect_algorithm;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
Expand Down
66 changes: 59 additions & 7 deletions src/server/table_hotspot_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <algorithm>
#include <gtest/gtest_prod.h>
#include <math.h>

#include <dsn/perf_counter/perf_counter.h>

namespace pegasus {
Expand All @@ -20,24 +22,71 @@ class hotspot_policy
// vector is used to save the partitions' data of this app
// hotspot_partition_data is used to save data of one partition
virtual void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points) = 0;
std::vector<::dsn::perf_counter_wrapper> &perf_counters) = 0;
};

class hotspot_algo_qps_skew : public hotspot_policy
{
public:
void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &hot_points)
std::vector<::dsn::perf_counter_wrapper> &perf_counters)
{
const auto &anly_data = hotspot_app_data.back();
double min_total_qps = INT_MAX;
for (auto partition_anly_data : anly_data) {
min_total_qps = std::min(min_total_qps, partition_anly_data.total_qps);
}
min_total_qps = std::max(1.0, min_total_qps);
dassert(anly_data.size() == hot_points.size(), "partition counts error, please check");
for (int i = 0; i < hot_points.size(); i++) {
hot_points[i]->set(anly_data[i].total_qps / min_total_qps);
dassert(anly_data.size() == perf_counters.size(), "partition counts error, please check");
for (int i = 0; i < perf_counters.size(); i++) {
perf_counters[i]->set(anly_data[i].total_qps / min_total_qps);
}
}
};

// PauTa Criterion
class hotspot_algo_qps_variance : public hotspot_policy
{
public:
void analysis(const std::queue<std::vector<hotspot_partition_data>> &hotspot_app_data,
std::vector<::dsn::perf_counter_wrapper> &perf_counters)
{
dassert(hotspot_app_data.back().size() == perf_counters.size(),
"partition counts error, please check");
std::vector<double> data_samples;
data_samples.reserve(hotspot_app_data.size() * perf_counters.size());
auto temp_data = hotspot_app_data;
double total = 0, sd = 0, avg = 0;
int sample_count = 0;
// avg: Average number
// sd: Standard deviation
// sample_count: Number of samples
while (!temp_data.empty()) {
for (auto partition_data : temp_data.front()) {
if (partition_data.total_qps - 1.00 > 0) {
data_samples.push_back(partition_data.total_qps);
total += partition_data.total_qps;
sample_count++;
}
}
temp_data.pop();
}
if (sample_count == 0) {
ddebug("hotspot_app_data size == 0");
return;
}
avg = total / sample_count;
for (auto data_sample : data_samples) {
sd += pow((data_sample - avg), 2);
}
sd = sqrt(sd / sample_count);
const auto &anly_data = hotspot_app_data.back();
for (int i = 0; i < perf_counters.size(); i++) {
double hot_point = (anly_data[i].total_qps - avg) / sd;
// perf_counter->set can only be unsigned __int64
// use ceil to guarantee conversion results
hot_point = ceil(std::max(hot_point, double(0)));
perf_counters[i]->set(hot_point);
}
}
};
Expand All @@ -46,8 +95,10 @@ class hotspot_algo_qps_skew : public hotspot_policy
class hotspot_calculator
{
public:
hotspot_calculator(const std::string &app_name, const int partition_num)
: _app_name(app_name), _points(partition_num), _policy(new hotspot_algo_qps_skew())
hotspot_calculator(const std::string &app_name,
const int partition_num,
std::unique_ptr<hotspot_policy> policy)
: _app_name(app_name), _points(partition_num), _policy(std::move(policy))
{
init_perf_counter(partition_num);
}
Expand All @@ -62,6 +113,7 @@ class hotspot_calculator
std::unique_ptr<hotspot_policy> _policy;
static const int kMaxQueueSize = 100;

FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_variance);
FRIEND_TEST(table_hotspot_policy, hotspot_algo_qps_skew);
};
} // namespace server
Expand Down
26 changes: 25 additions & 1 deletion src/server/test/pegasus_tablehotspot_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ TEST(table_hotspot_policy, hotspot_algo_qps_skew)
std::vector<row_data> test_rows(2);
test_rows[0].get_qps = 1234.0;
test_rows[1].get_qps = 4321.0;
hotspot_calculator test_hotspot_calculator("TEST", 2);
std::unique_ptr<hotspot_policy> policy(new hotspot_algo_qps_skew());
hotspot_calculator test_hotspot_calculator("TEST", 2, std::move(policy));
test_hotspot_calculator.aggregate(test_rows);
test_hotspot_calculator.start_alg();
std::vector<double> result(2);
Expand All @@ -25,5 +26,28 @@ TEST(table_hotspot_policy, hotspot_algo_qps_skew)
ASSERT_EQ(expect_vector, result);
}

TEST(table_hotspot_policy, hotspot_algo_qps_variance)
{
std::vector<row_data> test_rows(8);
test_rows[0].get_qps = 1000.0;
test_rows[1].get_qps = 1000.0;
test_rows[2].get_qps = 1000.0;
test_rows[3].get_qps = 1000.0;
test_rows[4].get_qps = 1000.0;
test_rows[5].get_qps = 1000.0;
test_rows[6].get_qps = 1000.0;
test_rows[7].get_qps = 5000.0;
std::unique_ptr<hotspot_policy> policy(new hotspot_algo_qps_variance());
hotspot_calculator test_hotspot_calculator("TEST", 8, std::move(policy));
test_hotspot_calculator.aggregate(test_rows);
test_hotspot_calculator.start_alg();
std::vector<double> result(8);
for (int i = 0; i < test_hotspot_calculator._points.size(); i++) {
result[i] = test_hotspot_calculator._points[i]->get_value();
}
std::vector<double> expect_vector{0, 0, 0, 0, 0, 0, 0, 3};
ASSERT_EQ(expect_vector, result);
}

} // namespace server
} // namespace pegasus

0 comments on commit 19dae33

Please sign in to comment.