Skip to content

Commit

Permalink
MB-51875: Add support for collecting metering data [2/n]
Browse files Browse the repository at this point in the history
Add the skeleton for collecting metering information. Note that
all commands implemented in the engine itself would need to
update the cookie with the amount of data read / written

It is "easy" to track the write path, as a successful document write
contains the WCU. We can't really use the bucket_get() methods for
counting the RCUs, as we may perform multiple reads for a single
command and might not return the data to the user (for instance when
implementing retry logic on the server for dealing with CAS conflicts)

The intention with this patch is to get the infrastructure in
place, so that we can fan out the work to:

1) Get the RCU/WCU pushed to prometheus
2) Create a spec on how to calculate the RCU/WCU for the
   various commands
3) Account for RCU and WCU for the various commands according
   to 2.

Change-Id: I20d4b7779db95d73236c667255e232b09771f786
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/173672
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
  • Loading branch information
trondn committed Apr 21, 2022
1 parent 7e74655 commit e2baf05
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 9 deletions.
34 changes: 33 additions & 1 deletion daemon/buckets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
#include <platform/timeutils.h>
#include <utilities/engine_errc_2_mcbp.h>

/// File-scope cache of the configured size (in bytes) of a single read /
/// write compute unit. Initialised from Settings in BucketManager's
/// constructor and kept up to date via "read/write_compute_unit_size"
/// change listeners registered there.
static std::atomic<size_t> read_compute_unit_size;
static std::atomic<size_t> write_compute_unit_size;

Bucket::Bucket() = default;

void Bucket::reset() {
Expand All @@ -34,6 +37,8 @@ void Bucket::reset() {
clusterConfiguration.reset();
max_document_size = default_max_item_size;
supportedFeatures = {};
read_compute_units_used = 0;
write_compute_units_used = 0;
for (auto& c : responseCounters) {
c.reset();
}
Expand All @@ -58,6 +63,8 @@ nlohmann::json Bucket::to_json() const {
json["clients"] = clients.load();
json["name"] = name;
json["type"] = to_string(type);
json["rcu"] = read_compute_units_used.load();
json["wcu"] = write_compute_units_used.load();
} catch (const std::exception& e) {
LOG_ERROR("Failed to generate bucket details: {}", e.what());
}
Expand All @@ -83,6 +90,16 @@ void Bucket::setEngine(unique_engine_ptr engine_) {
bucketDcp = dynamic_cast<DcpIface*>(engine.get());
}

void Bucket::commandExecuted(const Cookie& cookie) {
    // Snapshot the (dynamically tunable) compute unit sizes once. The
    // original code loaded each atomic twice within a single expression,
    // so a concurrent configuration change could give inconsistent
    // rounding; a zero size (no value configured yet) would cause a
    // division by zero.
    const auto rcuSize = read_compute_unit_size.load();
    const auto wcuSize = write_compute_unit_size.load();
    const auto [read, write] = cookie.getDocumentRWBytes();
    if (rcuSize != 0) {
        // Round up: any partial compute unit counts as a full one
        read_compute_units_used += (read + rcuSize - 1) / rcuSize;
    }
    if (wcuSize != 0) {
        write_compute_units_used += (write + wcuSize - 1) / wcuSize;
    }
}

namespace BucketValidator {
std::string validateBucketName(std::string_view name) {
if (name.empty()) {
Expand Down Expand Up @@ -516,7 +533,22 @@ void BucketManager::destroyAll() {
}

BucketManager::BucketManager() {
size_t numthread = Settings::instance().getNumWorkerThreads() + 1;
auto& settings = Settings::instance();
read_compute_unit_size = settings.getReadComputeUnitSize();
write_compute_unit_size = settings.getWriteComputeUnitSize();

settings.addChangeListener("read_compute_unit_size",
[](const std::string&, Settings& s) -> void {
read_compute_unit_size =
s.getReadComputeUnitSize();
});
settings.addChangeListener("write_compute_unit_size",
[](const std::string&, Settings& s) -> void {
write_compute_unit_size =
s.getWriteComputeUnitSize();
});

size_t numthread = settings.getNumWorkerThreads() + 1;
for (auto& b : all_buckets) {
b.stats.resize(numthread);
}
Expand Down
11 changes: 10 additions & 1 deletion daemon/buckets.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2015-Present Couchbase, Inc.
*
Expand Down Expand Up @@ -171,6 +170,10 @@ class Bucket {
*/
void reset();

/// Notify the bucket that the provided command completed execution in
/// the bucket
void commandExecuted(const Cookie& cookie);

protected:
unique_engine_ptr engine;

Expand All @@ -179,6 +182,12 @@ class Bucket {
* connected bucket doesn't support DCP.
*/
DcpIface* bucketDcp{nullptr};

/// The number of RCUs being used in this bucket
std::atomic<std::size_t> read_compute_units_used{0};

/// The number of WCUs being used in this bucket
std::atomic<std::size_t> write_compute_units_used{0};
};

std::string to_string(Bucket::State state);
Expand Down
5 changes: 4 additions & 1 deletion daemon/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,10 @@ void Connection::processNotifiedCookie(Cookie& cookie, cb::engine_errc status) {
}
}

void Connection::commandExecuted() {
void Connection::commandExecuted(Cookie& cookie) {
if (!internal) {
getBucket().commandExecuted(cookie);
}
if (tenant) {
tenant->executed();
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ class Connection : public DcpMessageProducersIface {
void processNotifiedCookie(Cookie& cookie, cb::engine_errc status);

/// Notify that a command was executed (needed for command rate limiting)
void commandExecuted();
void commandExecuted(Cookie& cookie);

protected:
/**
Expand Down
4 changes: 3 additions & 1 deletion daemon/cookie.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ bool Cookie::execute(bool useStartTime) {
tracer.record(cb::tracing::Code::Execute, ts, te);

if (done) {
connection.commandExecuted();
collectTimings(te);
connection.commandExecuted(*this);
return true;
}
return false;
Expand Down Expand Up @@ -552,6 +552,8 @@ cb::mcbp::Status Cookie::validate() {
}

void Cookie::reset() {
document_bytes_read = 0;
document_bytes_written = 0;
event_id.clear();
error_context.clear();
json_message.clear();
Expand Down
6 changes: 3 additions & 3 deletions daemon/memcached.cc
Original file line number Diff line number Diff line change
Expand Up @@ -985,13 +985,13 @@ int memcached_main(int argc, char** argv) {

startExecutorPool();

LOG_INFO_RAW("Starting Tenant stats collecting");
TenantManager::startup();

// Schedule the StaleTraceRemover
startStaleTraceDumpRemover(std::chrono::minutes(1),
std::chrono::minutes(5));

LOG_INFO_RAW("Starting Tenant stats collecting");
TenantManager::startup();

/* Initialise memcached time keeping */
mc_time_init(main_base->getLibeventBase());

Expand Down
1 change: 1 addition & 0 deletions daemon/protocol/mcbp/arithmetic_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ cb::engine_errc ArithmeticCommandContext::sendResult() {
extras = {};
}

cookie.addDocumentReadBytes(extras.size() + value.size());
cookie.sendResponse(cb::mcbp::Status::Success,
extras,
{},
Expand Down
4 changes: 4 additions & 0 deletions daemon/protocol/mcbp/engine_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ cb::engine_errc bucket_store(
add(cookie,
document_state == DocumentState::Alive ? Operation::Modify
: Operation::Delete);
cookie.addDocumentWriteBytes(item_->getValueView().size());
} else if (ret == cb::engine_errc::disconnect) {
LOG_WARNING("{}: {} bucket_store return cb::engine_errc::disconnect",
c.getId(),
Expand Down Expand Up @@ -151,6 +152,7 @@ cb::EngineErrorCasPair bucket_store_if(
add(cookie,
document_state == DocumentState::Alive ? Operation::Modify
: Operation::Delete);
cookie.addDocumentWriteBytes(item_->getValueView().size());
} else if (ret.status == cb::engine_errc::disconnect) {
LOG_WARNING("{}: {} store_if return cb::engine_errc::disconnect",
c.getId(),
Expand All @@ -174,6 +176,8 @@ cb::engine_errc bucket_remove(
if (ret == cb::engine_errc::success) {
cb::audit::document::add(cookie,
cb::audit::document::Operation::Delete);
// @todo it should be 1 WCU for the tombstone?
cookie.addDocumentWriteBytes(1);
} else if (ret == cb::engine_errc::disconnect) {
LOG_WARNING("{}: {} bucket_remove return cb::engine_errc::disconnect",
c.getId(),
Expand Down
1 change: 1 addition & 0 deletions daemon/protocol/mcbp/gat_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ cb::engine_errc GatCommandContext::sendResponse() {
}
}

cookie.addDocumentReadBytes(payload.size());
connection.sendResponse(
cookie,
cb::mcbp::Status::Success,
Expand Down
1 change: 1 addition & 0 deletions daemon/protocol/mcbp/get_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ cb::engine_errc GetCommandContext::sendResponse() {
}
}

cookie.addDocumentReadBytes(payload.size());
connection.sendResponse(
cookie,
cb::mcbp::Status::Success,
Expand Down
1 change: 1 addition & 0 deletions daemon/protocol/mcbp/get_locked_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ cb::engine_errc GetLockedCommandContext::sendResponse() {
}
}

cookie.addDocumentReadBytes(payload.size());
connection.sendResponse(
cookie,
cb::mcbp::Status::Success,
Expand Down
7 changes: 7 additions & 0 deletions daemon/subdocument.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,9 @@ static void subdoc_single_response(Cookie& cookie, SubdocCmdContext& context) {
// bucket_store.
if (!context.traits.is_mutator) {
cb::audit::document::add(cookie, cb::audit::document::Operation::Read);
if (context.overall_status == cb::mcbp::Status::Success) {
cookie.addDocumentReadBytes(value.size());
}
}

// Add mutation descr to response buffer if requested.
Expand Down Expand Up @@ -1494,6 +1497,10 @@ static void subdoc_multi_lookup_response(Cookie& cookie,
h.setLength((mloc.length));
connection.copyToOutputStream(h.getBuffer(),
{mloc.at, mloc.length});

if (context.overall_status == cb::mcbp::Status::Success) {
cookie.addDocumentReadBytes(mloc.length);
}
} else {
connection.copyToOutputStream(h.getBuffer());
}
Expand Down
20 changes: 20 additions & 0 deletions include/memcached/cookie_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once

#include <memcached/tracer.h>
#include <atomic>
#include <memory>
#include <optional>

Expand Down Expand Up @@ -97,4 +98,23 @@ class CookieIface : public cb::tracing::Traceable {

/// Get the payload from the command.
virtual std::string_view getInflatedInputPayload() const = 0;

/// Add the number of document bytes read
void addDocumentReadBytes(size_t nread) {
document_bytes_read += nread;
}

/// Add the number of document bytes written
void addDocumentWriteBytes(size_t nwrite) {
document_bytes_written += nwrite;
}

std::pair<size_t, size_t> getDocumentRWBytes() const {
return {document_bytes_read.load(std::memory_order_acquire),
document_bytes_written.load(std::memory_order_acquire)};
}

protected:
std::atomic<size_t> document_bytes_read = 0;
std::atomic<size_t> document_bytes_written = 0;
};
1 change: 1 addition & 0 deletions tests/testapp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(memcached_testapp_SOURCES
testapp_dcp_consumer.cc
testapp_deprecated_commands.cc
testapp_durability.cc
testapp_elixir.cc
testapp_environment.cc
testapp_environment.h
testapp_errmap.cc
Expand Down
89 changes: 89 additions & 0 deletions tests/testapp/testapp_elixir.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#include "testapp.h"
#include "testapp_client_test.h"

#include <nlohmann/json.hpp>
#include <protocol/connection/frameinfo.h>

/// Test fixture for the "elixir" (serverless) metering functionality:
/// verify that the RCU / WCU counters reported via "stats bucket_details"
/// get updated as commands execute.
class ElixirTest : public TestappXattrClientTest {
public:
    /// Calculate the number of compute units the provided value should
    /// occupy, rounded up so that any partial unit counts as a full one.
    /// The unit size is parameterized (instead of hard-coding 1024) as
    /// the server's read/write compute unit sizes are configurable;
    /// it defaults to 1024 to stay compatible with existing callers.
    std::size_t calc_num_cu(std::size_t val, std::size_t cu_size = 1024) {
        return val / cu_size + (val % cu_size ? 1 : 0);
    }

protected:
    /// Fetch the current (rcu, wcu) counters for the test bucket from
    /// "stats bucket_details".
    /// @throws std::runtime_error if the bucket isn't found in the reply
    std::pair<std::size_t, std::size_t> getComputeUnits() {
        std::size_t rcu = 0, wcu = 0;
        bool found = false;

        adminConnection->stats(
                [this, &rcu, &wcu, &found](const auto& k, const auto& v) {
                    if (!v.empty()) {
                        auto json = nlohmann::json::parse(v);
                        for (nlohmann::json& bucket : json["buckets"]) {
                            if (bucket["name"] == bucketName) {
                                rcu = bucket["rcu"].get<std::size_t>();
                                wcu = bucket["wcu"].get<std::size_t>();
                                found = true;
                            }
                        }
                    }
                },
                "bucket_details");
        if (found) {
            return {rcu, wcu};
        }
        throw std::runtime_error("getComputeUnits(): Bucket not found");
    }

    /// Execute a command and verify that RCU and WCU was updated to the
    /// correct values
    /// @param command the command to run; returns the (rcu, wcu) delta it
    ///        is expected to produce
    /// @param message textual description used in failure messages
    void execute(std::function<std::pair<std::size_t, std::size_t>()> command,
                 const char* message) {
        const auto [pre_rcu, pre_wcu] = getComputeUnits();
        auto [rcu, wcu] = command();
        const auto [post_rcu, post_wcu] = getComputeUnits();
        EXPECT_EQ(pre_rcu + rcu, post_rcu)
                << "Expected " << message << " to increase RCU with " << rcu;
        EXPECT_EQ(pre_wcu + wcu, post_wcu)
                << "Expected " << message << " to increase WCU with " << wcu;
    }
};

// Instantiate the suite for a single transport combination (SSL with
// xattr/JSON/Snappy support) — the metering counters don't depend on the
// transport, so there is no need to run all permutations
INSTANTIATE_TEST_SUITE_P(
        TransportProtocols,
        ElixirTest,
        ::testing::Combine(::testing::Values(TransportProtocols::McbpSsl),
                           ::testing::Values(XattrSupport::Yes),
                           ::testing::Values(ClientJSONSupport::Yes),
                           ::testing::Values(ClientSnappySupport::Yes)),
        PrintToStringCombinedName());

/// Verify that storing a document is metered as WCUs only, and that
/// reading it back is metered as RCUs only
TEST_P(ElixirTest, TestAddGet) {
    // Number of compute units the test document should occupy
    const auto units = calc_num_cu(document.value.size());

    // Adding a new document should consume write compute units only
    execute(
            [this, units]() -> std::pair<std::size_t, std::size_t> {
                userConnection->mutate(document, Vbid{0}, MutationType::Add);
                return {0, units};
            },
            "ADD");

    // Fetching the document should consume read compute units only
    execute(
            [this, units]() -> std::pair<std::size_t, std::size_t> {
                userConnection->get(document.info.id, Vbid{0});
                return {units, 0};
            },
            "GET");
}

// @todo add more tests
4 changes: 3 additions & 1 deletion tests/testapp/testapp_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,14 @@ TEST_P(StatsTest, TestBucketDetails) {
// Validate each bucket entry (I should probably extend it with checking
// of the actual values
for (const auto& bucket : array) {
EXPECT_EQ(5, bucket.size());
EXPECT_EQ(7, bucket.size());
EXPECT_NE(bucket.end(), bucket.find("index"));
EXPECT_NE(bucket.end(), bucket.find("state"));
EXPECT_NE(bucket.end(), bucket.find("clients"));
EXPECT_NE(bucket.end(), bucket.find("name"));
EXPECT_NE(bucket.end(), bucket.find("type"));
EXPECT_NE(bucket.end(), bucket.find("rcu"));
EXPECT_NE(bucket.end(), bucket.find("wcu"));
}
}

Expand Down

0 comments on commit e2baf05

Please sign in to comment.