
Support hotspot key detection in Pegasus #495

Closed
Smityz opened this issue Mar 11, 2020 · 8 comments

Smityz commented Mar 11, 2020

Support hotspot key detection in Pegasus

Related PR #502

1 Background

At present, Pegasus only exposes raw monitoring values for hotspot detection and relies on manual methods for fault detection and troubleshooting, which is cumbersome for operation and maintenance. To present the data more visually, we added a hotspot detection module in #444 and #479, which scores the metrics relevant to hotspot issues and feeds the scores back to Falcon, reducing the operation and maintenance burden. In this issue (#495), I want to discuss the hotspot key detection function.

This function finds the possible hotspot keys in the partitions flagged by the hotspot detection algorithm and reports them to the Ops team for timely troubleshooting.

At the same time, this work is also one of the building blocks for solving hot-data read and write problems.

2 Design

2.1 Timing diagram

hotspot key detection

2.2 How to start capturing data

According to #479, we can get a "hotspot_value" for each partition. We can set a threshold, and if a partition's hotspot_value exceeds the threshold 3 times, the collector will send an RPC to the replica server.

2.2.1 First step

hotspot_calculator (in the collector) checks every partition's hotspot_value provided by #479. If it is higher than the THRESHOLD_OF_HOTSPOT_PARTITION_VALUE we set (normally 4), the partition is recorded in global_read_count/global_write_count. This check runs every time hotspot_value is updated, which happens once a minute.

2.2.2 Second step

hotspot_calculator (in the collector) then checks the global_read_count/global_write_count status. When global_read_count/global_write_count is higher than THRESHOLD_OF_SEND_RPC_TO_DETECT_HOTKEY, meaning those partitions have been abnormal for that many consecutive minutes, we send an RPC to detect the hotkey.
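The two steps above can be sketched in C++ as follows. This is only an illustrative sketch, not the collector's actual code; the function name on_hotspot_values_updated and the counter-reset behaviour are my assumptions:

```cpp
#include <map>
#include <vector>

// Illustrative constants taken from the description above.
const int THRESHOLD_OF_HOTSPOT_PARTITION_VALUE = 4;   // per-minute hotspot score limit
const int THRESHOLD_OF_SEND_RPC_TO_DETECT_HOTKEY = 3; // consecutive abnormal minutes

// partition id -> number of consecutive abnormal minutes
// (stands in for global_read_count / global_write_count)
std::map<int, int> global_read_count;

// Called once per minute with the latest per-partition hotspot values.
// Returns the partitions for which a hotkey-detect RPC should be sent.
std::vector<int> on_hotspot_values_updated(const std::map<int, int> &hotspot_value)
{
    std::vector<int> partitions_to_probe;
    for (const auto &kv : hotspot_value) {
        int partition = kv.first;
        if (kv.second > THRESHOLD_OF_HOTSPOT_PARTITION_VALUE) {
            if (++global_read_count[partition] >= THRESHOLD_OF_SEND_RPC_TO_DETECT_HOTKEY) {
                partitions_to_probe.push_back(partition);
                global_read_count[partition] = 0; // reset after triggering the RPC
            }
        } else {
            global_read_count[partition] = 0; // the abnormal streak is broken
        }
    }
    return partitions_to_probe;
}
```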

2.3 How to capture data traffic

There are two points we need to pay attention to: thread safety and low resource usage.
To reduce the load, we introduce a two-level model to filter the data.

2.3.1 performance pre-test

According to the performance test in #495 (comment), I found that splitting the data flow before try_lock can significantly improve efficiency, and random sampling can further reduce latency.
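The "split + random sampling + try_lock" idea can be illustrated with the sketch below. It is not the Pegasus implementation; QUEUE_NUM, the queue names, and the element type are assumptions for the example:

```cpp
#include <array>
#include <mutex>
#include <queue>
#include <random>
#include <string>

// Each writer picks a random queue and pushes only if it can take that
// queue's lock immediately, so a contended queue drops the sample instead
// of blocking the request path.
constexpr int QUEUE_NUM = 10;
std::array<std::mutex, QUEUE_NUM> queue_locks;
std::array<std::queue<std::string>, QUEUE_NUM> sample_queues;

bool try_capture(const std::string &data)
{
    thread_local std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> pick(0, QUEUE_NUM - 1);
    int i = pick(rng);
    std::unique_lock<std::mutex> guard(queue_locks[i], std::try_to_lock);
    if (!guard.owns_lock())
        return false; // contended: drop this sample rather than wait
    sample_queues[i].push(data);
    return true;
}
```

At analysis time the queues are drained and merged, as in the fine_capture_data pseudocode below.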

2.3.2 RPC

Ref #495 (comment)

2.3.3 hotkey_collector

hotkey_collector is used to capture and analyse the data of one partition; it runs on the replica. hotkey_collector has four statuses:

  • stop: hotkey_collector is stopped and all its data has been cleared; we can reuse it when needed.
  • coarse_level: hotkey_collector is running; it collects only the hash values of the data flow, not the specific contents. If it finds a hot hash bucket, its status turns to fine_level; otherwise it keeps running.
  • fine_level: hotkey_collector is running; it samples the specific contents of the data flow in the hottest hash bucket, including hash_key and sort_key. If it finds the hotkey, its status turns to finish; otherwise it keeps running.
  • finish: hotkey_collector waits for hotkey_manager to store the result; after recording the result and sending it back in the RPC response, its status turns to stop.
void capture_data(data)
{
    if (collector_status == coarse)
        coarse_capture_data(data);
    if (collector_status == fine)
        fine_capture_data(data);
}

void coarse_capture_data(data)
{
    hash_table[hash(data)]++;
}

void fine_capture_data(data)
{
    queue_number = rand(0, 10); // the random range can be adjusted to the actual situation
    if (mutex[queue_number].try_lock()) {
        queue[queue_number].push(data); // when analysing the data, we merge these queues
        mutex[queue_number].unlock();
    }
}

void analyse_data()
{
    if (collector_status == coarse && analyse_coarse_data_successful)
        collector_status = fine;
    if (collector_status == fine && analyse_fine_data_successful)
        collector_status = finish;
    if (collector_status == finish && send_RPC_back_successful) {
        clear_data();
        collector_status = stop;
    }
    if (time_out) {
        clear_data();
        collector_status = stop;
    }
}

capture_data() and analyse_data() run at different frequencies; separating the two functions helps improve efficiency.
capture_data() will be executed in https://github.com/XiaoMi/pegasus/blob/b7492caceae817cfdb50a1a71d42de9d16c4a234/src/server/pegasus_server_impl.cpp#L573 and https://github.com/XiaoMi/pegasus/blob/b7492caceae817cfdb50a1a71d42de9d16c4a234/src/server/pegasus_server_impl.cpp#L562

analyse_data() is a periodic task.

2.4 How to stop capturing data traffic

It can be terminated by a timeout or manually (by sending an RPC).

2.5 How to notify the result of hotkey detection

By logging via derror(). We can search for the specific error logs on the "hot" server or on the collector.

@Smityz Smityz added the type/enhancement Indicates new feature requests label Mar 11, 2020

neverchanje commented Mar 16, 2020

the collector would send an RPC to the replica server.

We can name this RPC hotkey_detect_rpc. You should define the structure of the request as well as the response and include them in your proposal, because the API structures are highly related to your design.

struct hotkey_detect_request {
  optional gpid partition;
}

struct hotkey_detect_response {
  optional int err;
  optional blob hashkey;
}

It can be terminated by timed out or by manual(send a RPC)

We can name it stop_hotkey_detect_rpc

struct stop_hotkey_detect_request {
  optional gpid partition;
}

struct stop_hotkey_detect_response {
  optional int err;
}


neverchanje commented Mar 16, 2020

   for (partition : suspected_read_partition){
       if (_read_points[partition]>THRESHOLD_1){
           global_read_count[partition]++;
           if (global_read_count[partition]>THRESHOLD_2){
               suspected_read_partition.push_back(partition);
          }
       }
   }

What is THRESHOLD_1 and THRESHOLD_2?

if (suspected_partition in read_watch_list){

What is read_watch_list?

I recommend that you describe the data structures first; pseudocode like functions or classes should not be our first concern. Of course, it is fine to include them, but you could give a highly descriptive function name without an implementation, rather than something like "excpetion_read_check" — I don't really know what that means.


foreverneverer commented Mar 17, 2020

  • When you find a suspected hotspot partition, you re-hash the keys of the partition. But be careful: if the partition is indeed a hotspot but contains only one repeated key, re-hashing won't achieve your goal of decreasing the calculation time. So you may still want to consider proportional sampling before re-hashing.
  • You need to consider thread safety and efficiency when different threads count the same key, as we discussed before.


Smityz commented Mar 19, 2020

Update

2.2.1 How to start capturing data

Benchmark

To further explore the implementation of the scheme, I ran a benchmark testing different ways to capture and analyse data. The two main ideas are optimizing try_lock and using atomics. Here is the result of the benchmark:

Environment:
GCC/OS: g++ (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0
CPU: Intel(R) Core(TM) i5-8250U CPU @ 1.60GHz
RAM: 8192 MB


| | test1 | test2 | test3 | avg | qps |
|---|---|---|---|---|---|
| lock_free (normal data) | 7933.03ms | 7958.41ms | 8013.13ms | 7968.19ms | 50,200,803.21 |
| lock_free (hotspot data) | 7889.16ms | 7902.19ms | 7908.76ms | 7900.04ms | 50,632,911.39 |
| atomic (normal data) | 17957.5ms | 17575.1ms | 18112.1ms | 17881.57ms | 22,370,113.52 |
| atomic (hotspot data) | 20764.7ms | 20985.3ms | 20280ms | 20676.67ms | 19,346,101.76 |
| test_try_lock (normal data) | 65484.2ms/71549112 | 71451.2ms/62879825 | 56493ms/88043207 | 64476.13ms/74157381 | 1,150,154.80 |
| test_try_lock (hotspot data) | 65778.3ms/72520846 | 73553.3ms/64252630 | 58553.3ms/81606050 | 65961.63ms/72793175 | 1,103,579.01 |
| test_try_lock_random (normal data) | 29997.5ms/42620480 | 35545ms/38234607 | 24183.2ms/24459466 | 29908.56ms/35104851 | 1,173,739.25 |
| test_try_lock_random (hotspot data) | 29400.7ms/41056169 | 29335.1ms/40157626 | 26746.5ms/33302722 | 28494.10ms/38172172 | 1,339,651.78 |
| test_try_lock_split (normal data) | 65504.8ms/209902105 | 65157.2ms/220578017 | 64537.7ms/220200261 | 65066.56ms/216893461 | 3,333,409.06 |
| test_try_lock_split (hotspot data) | 75059.9ms/212746057 | 71759.6ms/208519921 | 74664.2ms/211368797 | 73827.90ms/210878258 | 2,856,349.13 |
| test_try_lock_split_random (normal data) | 32150.1ms/109737265 | 32817.8ms/105833686 | 32475ms/107312941 | 32480.96ms/107627964 | 3,313,570.90 |
| test_try_lock_split_random (hotspot data) | 31861ms/69960417 | 27350.5ms/60935536 | 27007.9ms/72499596 | 28739.8ms/67798516 | 2,359,046.20 |

Test code: https://github.com/Smityz/multithreading

Result analysis

From the results, we should use atomics as much as possible, since they perform far better than locks. So we can use atomics in coarse_level_capture.
But it is difficult to use atomics in fine_level_capture, since there is no atomic map we can use. So I suggest using try_lock_split in fine_level_capture; if we additionally use randomization to limit the data flow, it can further accelerate the operation.
By the way, rand() in <stdlib.h> is best avoided because of its poor performance.
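A minimal sketch of what atomic-based coarse_level_capture could look like (illustrative only; the bucket count, hash choice, and names are my assumptions, not the Pegasus code):

```cpp
#include <array>
#include <atomic>
#include <cstdint>
#include <functional>
#include <string>

// Lock-free coarse-level counting: requests only bump an atomic counter
// for their key's hash bucket, so the hot path never takes a lock.
constexpr size_t BUCKET_NUM = 103;
std::array<std::atomic<uint64_t>, BUCKET_NUM> bucket_hits{};

void coarse_capture(const std::string &hash_key)
{
    size_t b = std::hash<std::string>{}(hash_key) % BUCKET_NUM;
    bucket_hits[b].fetch_add(1, std::memory_order_relaxed);
}

// The analyser later scans the buckets and picks the hottest one,
// which becomes the target of fine_level sampling.
size_t hottest_bucket()
{
    size_t best = 0;
    for (size_t i = 1; i < BUCKET_NUM; ++i)
        if (bucket_hits[i].load(std::memory_order_relaxed) >
            bucket_hits[best].load(std::memory_order_relaxed))
            best = i;
    return best;
}
```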

@Smityz Smityz closed this as completed Mar 19, 2020
@Smityz Smityz reopened this Mar 20, 2020

levy5307 commented Mar 23, 2020

Maybe you can use atomic<T> in fine_level_capture, and define a struct, which has a map member

@acelyc111

When either coarse_level or fine_level has run for a period of time without detecting a hot sub-partition or hot key, you should turn off hotkey_collector; it's costly to run it all the time.

@Smityz Smityz closed this as completed Mar 24, 2020
Features Developement automation moved this from To do to Done Mar 24, 2020
@Smityz Smityz reopened this Mar 24, 2020
Features Developement automation moved this from Done to In progress Mar 24, 2020
@neverchanje neverchanje pinned this issue Apr 2, 2020

Smityz commented Apr 24, 2020

A new idea for capturing the hotkey string (capture_fine_data)

In the old design, we used try_lock and random selection to reduce the probability of lock contention. It's a good idea, but there is still a lock. In the new design, we have an exciting lock-free method to capture the huge data flow.

Thread structure

+------------------------------+      +------------------------------+
|   THREAD_POOL_LOCAL_APP      |      |   THREAD_POOL_LOCAL_APP      |
|         thread_1             |      |         thread_2             |
|                              |      |                              |
|    +----------------+        |      |    +----------------+        |
|    |                |        |      |    |                |        |
|    |    on_get()    |        |      |    |    on_get()    |        |
|    |                |        |      |    |                |        |
|    +-------+--------+        |      |    +-------+--------+        |
|            |                 |      |            |                 |
|            | push            |      |            | push            |
|            |(parallel)       |      |            |(parallel)       |
|            v                 |      |            v                 |
|       +----+----+            |      |       +----+----+            |
|       |         |            |      |       |         |            |
|       |  data   |            |      |       |  data   |            |
|       +---------+            |      |       +---------+            |
|       |         |            |      |       |         |            |
|       |  data   |            |      |       |  data   |            |
|       +---------+ read-write |      |       +---------+ read-write |
|       |         |  queue[1]  |      |       |         |  queue[2]  |
|       |  data   |            |      |       |  data   |            |
|       +---------+            |      |       +---------+            |
|       |         |            |      |       |         |            |
|       |         |            |      |       |         |            |
|       |         |            |      |       |         |            |
|       |         |            |      |       |         |            |
|       +----+----+            |      |       +----+----+            |
|            |                 |      |            |                 |
|            | pop             |      |            | pop             |
|            |(order by order) |      |            |(order by order) |
|            |                 |      |            |                 |
|            |                 |      |            |                 |
+------------------------------+      +------------------------------+
             |                                     |
             |                                     |
+---------------------------------------------------------------------+
|            |          THREAD_POOL_DEFAULT        |                  |
|            |                                     |                  |
|            v                                     v                  |
|          +-+-------------------------------------+---+              |
|          | key1| key2| key3|                         |              |
|          +-------------------------------------------+              |
|          |count|count|count|                         |              |
|          +-------------------------------------------+              |
|                         count_hash_map                              |
|                                                                     |
+---------------------------------------------------------------------+


How to allocate a queue to a thread

With CAS we can handle this easily:

// size is a std::atomic<int>; v maps queue index -> owning threadID
int find_queue(int threadID)
{
    int t_size = size.load(std::memory_order_seq_cst);
    for (int i = 0; i < t_size; i++) {
        if (v[i] == threadID)
            return i;
    }
    // Reserve a new slot; on failure, compare_exchange_weak reloads t_size,
    // so each retry works against the latest size.
    while (!size.compare_exchange_weak(t_size, t_size + 1))
        ;
    v[t_size] = threadID;
    return t_size;
}

How to ensure the thread-safety of queues

We can use a producer-consumer queue to ensure thread safety while keeping high performance.

https://github.com/cameron314/readerwriterqueue is recommended.


We can also use boost::lockfree::queue or folly::ProducerConsumerQueue instead.
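As a minimal illustration of the single-producer/single-consumer idea behind these libraries (a sketch, not their actual implementations), a bounded lock-free ring buffer needs only two atomic indices, each written by exactly one thread:

```cpp
#include <atomic>
#include <cstddef>
#include <string>

// Minimal bounded SPSC ring buffer: the producer thread advances `head`,
// the consumer thread advances `tail`; no locks are needed because each
// index has exactly one writer. Capacity is N - 1.
template <typename T, size_t N>
class spsc_queue
{
public:
    bool try_push(const T &v)
    {
        size_t h = head_.load(std::memory_order_relaxed);
        size_t next = (h + 1) % N;
        if (next == tail_.load(std::memory_order_acquire))
            return false; // full: drop the sample, as in fine_level capture
        buf_[h] = v;
        head_.store(next, std::memory_order_release);
        return true;
    }

    bool try_pop(T &out)
    {
        size_t t = tail_.load(std::memory_order_relaxed);
        if (t == head_.load(std::memory_order_acquire))
            return false; // empty
        out = buf_[t];
        tail_.store((t + 1) % N, std::memory_order_release);
        return true;
    }

private:
    T buf_[N];
    std::atomic<size_t> head_{0};
    std::atomic<size_t> tail_{0};
};
```

In the diagram above, each on_get() thread would own the producer side of one such queue, and the THREAD_POOL_DEFAULT analyser would own all the consumer sides and merge the popped keys into count_hash_map.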

@hycdong hycdong moved this from In progress to Done in Features Developement Jul 26, 2021
@hycdong hycdong unpinned this issue Jul 26, 2021
@Smityz Smityz closed this as completed Aug 27, 2021