Skip to content

Commit

Permalink
Adding support to plug custom throttler in WDT.
Browse files Browse the repository at this point in the history
Summary:
Noticed that default throttler in WDT lib isn't working as expected in some use cases. We have a scenario where we download several snapshots across different clients at once and default throttler keeps on reseting limits on each new sender/receiver thread.

Current approach requires custom throttler to inherit from Throttler class but better approach should be to expose a pure abstract class. I've introduced one but not using at all of the places. Thoughts?

Reviewed By: ami17

Differential Revision: D13844254

fbshipit-source-id: 5a3bf2c96b6a33a25524f939e06e040875c5f3cc
  • Loading branch information
Atul Goyal authored and facebook-github-bot committed Feb 1, 2019
1 parent 61b9ccb commit 072af3b
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 46 deletions.
29 changes: 10 additions & 19 deletions Throttler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,7 @@ const double kTimeMultiplier = 0.25;

std::shared_ptr<Throttler> Throttler::makeThrottler(
const ThrottlerOptions& options) {
double avgRatePerSec = options.avg_rate_per_sec;
double peakRatePerSec = options.max_rate_per_sec;
double bucketLimit = options.throttler_bucket_limit;
int64_t singleRequestLimit = options.single_request_limit;
Throttler* throttler =
new Throttler(avgRatePerSec, peakRatePerSec, bucketLimit,
singleRequestLimit, options.throttler_log_time_millis);
return std::shared_ptr<Throttler>(throttler);
return std::make_shared<Throttler>(options);
}

void Throttler::configureOptions(double& avgRatePerSec, double& peakRatePerSec,
Expand All @@ -46,22 +39,20 @@ void Throttler::configureOptions(double& avgRatePerSec, double& peakRatePerSec,
}
}

Throttler::Throttler(double avgRatePerSec, double peakRatePerSec,
double bucketLimit, int64_t singleRequestLimit,
int64_t throttlerLogTimeMillis)
: avgRatePerSec_(avgRatePerSec) {
bucketRatePerSec_ = peakRatePerSec;
Throttler::Throttler(const ThrottlerOptions& options)
: avgRatePerSec_(options.avg_rate_per_sec) {
bucketRatePerSec_ = options.max_rate_per_sec;
tokenBucketLimit_ = kTimeMultiplier * kBucketMultiplier * bucketRatePerSec_;
/* We keep the number of tokens generated as zero initially
* It could be argued that we keep this filled when we created the
* bucket. However the startTime is passed in this case and the hope is
* that we will have enough number of tokens by the time we send the data
*/
tokenBucket_ = 0;
if (bucketLimit > 0) {
tokenBucketLimit_ = bucketLimit;
if (options.throttler_bucket_limit > 0) {
tokenBucketLimit_ = options.throttler_bucket_limit;
}
if (avgRatePerSec > 0) {
if (avgRatePerSec_ > 0) {
WLOG(INFO) << "Average rate " << avgRatePerSec_;
} else {
WLOG(INFO) << "No average rate specified";
Expand All @@ -72,9 +63,9 @@ Throttler::Throttler(double avgRatePerSec, double peakRatePerSec,
} else {
WLOG(INFO) << "No peak rate specified";
}
WDT_CHECK_GT(singleRequestLimit, 0);
singleRequestLimit_ = singleRequestLimit;
throttlerLogTimeMillis_ = throttlerLogTimeMillis;
WDT_CHECK_GT(options.single_request_limit, 0);
singleRequestLimit_ = options.single_request_limit;
throttlerLogTimeMillis_ = options.throttler_log_time_millis;
}

void Throttler::setThrottlerRates(double& avgRatePerSec,
Expand Down
21 changes: 2 additions & 19 deletions Throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class Throttler {
static std::shared_ptr<Throttler> makeThrottler(
const ThrottlerOptions& options);

explicit Throttler(const ThrottlerOptions& options);

/**
* Calls calculateSleep which is a thread safe method. Finds out the
* time thread has to sleep and makes it sleep.
Expand Down Expand Up @@ -134,25 +136,6 @@ class Throttler {
const Throttler& throttler);

private:
/**
* @param averageRatePerSec Average rate at which tokens should be
* consumed
* @param peakRatePerSec Max burst rate allowed by the
* token bucket
* @param bucketLimit Max size of bucket, specify 0 for auto
* configure. In auto mode, it will be twice
* the data you send in 1/4th of a second
* at the peak rate
* @param singleRequestLimit Internal limit to the maximum number of
* tokens that can be throttled in one call.
* If more tokens are requested to be
* throttled, that requested gets broken down
* and it is treated as multiple throttle
* calls.
*/
Throttler(double avgRatePerSec, double peakRatePerSec, double bucketLimit,
int64_t singleRequestLimit, int64_t throttlerLogTimeMillis = 0);

/**
* Sometimes the options passed to throttler might not make sense so this
* method tries to auto configure them
Expand Down
10 changes: 8 additions & 2 deletions Wdt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ Wdt::Wdt() {
resourceController_ = std::make_unique<WdtResourceController>(options_);
}

Wdt::Wdt(std::shared_ptr<Throttler> throttler) {
WdtFlags::initializeFromFlags(options_);
resourceController_ =
std::make_unique<WdtResourceController>(options_, throttler);
}

std::string Wdt::getSenderIdentifier(const WdtTransferRequest &req) {
if (req.destIdentifier.empty()) {
return req.hostName;
Expand Down Expand Up @@ -223,5 +229,5 @@ void Wdt::releaseWdt(const std::string &appName) {
}
s_wdtMap.erase(it);
}
}
} // namespaces
} // namespace wdt
} // namespace facebook
1 change: 1 addition & 0 deletions Wdt.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class Wdt {

/// Private constructor
explicit Wdt();
explicit Wdt(std::shared_ptr<Throttler> throttler);

/// Not copyable
Wdt(const Wdt &) = delete;
Expand Down
2 changes: 1 addition & 1 deletion WdtBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ bool WdtBase::hasStarted() {
void WdtBase::configureThrottler() {
WDT_CHECK(!throttler_);
WVLOG(1) << "Configuring throttler options";
throttler_ = Throttler::makeThrottler(options_.getThrottlerOptions());
throttler_ = std::make_shared<Throttler>(options_.getThrottlerOptions());
if (throttler_) {
WLOG(INFO) << "Enabling throttling " << *throttler_;
} else {
Expand Down
11 changes: 9 additions & 2 deletions WdtResourceController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,18 @@ WdtNamespaceController::~WdtNamespaceController() {
// release is done by parent shutdown
}

WdtResourceController::WdtResourceController(const WdtOptions &options)
WdtResourceController::WdtResourceController(
const WdtOptions &options, std::shared_ptr<Throttler> throttler)
: WdtControllerBase("_root controller_"), options_(options) {
updateMaxSendersLimit(options.global_sender_limit);
updateMaxReceiversLimit(options.global_receiver_limit);
throttler_ = Throttler::makeThrottler(options.getThrottlerOptions());
throttler_ = throttler;
}

WdtResourceController::WdtResourceController(const WdtOptions &options)
: WdtResourceController(
options,
std::make_shared<Throttler>(options.getThrottlerOptions())) {
}

WdtResourceController::WdtResourceController()
Expand Down
2 changes: 2 additions & 0 deletions WdtResourceController.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class WdtResourceController : public WdtControllerBase {
public:
/// resource controller should take the option as reference so that it can be
/// changed later from the parent object
WdtResourceController(const WdtOptions &options,
std::shared_ptr<Throttler> throttler);
explicit WdtResourceController(const WdtOptions &options);
WdtResourceController();

Expand Down
4 changes: 2 additions & 2 deletions test/ThrottlerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ TEST(ThrottlerTest, RATE_CHANGE) {
WdtOptions options;
options.avg_mbytes_per_sec = 50;
std::shared_ptr<Throttler> throttler =
Throttler::makeThrottler(options.getThrottlerOptions());
std::make_shared<Throttler>(options.getThrottlerOptions());
throttler->startTransfer();

testThrottling(throttler, 50);
Expand All @@ -47,7 +47,7 @@ TEST(ThrottlerTest, FAIRNESS) {
options.avg_mbytes_per_sec = 60;
options.buffer_size = 256 * 1024;
std::shared_ptr<Throttler> throttler =
Throttler::makeThrottler(options.getThrottlerOptions());
std::make_shared<Throttler>(options.getThrottlerOptions());

const int numThread = 40;
const int testDurationSec = 5;
Expand Down
2 changes: 1 addition & 1 deletion test/WdtMiscTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ TEST(BasicTest, ThrottlerWithoutReporting) {
WdtOptions options;
options.avg_mbytes_per_sec = 1;
shared_ptr<Throttler> throttler =
Throttler::makeThrottler(options.getThrottlerOptions());
std::make_shared<Throttler>(options.getThrottlerOptions());
const int toWrite = 2 * kMbToB;
const int blockSize = 1024;
int written = 0;
Expand Down

0 comments on commit 072af3b

Please sign in to comment.