Skip to content

Commit

Permalink
MB-38978: Gather "dcp" stats group in a background task
Browse files Browse the repository at this point in the history
The "dcp" stats group includes aggregated values which may become
expensive to collect if there are lots of dcp streams.

To avoid this processing holding up a front end thread, move this
aggregation to a task.

Change-Id: Iea9b901b8d27164a0f0141967361553f0596f784
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/156699
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: James H <james.harrison@couchbase.com>
  • Loading branch information
jameseh96 authored and daverigby committed Jul 5, 2021
1 parent 461d0a8 commit 5f84774
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 15 deletions.
100 changes: 87 additions & 13 deletions engines/ep/src/ep_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3869,26 +3869,101 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
return cb::engine_errc::success;
}

class StatDCPTask : public GlobalTask {
public:
using Callback = std::function<cb::engine_errc(
EventuallyPersistentEngine* e, const void* cookie)>;
StatDCPTask(EventuallyPersistentEngine* e,
const void* cookie,
Callback callback)
: GlobalTask(e, TaskId::StatDCPTask, 0, false),
e(e),
cookie(cookie),
callback(std::move(callback)) {
}
bool run() override {
TRACE_EVENT0("ep-engine/task", "StatDCPTask");
cb::engine_errc result = cb::engine_errc::failed;
try {
result = callback(e, cookie);
} catch (const std::exception& e) {
EP_LOG_WARN(
"StatDCPTask: callback threw exception: \"{}\" task "
"desc:\"{}\"",
e.what(),
getDescription());
}
e->notifyIOComplete(cookie, result);
return false;
}

std::string getDescription() const override {
return "bucket-level aggregated DCP stats";
}

std::chrono::microseconds maxExpectedDuration() const override {
// Task aggregates all dcp connections, of which there can be many, so
// set limit of 100ms.
return std::chrono::milliseconds(100);
}

private:
EventuallyPersistentEngine* e;
const void* cookie;
Callback callback;
};

cb::engine_errc EventuallyPersistentEngine::doDcpStats(
const void* cookie, const AddStatFn& add_stat, std::string_view value) {
void* engineSpecific = getEngineSpecific(cookie);
if (engineSpecific == nullptr) {
ExTask task = std::make_shared<StatDCPTask>(
this,
cookie,
[add_stat, filter = std::string(value)](
EventuallyPersistentEngine* ep, const void* cookie) {
ep->doDcpStatsInner(cookie, add_stat, filter);
return cb::engine_errc::success;
});
ExecutorPool::get()->schedule(task);
storeEngineSpecific(cookie, this);
return cb::engine_errc::would_block;
} else {
storeEngineSpecific(cookie, nullptr);
}

return cb::engine_errc::success;
}

void EventuallyPersistentEngine::doDcpStatsInner(const void* cookie,
const AddStatFn& add_stat,
std::string_view value) {
ConnStatBuilder dcpVisitor(cookie, add_stat, DcpStatsFilter{value});
dcpConnMap_->each(dcpVisitor);

const auto& aggregator = dcpVisitor.getCounter();

add_casted_stat("ep_dcp_count", aggregator.totalConns, add_stat, cookie);
add_casted_stat("ep_dcp_producer_count", aggregator.totalProducers, add_stat, cookie);
add_casted_stat("ep_dcp_total_bytes", aggregator.conn_totalBytes, add_stat, cookie);
add_casted_stat("ep_dcp_total_uncompressed_data_size", aggregator.conn_totalUncompressedDataSize,
add_stat, cookie);
add_casted_stat("ep_dcp_total_queue", aggregator.conn_queue,
add_stat, cookie);
add_casted_stat("ep_dcp_queue_fill", aggregator.conn_queueFill,
add_stat, cookie);
add_casted_stat("ep_dcp_items_sent", aggregator.conn_queueDrain,
add_stat, cookie);
add_casted_stat("ep_dcp_items_remaining", aggregator.conn_queueRemaining,
add_stat, cookie);
add_casted_stat("ep_dcp_producer_count",
aggregator.totalProducers,
add_stat,
cookie);
add_casted_stat(
"ep_dcp_total_bytes", aggregator.conn_totalBytes, add_stat, cookie);
add_casted_stat("ep_dcp_total_uncompressed_data_size",
aggregator.conn_totalUncompressedDataSize,
add_stat,
cookie);
add_casted_stat(
"ep_dcp_total_queue", aggregator.conn_queue, add_stat, cookie);
add_casted_stat(
"ep_dcp_queue_fill", aggregator.conn_queueFill, add_stat, cookie);
add_casted_stat(
"ep_dcp_items_sent", aggregator.conn_queueDrain, add_stat, cookie);
add_casted_stat("ep_dcp_items_remaining",
aggregator.conn_queueRemaining,
add_stat,
cookie);
add_casted_stat("ep_dcp_num_running_backfills",
dcpConnMap_->getNumRunningBackfills(),
add_stat,
Expand All @@ -3899,7 +3974,6 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats(
cookie);

dcpConnMap_->addStats(add_stat, cookie);
return cb::engine_errc::success;
}

cb::engine_errc EventuallyPersistentEngine::doEvictionStats(
Expand Down
6 changes: 6 additions & 0 deletions engines/ep/src/ep_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,12 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {
cb::engine_errc doDcpStats(const void* cookie,
const AddStatFn& add_stat,
std::string_view value);
/**
* Immediately collect DCP stats, without scheduling a background task.
*/
void doDcpStatsInner(const void* cookie,
const AddStatFn& add_stat,
std::string_view value);
cb::engine_errc doEvictionStats(const void* cookie,
const AddStatFn& add_stat);
cb::engine_errc doConnAggStats(const BucketStatCollector& collector,
Expand Down
4 changes: 2 additions & 2 deletions engines/ep/src/kv_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1188,10 +1188,10 @@ static void snapshot_add_stat(std::string_view key,
void KVBucket::snapshotStats(bool shuttingDown) {
snapshot_add_stat_cookie snap;
bool rv = engine.getStats(&snap, {}, {}, snapshot_add_stat) ==
cb::engine_errc::success &&
engine.getStats(&snap, "dcp", {}, snapshot_add_stat) ==
cb::engine_errc::success;

engine.doDcpStatsInner(&snap, snapshot_add_stat, {});

nlohmann::json snapshotStats(snap.smap);
if (rv && shuttingDown) {
snapshotStats["ep_force_shutdown"] =
Expand Down
1 change: 1 addition & 0 deletions engines/ep/src/tasks.def.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ TASK(ClosedUnrefCheckpointRemoverTask, NONIO_TASK_IDX, 6)
TASK(ClosedUnrefCheckpointRemoverVisitorTask, NONIO_TASK_IDX, 6)
TASK(VBucketMemoryDeletionTask, NONIO_TASK_IDX, 6)
TASK(StatCheckpointTask, NONIO_TASK_IDX, 7)
TASK(StatDCPTask, NONIO_TASK_IDX, 7)
TASK(DefragmenterTask, NONIO_TASK_IDX, 7)
TASK(ItemCompressorTask, NONIO_TASK_IDX, 7)
TASK(EphTombstoneHTCleaner, NONIO_TASK_IDX, 7)
Expand Down

0 comments on commit 5f84774

Please sign in to comment.