forked from facebookarchive/treadmill
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Scheduler.cpp
145 lines (125 loc) · 4.29 KB
/
Scheduler.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Copyright (c) 2014, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include "treadmill/Scheduler.h"
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>
#include <folly/Memory.h>
#include "treadmill/Util.h"
/* Command-line flag, false by default. When it is set, run() starts the
   scheduler in the PAUSED state instead of RUNNING, and setPhase() CHECKs
   that the scheduler is paused. */
DEFINE_bool(wait_for_runner_ready,
false,
"If true, wait for a 'resume' message before sending requests.");
namespace facebook {
namespace windtunnel {
namespace treadmill {
/**
 * Builds a scheduler for `number_of_workers` workers with an initial target
 * rate of `rps` requests per second.
 *
 * One notification queue and one overload-logging factor are created per
 * worker. Every factor starts at 1; loop() logs when a queue grows beyond
 * logging_threshold * factor.
 */
Scheduler::Scheduler(uint32_t rps,
                     uint32_t number_of_workers,
                     uint32_t logging_threshold)
    : logging_threshold_(logging_threshold),
      rps_(rps),
      logged_(number_of_workers, 1),
      queues_(number_of_workers) {}
/* No explicit teardown: the scheduler thread is neither stopped nor joined
   here.
   NOTE(review): presumably callers are expected to call stop() and join()
   before destroying the scheduler -- confirm. */
Scheduler::~Scheduler() = default;
/**
 * Starts the scheduler thread, which executes loop().
 *
 * The scheduler starts PAUSED when --wait_for_runner_ready is set and
 * RUNNING otherwise.
 *
 * Returns the future of the promise that loop() fulfils after it has sent
 * STOP to every worker.
 */
folly::Future<folly::Unit> Scheduler::run() {
  const RunState initial_state =
      FLAGS_wait_for_runner_ready ? PAUSED : RUNNING;
  /* The initial state is published before the loop thread is created. */
  state_.store(initial_state, std::memory_order_relaxed);
  thread_ = std::make_unique<std::thread>([this] { loop(); });
  return promise_.getFuture();
}
void Scheduler::pause() {
RunState expected = RUNNING;
state_.compare_exchange_strong(expected, PAUSED);
}
void Scheduler::resume() {
RunState expected = PAUSED;
state_.compare_exchange_strong(expected, RUNNING);
}
void Scheduler::setPhase(const std::string& phase_name) {
if (FLAGS_wait_for_runner_ready) {
CHECK_EQ(state_, PAUSED);
}
messageAllWorkers(Event(EventType::SET_PHASE, phase_name));
}
/* Requests shutdown by switching the state to STOPPING. loop() leaves both of
   its inner loops and its outer loop once it observes this state. */
void Scheduler::stop() {
  state_ = STOPPING;
}
/**
 * Waits for the scheduler thread to finish.
 *
 * stop() must have been called first; this is enforced with CHECK.
 *
 * Calling join() when run() was never invoked, or when the thread has already
 * been joined, is a no-op: there is nothing left to wait for.
 */
void Scheduler::join() {
  CHECK(state_ == STOPPING);
  /* thread_ is only created by run(). Dereferencing it unconditionally is
     undefined behaviour when it is still empty, and joining a thread that is
     no longer joinable throws std::system_error. */
  if (thread_ && thread_->joinable()) {
    thread_->join();
  }
}
/* Returns the notification queue of worker `id`. queues_ is indexed directly
   with operator[]; no explicit range check is performed here. */
folly::NotificationQueue<Event>& Scheduler::getWorkerQueue(uint32_t id) {
  auto& worker_queue = queues_[id];
  return worker_queue;
}
/* Stores a new target request rate in rps_.
   NOTE(review): loop() computes its mean interval from rps_ once, at the top
   of its outer loop, so a change made while the scheduler is RUNNING appears
   to take effect only after a pause()/resume() cycle -- confirm this is
   intended.
   NOTE(review): `rps` is signed here while the constructor takes uint32_t,
   and loop() divides by rps_; zero or negative values look unsafe --
   confirm callers never pass them. */
void Scheduler::setRps(int32_t rps) {
rps_ = rps;
}
/**
 * Draws a random interval from an exponential distribution with the given
 * mean, computed as -ln(u) * mean for u drawn uniformly from [0, 1).
 */
double Scheduler::randomExponentialInterval(double mean) {
  /* Default-seeded generator, created on first use and never freed. */
  static std::mt19937* generator = new std::mt19937();
  std::uniform_real_distribution<double> unit_interval(0, 1.0);
  const double sample = unit_interval(*generator);
  /* Keep the sample away from zero so that the logarithm stays finite. */
  const double clamped_sample = std::max(sample, 1e-9);
  return -log(clamped_sample) * mean;
}
/**
 * Spins until at least `ns` nanoseconds have elapsed according to nowNs().
 *
 * Busy-waiting is deliberate: the scheduler needs *precise* timing, which is
 * not achievable with other means such as 'nanosleep' or EventBase.
 */
void Scheduler::waitNs(int64_t ns) {
  const auto started_at = nowNs();
  while (nowNs() - started_at < ns) {
    /* The x86 "pause" instruction hints to the processor that this is a
       spin-wait loop. */
    asm volatile("pause");
  }
}
/* Puts `event` onto the notification queue of every worker. */
void Scheduler::messageAllWorkers(Event event) {
  /* Iterating the container directly avoids comparing a signed index with
     the unsigned result of size(). */
  for (auto& worker_queue : queues_) {
    worker_queue.putMessage(event);
  }
}
/**
 * Body of the scheduler thread: generates request events.
 *
 * Requests are randomly spaced (intervals are drawn from an exponential
 * distribution) to achieve the target throughput rate. Events are put into
 * the workers' notification queues, which are selected in round-robin
 * fashion.
 *
 * Each pass of the outer loop sends RESET to all workers, derives the mean
 * interval from rps_, generates requests while the state is RUNNING and then
 * spins while it is PAUSED. Once STOPPING is observed, STOP is sent to all
 * workers and the promise behind the future returned by run() is fulfilled.
 */
void Scheduler::loop() {
  do {
    messageAllWorkers(Event(EventType::RESET));
    next_ = 0;
    /* Mean spacing between two requests, in nanoseconds.
       NOTE(review): rps_ == 0 would divide by zero here -- confirm callers
       never configure it. */
    int64_t interval_ns = 1.0/rps_ * k_ns_per_s;
    /* Timestamps taken right before and right after the busy-wait. A zero
       after_wait_ns marks the first iteration, where there is no previous
       send to account for. */
    int64_t before_wait_ns = 0;
    int64_t after_wait_ns = 0;
    int64_t budget = randomExponentialInterval(interval_ns);
    while (state_ == RUNNING) {
      before_wait_ns = nowNs();
      if (after_wait_ns) {
        /* Account for time spent sending the message */
        budget -= (before_wait_ns - after_wait_ns);
      }
      /* Spell out the type: int64_t is not `long` on every platform, so
         std::max(budget, 0L) does not compile everywhere. */
      waitNs(std::max<int64_t>(budget, 0));
      after_wait_ns = nowNs();
      /* Decrease the sleep budget by the exact time slept (could have been
         more than the budget value), increase by the next interval */
      budget += randomExponentialInterval(interval_ns) -
                (after_wait_ns - before_wait_ns);
      queues_[next_].putMessage(Event(EventType::SEND_REQUEST));
      /* Log with exponential back-off: the factor doubles after each report,
         so the next one needs a queue twice as long. */
      if (queues_[next_].size() > logging_threshold_ * logged_[next_]) {
        LOG(INFO) << "Notification queue for worker " << next_
                  << " is overloaded by factor of " << logged_[next_];
        logged_[next_] *= 2;
      }
      /* Advance round-robin to the next worker queue. */
      ++next_;
      if (next_ == queues_.size()) {
        next_ = 0;
      }
    }
    /* Spin in 1000 ns steps until resumed or stopped. */
    while (state_ == PAUSED) waitNs(1000);
  } while (state_ != STOPPING);
  messageAllWorkers(Event(EventType::STOP));
  promise_.setValue(folly::Unit());
}
} // namespace treadmill
} // namespace windtunnel
} // namespace facebook