diff --git a/src/test/common/Throttle.cc b/src/test/common/Throttle.cc index bd30471a809a7..a2001214dcbc1 100644 --- a/src/test/common/Throttle.cc +++ b/src/test/common/Throttle.cc @@ -28,6 +28,13 @@ #include "global/global_init.h" #include +#include +#include +#include +#include +#include +#include + class ThrottleTest : public ::testing::Test { protected: @@ -252,6 +259,159 @@ TEST_F(ThrottleTest, destructor) { } } +std::pair > test_backoff( + double low_threshhold, + double high_threshhold, + double expected_throughput, + double high_multiple, + double max_multiple, + uint64_t max, + double put_delay_per_count, + unsigned getters, + unsigned putters) +{ + std::mutex l; + std::condition_variable c; + uint64_t total = 0; + std::list in_queue; + bool stop = false; + + auto wait_time = std::chrono::duration(0); + uint64_t waits = 0; + + uint64_t total_observed_total = 0; + uint64_t total_observations = 0; + + BackoffThrottle throttle(5); + bool valid = throttle.set_params( + low_threshhold, + high_threshhold, + expected_throughput, + high_multiple, + max_multiple, + max, + 0); + assert(valid); + + auto getter = [&]() { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 10); + + std::unique_lock g(l); + while (!stop) { + g.unlock(); + + uint64_t to_get = dis(gen); + auto waited = throttle.get(to_get); + + g.lock(); + wait_time += waited; + waits += to_get; + total += to_get; + in_queue.push_back(to_get); + c.notify_one(); + } + }; + + auto putter = [&]() { + std::unique_lock g(l); + while (!stop) { + while (in_queue.empty()) + c.wait(g); + + uint64_t c = in_queue.front(); + + total_observed_total += total; + total_observations++; + in_queue.pop_front(); + assert(total <= max); + + g.unlock(); + std::this_thread::sleep_for( + c * std::chrono::duration(put_delay_per_count*putters)); + g.lock(); + + total -= c; + throttle.put(c); + } + }; + + vector gts(getters); + for (auto &&i: gts) i = std::thread(getter); + + vector pts(putters); + for (auto &&i: pts) i = std::thread(putter); + + std::this_thread::sleep_for(std::chrono::duration(5)); + { + std::unique_lock g(l); + stop = true; + } + for (auto &&i: gts) i.join(); + gts.clear(); + for (auto &&i: pts) i.join(); + pts.clear(); + + return make_pair( + ((double)total_observed_total)/((double)total_observations), + wait_time / waits); +} + +TEST(BackoffThrottle, undersaturated) +{ + auto results = test_backoff( + 0.4, + 0.6, + 1000, + 2, + 10, + 100, + 0.0001, + 3, + 6); + ASSERT_LT(results.first, 45); + ASSERT_GT(results.first, 35); + ASSERT_LT(results.second.count(), 0.0002); + ASSERT_GT(results.second.count(), 0.00005); +} + +TEST(BackoffThrottle, balanced) +{ + auto results = test_backoff( + 0.4, + 0.6, + 1000, + 2, + 10, + 100, + 0.001, + 7, + 2); + ASSERT_LT(results.first, 60); + ASSERT_GT(results.first, 40); + ASSERT_LT(results.second.count(), 0.002); + ASSERT_GT(results.second.count(), 0.0005); +} + +TEST(BackoffThrottle, oversaturated) +{ + auto results = test_backoff( + 0.4, + 0.6, + 10000000, + 2, + 10, + 100, + 0.001, + 1, + 3); + ASSERT_LT(results.first, 101); + ASSERT_GT(results.first, 85); + ASSERT_LT(results.second.count(), 0.002); + ASSERT_GT(results.second.count(), 0.0005); +} + int main(int argc, char **argv) { vector args; argv_to_vec(argc, (const char **)argv, args);