MB-29928: [BP] Implement auto controller logic for the defragmenter
With changes in 7.0 to memory tracking, we now have visibility of
an individual bucket's fragmentation, whereas pre 7.0 we only had
visibility of the entire process.

This commit makes use of the bucket fragmentation to calculate
the sleep interval of the defragger, the overall idea being that
as a bucket's fragmentation gets worse, the sleep time reduces.
The defragger then runs more frequently, visiting more items
and bringing the fragmentation down.

The commit introduces two new modes of automatic calculation. Two are
provided because the second, PID mode, is more experimental; once it
has had some soak time, only one mode need remain in the code.

The two modes are as follows and can be selected in the bucket config
(a future patch makes them runtime switchable via cbepctl).

1) auto (auto_linear in configuration.json) - Use a 'static' and
predictable calculation for converting fragmentation into a reduction
in sleep time.

2) auto_pid - Use a PID controller to calculate reductions in
sleep time. This is less predictable as real time is a factor
in the calculation; scheduling delays etc. result in unpredictable
outputs.

The existing mode (just use defragmenter_interval) is named "static".

Both modes of auto controller work by taking the bucket fragmentation
as a percentage and scaling it against the bucket's low-water mark to
create a 'score', which is then used to determine how the sleep interval
may be calculated. The result is that when fragmentation may be high
but rss is actually small (lots of headroom before the low-water mark)
the score is low, whilst as rss approaches the low-water mark the score
increases.

E.g.

fragmentation 23% (allocated:500, rss:650), then with a low-water
mark of n the value used in calculations (score):

    n    | score
    600  | 23   (rss > low-water)
    1000 | 14.95
    2000 | 7.475
    3000 | 4.98
    5000 | 2.99
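
The score arithmetic behind the table above can be sketched as follows.
This is a minimal standalone sketch, not the engine code: the function
name and the plain double parameters are hypothetical, and fragmentation
is assumed to be (rss - allocated) / rss, which is what gives 23% for
allocated:500, rss:650. It returns a ratio rather than a percentage, and
it does not round fragmentation to 23% first, so results differ slightly
from the table (e.g. roughly 0.15 rather than 14.95% for n=1000).

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper (not part of kv_engine): scale the fragmentation
// ratio by how close rss is to the low-water mark. rss is capped at the
// low-water mark, so the score never exceeds the raw fragmentation.
double scoredFragmentation(double allocated, double rss, double lowWater) {
    assert(rss > 0.0 && lowWater > 0.0 && allocated <= rss);
    // Assumed definition of fragmentation: unused share of resident memory.
    const double fragmentation = (rss - allocated) / rss;
    const double cappedRss = std::min(rss, lowWater);
    return fragmentation * (cappedRss / lowWater);
}
```

For example scoredFragmentation(500, 650, 600) returns the unscaled
fragmentation ratio because rss is above the low-water mark, while
larger low-water marks shrink the score proportionally.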

A spreadsheet with numerous scenarios and the score can be found here:

https://docs.google.com/spreadsheets/d/1W72N2vbrfa5xOVFmS0e3tpFCcEyd8kPk8fqMNmuM1k8/edit#gid=0

auto: This mode takes the score and a range. When the score is below
the range the maximum sleep is used; when it is above the range the
minimum sleep is used. When the score is within the range we find how
far into the range it is, e.g. 20%, and reduce the sleep by that same
fraction of the min-to-max span (20% into the range gives the maximum
sleep minus 20% of the span).

Here the following configuration parameters are being used:

* defragmenter_auto_min_sleep 0.0
* defragmenter_auto_max_sleep 10.0
* defragmenter_auto_lower_threshold 0.07
* defragmenter_auto_upper_threshold 0.25
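
As a worked illustration of the mapping with the parameters listed
above, here is a minimal sketch. The free function and its signature
are hypothetical (the commit implements this inside DefragmenterTask),
and treating both range boundaries inclusively is a choice made for
this sketch only.

```cpp
#include <cassert>

// Hypothetical helper: map a score linearly onto the sleep range.
// At or below 'lower' the maximum sleep is returned, at or above 'upper'
// the minimum sleep, and in between the sleep shrinks in proportion to
// how far the score has moved through the [lower, upper] range.
double linearSleep(double score,
                   double lower,
                   double upper,
                   double minSleep,
                   double maxSleep) {
    assert(upper > lower && maxSleep >= minSleep);
    if (score <= lower) {
        return maxSleep;
    }
    if (score >= upper) {
        return minSleep;
    }
    const double fraction = (score - lower) / (upper - lower);
    return maxSleep - fraction * (maxSleep - minSleep);
}
```

With lower=0.07, upper=0.25, min=0.0 and max=10.0, a score of 0.106 is
20% of the way through the range, so the sleep is 10.0 - 2.0 = 8.0
seconds; a score of 0.16 is halfway and gives 5.0 seconds.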

auto_pid: This mode uses a single configurable threshold and when the
score exceeds that threshold the PID calculates an output. The returned
sleep time is the maximum reduced by that output, but never lower than
the configured minimum. The PID itself is configured at runtime from
the bucket configuration and the commit uses
values for P, I, D and dt based on examination of the "pathogen"
performance test and use of the `pid_runner` program which allows for
some examination of P, I and D. The assumption is that fragmentation
doesn't increase quickly, hence the I and dt terms force the PID to
only recalculate every 30 seconds with a 'slow' output.

Here the following configuration parameters are being used:

* defragmenter_auto_min_sleep 0.0
* defragmenter_auto_max_sleep 10.0
* defragmenter_auto_lower_threshold 0.07
* defragmenter_auto_pid_p 0.3
* defragmenter_auto_pid_i 0.0000197
* defragmenter_auto_pid_d 0.0
* defragmenter_auto_pid_dt 30000

These values have been used in the pid_runner test and were chosen based
on the observation that fragmentation in real workloads increases
slowly. The pathogen test is useful for testing defragmentation, but
may not be truly representative of real fragmentation growth: for
example, that test reaches fragmentation greater than 35% in a very
short time while operating on a small amount of data (mem_used
ranges from ~200MB to ~600MB).

First dt: With the observation that fragmentation generally increases
slowly, the dt term controls the rate at which the PID reads the Process
Variable (PV, in our case the scored fragmentation) and reacts. Thus 30
seconds will elapse before the PID computes a new output value. If the
PV were changing at faster rates, the dt term would be reduced.

P I D values:

Using pid_runner (in its committed state) a number of scenarios were
compared where the PV is held at a fixed multiple of the set-point (SP,
here the lower threshold). These scenarios guided the current values of
P, I and D.

For example when the PV is 1.1x of SP it would take the PID ~20 hours to
reduce the sleep interval to min (0.0).

When the PV is 2.6x of SP it would take the PID 75 minutes to reduce the
sleep interval to min (0.0).

    PV x | time to min sleep
    1.1  | 20h:8m:31s
    1.2  | 10h:4m:31s
    1.5  | 4h:1m:31s
    1.8  | 2h:31m:1s
    2.0  | 2h:1m:1s
    2.3  | 1h:33m:1s
    2.6  | 1h:15m:31s
    2.9  | 1h:3m:31s
    3.0  | 1h:0m:31s
    3.3  | 0h:52m:31s
    3.5  | 0h:48m:31s
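
The shape of the table above can be approximated with a small model.
This is a sketch under stated assumptions, not the PIDController from
src/pid_controller.cc (which is not shown on this page): it assumes
error = SP - PV, an integral accumulated as error * dt with dt in
milliseconds, and one step every dt. SketchPid and stepsToMinSleep are
hypothetical names. Under those assumptions, steps * 30 seconds lands
within about a second of the rows checked (1.1, 1.5, 2.0 and 3.0).

```cpp
#include <cassert>

// Hypothetical textbook PID; the real controller may differ in detail.
class SketchPid {
public:
    SketchPid(double setPoint, double kp, double ki, double kd, double dtMs)
        : setPoint(setPoint), kp(kp), ki(ki), kd(kd), dtMs(dtMs) {
    }

    // One controller step. Returns a negative value while pv > setPoint.
    double step(double pv) {
        const double error = setPoint - pv;
        integral += error * dtMs; // assumption: integral is in ms units
        const double derivative = (error - previousError) / dtMs;
        previousError = error;
        return kp * error + ki * integral + kd * derivative;
    }

private:
    double setPoint, kp, ki, kd, dtMs;
    double integral = 0.0;
    double previousError = 0.0;
};

// Count the steps until maxSleep + correction reaches minSleep, with
// the PV pinned at pvMultiple * SP (pvMultiple must be greater than 1).
int stepsToMinSleep(double pvMultiple) {
    assert(pvMultiple > 1.0);
    const double setPoint = 0.07; // defragmenter_auto_lower_threshold
    const double minSleep = 0.0;
    const double maxSleep = 10.0;
    SketchPid pid(setPoint, 0.3, 0.0000197, 0.0, 30000.0);
    int steps = 0;
    double sleep = maxSleep;
    while (sleep > minSleep) {
        sleep = maxSleep + pid.step(setPoint * pvMultiple);
        ++steps;
    }
    return steps;
}
```

With these assumptions the I term dominates: each step at 1.1x SP
lowers the sleep by about 0.004 seconds, so 2417 steps of 30 seconds
(roughly 20 hours 8 minutes) are needed, whereas at 2.0x SP 242 steps
(roughly 2 hours 1 minute) suffice.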

A final note on the use of a PID. Typical use of a PID would be in
systems where the 'process variable' can be influenced in positive and
negative ways. E.g. a temperature could be controlled by heating or not
heating (or forced cooling). In our use-case we can influence
fragmentation down (by running the defragger), but we cannot raise
fragmentation to the set-point, i.e. our use of a PID cannot maintain
a level of fragmentation. This is why in the code, once the
fragmentation (score) drops below the lower threshold, the PID just
resets and the max sleep is used.

Change-Id: I0a2137c095ff02b7b5adead7e6bd150ceb9e6b2b
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/157229
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
jimwwalker authored and daverigby committed Jul 15, 2021
1 parent 1e8f948 commit 255219e
Showing 9 changed files with 785 additions and 20 deletions.
12 changes: 12 additions & 0 deletions engines/ep/CMakeLists.txt
@@ -90,6 +90,8 @@ ENDIF (WIN32)

add_executable(gencode tools/gencode.cc)
add_executable(genconfig tools/genconfig.cc)
add_executable(pid_runner tools/pid_runner.cc $<TARGET_OBJECTS:ep_objs>)

if (WIN32)
# windows need getopt
target_link_libraries(gencode platform)
@@ -124,6 +126,15 @@ target_link_libraries(kvstore_gen
${LIBEVENT_LIBRARIES}
)

target_link_libraries(pid_runner
ep-engine_collections
mcbp
mcd_time
mcd_tracing
statistics
xattr
${EP_STORAGE_LIBS})

ADD_CUSTOM_COMMAND(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.h
COMMAND
@@ -339,6 +350,7 @@ ADD_LIBRARY(ep_objs OBJECT
src/mutation_log_entry.cc
src/paging_visitor.cc
src/persistence_callback.cc
src/pid_controller.cc
src/pre_link_document_context.cc
src/pre_link_document_context.h
src/progress_tracker.cc
68 changes: 64 additions & 4 deletions engines/ep/configuration.json
@@ -315,10 +315,9 @@
"dcp_noop_mandatory_for_v5_features": {
"default": "true",
"descr": "Forces clients to enable noop for v5 features",
"dynamic": true,
"type": "bool"
},

"defragmenter_enabled": {
"default": "true",
"descr": "True if defragmenter task is enabled",
@@ -328,7 +327,7 @@
"defragmenter_interval": {
"default": "10.0",
"descr": "How often defragmenter task should be run (in seconds).",
"dynamic": true,
"type": "float"
},
"defragmenter_age_threshold": {
@@ -346,14 +345,75 @@
"defragmenter_chunk_duration": {
"default": "20",
"descr": "Maximum time (in ms) defragmentation task will run for before being paused (and resumed at the next defragmenter_interval).",
"dynamic": true,
"type": "size_t",
"validator": {
"range": {
"min": 1
}
}
},
"defragmenter_mode" : {
"default": "auto_pid",
"descr": "Determines how the defragmenter controls its sleep interval. When static, defragmenter_interval is used. When auto_linear, scale the sleep time using a scored fragmentation when it falls between defragmenter_auto_lower_threshold and defragmenter_auto_upper_threshold. When auto_pid, use a PID controller to compute reductions in the sleep interval when scored fragmentation is above defragmenter_auto_lower_threshold.",
"dynamic": false,
"type": "std::string",
"validator": {
"enum": [
"static",
"auto_linear",
"auto_pid"
]
}
},
"defragmenter_auto_lower_threshold" : {
"default": "0.07",
"descr": "When mode is not static and scored fragmentation is above this value, a sleep time between defragmenter_auto_min_sleep and defragmenter_auto_max_sleep will be used",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_upper_threshold" : {
"default": "0.25",
"descr": "When mode is auto_linear and scored fragmentation is above this value, the defragmenter will use defragmenter_auto_min_sleep",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_max_sleep" : {
"default": "10.0",
"descr": "The maximum sleep that the auto controller can set",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_min_sleep" : {
"default": "0.0",
"descr": "The minimum sleep that the auto controller can set",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_pid_p" : {
"default": "0.3",
"descr": "The p term for the PID controller",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_pid_i" : {
"default": "0.0000197",
"descr": "The i term for the PID controller",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_pid_d" : {
"default": "0.0",
"descr": "The d term for the PID controller",
"dynamic": false,
"type": "float"
},
"defragmenter_auto_pid_dt" : {
"default": "30000",
"descr": "The dt (interval) term for the PID controller. Value represents milliseconds",
"dynamic": false,
"type": "size_t"
},
"durability_timeout_task_interval": {
"default": "25",
"descr": "Interval (in ms) between subsequent runs of the DurabilityTimeoutTask",
134 changes: 122 additions & 12 deletions engines/ep/src/defragmenter.cc
@@ -25,22 +25,40 @@ DefragmenterTask::DefragmenterTask(EventuallyPersistentEngine* e,
EPStats& stats_)
: GlobalTask(e, TaskId::DefragmenterTask, 0, false),
stats(stats_),
epstore_position(engine->getKVBucket()->startPosition()) {
epstore_position(engine->getKVBucket()->startPosition()),
pid(engine->getConfiguration().getDefragmenterAutoLowerThreshold(),
engine->getConfiguration().getDefragmenterAutoPidP(),
engine->getConfiguration().getDefragmenterAutoPidI(),
engine->getConfiguration().getDefragmenterAutoPidD(),
std::chrono::milliseconds{
engine->getConfiguration().getDefragmenterAutoPidDt()}) {
}

bool DefragmenterTask::run() {
TRACE_EVENT0("ep-engine/task", "DefragmenterTask");
std::chrono::duration<double> sleepTime;
if (engine->getConfiguration().isDefragmenterEnabled()) {
defrag();
sleepTime = defrag();
} else {
sleepTime = std::chrono::duration<double>{
engine->getConfiguration().getDefragmenterInterval()};
}
snooze(getSleepTime());
snooze(sleepTime.count());
if (engine->getEpStats().isShutdown) {
return false;
}
return true;
}

void DefragmenterTask::defrag() {
std::chrono::duration<double> DefragmenterTask::defrag() {
auto currentFragStats = cb::ArenaMalloc::getFragmentationStats(
engine->getArenaMallocClient());

auto sleepAndRun = calculateSleepTimeAndRunState(currentFragStats);
if (!sleepAndRun.runDefragger) {
return sleepAndRun.sleepTime;
}

// Get our pause/resume visitor. If we didn't finish the previous pass,
// then resume from where we last were, otherwise create a new visitor
// starting from the beginning.
@@ -61,11 +79,9 @@ void DefragmenterTask::defrag() {
ss << " resuming from " << epstore_position << ", ";
ss << prAdapter->getHashtablePosition() << ".";
}
auto fragStats = cb::ArenaMalloc::getFragmentationStats(
engine->getArenaMallocClient());
ss << " Using chunk_duration=" << getChunkDuration().count() << " ms."
<< " mem_used=" << stats.getEstimatedTotalMemoryUsed() << ", "
<< fragStats;
<< currentFragStats;
EP_LOG_DEBUG("{}", ss.str());
}

@@ -119,20 +135,22 @@ void DefragmenterTask::defrag() {
std::chrono::microseconds duration =
std::chrono::duration_cast<std::chrono::microseconds>(end -
start);
auto fragStats = cb::ArenaMalloc::getFragmentationStats(
engine->getArenaMallocClient());

ss << " Took " << duration.count() << " us."
<< " moved " << visitor.getDefragCount() << "/"
<< visitor.getVisitedCount() << " visited documents."
<< " mem_used=" << stats.getEstimatedTotalMemoryUsed() << ", "
<< fragStats << ". Sleeping for " << getSleepTime() << " seconds.";
<< cb::ArenaMalloc::getFragmentationStats(
engine->getArenaMallocClient())
<< ". Sleeping for " << sleepAndRun.sleepTime.count() << " seconds.";
EP_LOG_DEBUG("{}", ss.str());
}

// Delete(reset) visitor if it finished.
if (completed) {
prAdapter.reset();
}
return sleepAndRun.sleepTime;
}

void DefragmenterTask::stop() {
@@ -154,8 +172,17 @@ std::chrono::microseconds DefragmenterTask::maxExpectedDuration() const {
return getChunkDuration() * 10;
}

double DefragmenterTask::getSleepTime() const {
return engine->getConfiguration().getDefragmenterInterval();
DefragmenterTask::SleepTimeAndRunState
DefragmenterTask::calculateSleepTimeAndRunState(
const cb::FragmentationStats& fragStats) {
if (engine->getConfiguration().getDefragmenterMode() == "auto_linear") {
return calculateSleepLinear(fragStats);
} else if (engine->getConfiguration().getDefragmenterMode() == "auto_pid") {
return calculateSleepPID(fragStats);
}
return {std::chrono::duration<double>{
engine->getConfiguration().getDefragmenterInterval()},
true};
}

size_t DefragmenterTask::getAgeThreshold() const {
@@ -197,3 +224,86 @@ std::chrono::milliseconds DefragmenterTask::getChunkDuration() const {
DefragmentVisitor& DefragmenterTask::getDefragVisitor() {
return dynamic_cast<DefragmentVisitor&>(prAdapter->getHTVisitor());
}

float DefragmenterTask::getScoredFragmentation(
const cb::FragmentationStats& fragStats) const {
auto lowWater = stats.mem_low_wat.load();
auto rss = fragStats.getResidentBytes() > lowWater
? lowWater
: fragStats.getResidentBytes();
return fragStats.getFragmentationRatio() * (double(rss) / double(lowWater));
}

DefragmenterTask::SleepTimeAndRunState DefragmenterTask::calculateSleepLinear(
const cb::FragmentationStats& fragStats) {
auto score = getScoredFragmentation(fragStats);
bool runDefragger = true;

const auto& conf = engine->getConfiguration();
double rv = 0.0;
auto maxSleep = conf.getDefragmenterAutoMaxSleep();
auto minSleep = conf.getDefragmenterAutoMinSleep();
auto lower = conf.getDefragmenterAutoLowerThreshold();
auto upper = conf.getDefragmenterAutoUpperThreshold();

// Is the 'score' in the range where we will look to reduce sleep by
// some amount in relation to how 'bad' the score is?
if (score > lower && score < upper) {
// Calculate the error (distance from lower)
auto error = (score - lower);

// How many % of our error range is that?
auto ePerc = (error / (upper - lower)) * 100.0;

// And now find the % of the sleep range
auto t = ((maxSleep - minSleep) / 100) * ePerc;

// Finally we will return maxSleep - t. As t gets larger the sleep time
// is smaller
rv = maxSleep - t;
    } else if (score <= lower) {
rv = maxSleep;
runDefragger = false;
} else {
rv = minSleep;
}

return {std::chrono::duration<double>{rv}, runDefragger};
}

DefragmenterTask::SleepTimeAndRunState DefragmenterTask::calculateSleepPID(
const cb::FragmentationStats& fragStats) {
auto score = getScoredFragmentation(fragStats);
const auto& conf = engine->getConfiguration();
auto maxSleep = conf.getDefragmenterAutoMaxSleep();
auto minSleep = conf.getDefragmenterAutoMinSleep();

    // If fragmentation goes below our set-point (SP), we can't continue to use
    // the PID. In more general usage a PID would "speed up/slow down" to reach
    // the SP, but we cannot force fragmentation up; we're just happy that
    // it's below the SP. In this case reset, and begin ramping again when we
    // next go over the SP.
if (score < conf.getDefragmenterAutoLowerThreshold()) {
// Reset the PID ready for the next time fragmentation increases
pid.reset();
return {std::chrono::duration<double>{maxSleep}, false};
}

// Above setpoint, use the PID to calculate a correction. This will return
// a negative value
auto correction = stepPid(score);

// Add the negative to produce a sleep time
auto rv = maxSleep + correction;

// Don't go below the minimum sleep
if (rv < minSleep) {
rv = minSleep;
}

return {std::chrono::duration<double>{rv}, true};
}

float DefragmenterTask::stepPid(float pv) {
return pid.step(pv);
}