diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index 0fcdd3a05e13d..012503558cb52 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -38,6 +38,7 @@ v_cc_library( server/group.cc server/group_router.cc server/group_manager.cc + server/usage_manager.cc server/rm_group_frontend.cc server/connection_context.cc server/server.cc diff --git a/src/v/kafka/server/fwd.h b/src/v/kafka/server/fwd.h index cf314a040b908..f5eff27ceb818 100644 --- a/src/v/kafka/server/fwd.h +++ b/src/v/kafka/server/fwd.h @@ -24,5 +24,6 @@ class snc_quota_manager; class request_context; class rm_group_frontend; class rm_group_proxy_impl; +class usage_manager; } // namespace kafka diff --git a/src/v/kafka/server/usage_manager.cc b/src/v/kafka/server/usage_manager.cc new file mode 100644 index 0000000000000..8644815d8ef7b --- /dev/null +++ b/src/v/kafka/server/usage_manager.cc @@ -0,0 +1,384 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/server/usage_manager.h" + +#include "config/configuration.h" +#include "kafka/server/logger.h" +#include "storage/api.h" +#include "vlog.h" + +namespace kafka { + +static constexpr std::string_view period_key{"period"}; +static constexpr std::string_view max_duration_key{"max_duration"}; +static constexpr std::string_view buckets_key{"buckets"}; + +static bytes key_to_bytes(std::string_view sv) { + bytes k; + k.append(reinterpret_cast(sv.begin()), sv.size()); + return k; +} + +struct persisted_state { + std::chrono::seconds configured_period; + size_t configured_windows; + fragmented_vector current_state; +}; + +static ss::future<> +persist_to_disk(storage::kvstore& kvstore, persisted_state s) { + using kv_ks = storage::kvstore::key_space; + + co_await kvstore.put( + kv_ks::usage, + key_to_bytes(period_key), + serde::to_iobuf(s.configured_period)); + co_await kvstore.put( + kv_ks::usage, + key_to_bytes(max_duration_key), + serde::to_iobuf(s.configured_windows)); + co_await kvstore.put( + kv_ks::usage, + key_to_bytes(buckets_key), + serde::to_iobuf(std::move(s.current_state))); +} + +static std::optional +restore_from_disk(storage::kvstore& kvstore) { + using kv_ks = storage::kvstore::key_space; + std::optional period, windows, data; + try { + period = kvstore.get(kv_ks::usage, key_to_bytes(period_key)); + windows = kvstore.get(kv_ks::usage, key_to_bytes(max_duration_key)); + data = kvstore.get(kv_ks::usage, key_to_bytes(buckets_key)); + } catch (const std::exception& ex) { + vlog( + klog.debug, + "Encountered exception when retriving usage data from disk: {}", + ex); + return std::nullopt; + } + if (!period && !windows && !data) { + /// Data didn't exist + return std::nullopt; + } else if (!period || !windows || !data) { + vlog( + klog.error, + "Inconsistent usage_manager on disk state detected, failed to " + "recover state"); + return std::nullopt; + } + return persisted_state{ + .configured_period = serde::from_iobuf( + std::move(*period)), + .configured_windows = serde::from_iobuf(std::move(*windows)), + .current_state = serde::from_iobuf>( + std::move(*data))}; +} + +static ss::future<> clear_persisted_state(storage::kvstore& kvstore) { + using kv_ks = storage::kvstore::key_space; + try { + co_await kvstore.remove(kv_ks::usage, key_to_bytes(period_key)); + co_await kvstore.remove(kv_ks::usage, key_to_bytes(max_duration_key)); + co_await kvstore.remove(kv_ks::usage, key_to_bytes(buckets_key)); + } catch (const std::exception& ex) { + vlog(klog.debug, "Ignoring exception from storage layer: {}", ex); + } +} + +static auto epoch_time_secs( + ss::lowres_system_clock::time_point now = ss::lowres_system_clock::now()) { + return std::chrono::duration_cast( + now.time_since_epoch()) + .count(); +} + +void usage_window::reset(ss::lowres_system_clock::time_point now) { + begin = epoch_time_secs(now); + end = 0; + u.bytes_sent = 0; + u.bytes_received = 0; + u.bytes_cloud_storage = 0; +} + +usage usage::operator+(const usage& other) const { + return usage{ + .bytes_sent = bytes_sent + other.bytes_sent, + .bytes_received = bytes_received + other.bytes_received, + .bytes_cloud_storage = bytes_cloud_storage + other.bytes_cloud_storage}; +} + +usage_manager::accounting_fiber::accounting_fiber( + ss::sharded& um, + ss::sharded& storage, + size_t usage_num_windows, + std::chrono::seconds usage_window_width_interval, + std::chrono::seconds usage_disk_persistance_interval) + : _usage_num_windows(usage_num_windows) + , _usage_window_width_interval(usage_window_width_interval) + , _usage_disk_persistance_interval(usage_disk_persistance_interval) + , _kvstore(storage.local().kvs()) + , _um(um) { + vlog( + klog.info, + "Starting accounting fiber with settings, {{usage_num_windows: {} " + "usage_window_width_interval: {} " + "usage_disk_persistance_interval:{}}}", + usage_num_windows, + usage_window_width_interval, + usage_disk_persistance_interval); + /// TODO: This should be refactored when fragmented_vector::resize is + /// implemented + for (size_t i = 0; i < _usage_num_windows; ++i) { + _buckets.push_back(usage_window{}); + } + _buckets[_current_window].reset(ss::lowres_system_clock::now()); +} + +ss::future<> usage_manager::accounting_fiber::start() { + /// In the event of a quick restart, reset_state() will set the + /// _current_index to where it was before restart, however the total time + /// until the next window must be accounted for. This is the duration for + /// which redpanda was down. + auto h = _gate.hold(); + auto last_window_delta = std::chrono::seconds(0); + auto state = restore_from_disk(_kvstore); + if (state) { + if ( + state->configured_period != _usage_window_width_interval + || state->configured_windows != _usage_num_windows) { + vlog( + klog.info, + "Persisted usage state had been configured with different " + "options, clearing state and restarting with current " + "configuration options"); + co_await clear_persisted_state(_kvstore); + } else { + last_window_delta = reset_state(std::move(state->current_state)); + } + } + _persist_disk_timer.set_callback([this] { + ssx::background + = ssx::spawn_with_gate_then( + _gate, + [this] { + return persist_to_disk( + _kvstore, + persisted_state{ + .configured_period = _usage_window_width_interval, + .configured_windows = _usage_num_windows, + .current_state = _buckets.copy()}); + }) + .then([this] { + if (!_gate.is_closed()) { + _persist_disk_timer.arm( + ss::lowres_clock::now() + + _usage_disk_persistance_interval); + } + }) + .handle_exception([this](std::exception_ptr eptr) { + using namespace std::chrono_literals; + vlog( + klog.debug, + "Encountered exception when persisting usage data to disk: " + "{} , retrying", + eptr); + if (!_gate.is_closed()) { + const auto retry = std::min( + _usage_disk_persistance_interval, 5s); + _persist_disk_timer.arm(ss::lowres_clock::now() + retry); + } + }); + }); + _timer.set_callback([this] { + ssx::background = ssx::spawn_with_gate_then(_gate, [this]() { + return close_window(); + }).finally([this] { + if (!_gate.is_closed()) { + _timer.arm( + ss::lowres_clock::now() + _usage_window_width_interval); + } + }); + }); + const auto now = ss::lowres_clock::now(); + vassert( + last_window_delta <= _usage_window_width_interval, + "Error correctly detecting last window delta"); + _timer.arm((now + _usage_window_width_interval) - last_window_delta); + _persist_disk_timer.arm(now + _usage_disk_persistance_interval); +} + +std::vector +usage_manager::accounting_fiber::get_usage_stats() const { + std::vector stats; + for (size_t i = 1; i < _buckets.size(); ++i) { + const auto idx = (_current_window + i) % _usage_num_windows; + if (!_buckets[idx].is_uninitialized()) { + stats.push_back(_buckets[idx]); + } + } + /// Open bucket last ensures ordering from oldest to newest + stats.push_back(_buckets[_current_window]); + /// std::reverse returns results in ordering from newest to oldest + std::reverse(stats.begin(), stats.end()); + return stats; +} + +ss::future<> usage_manager::accounting_fiber::stop() { + _timer.cancel(); + _persist_disk_timer.cancel(); + try { + co_await persist_to_disk( + _kvstore, + persisted_state{ + .configured_period = _usage_window_width_interval, + .configured_windows = _usage_num_windows, + .current_state = _buckets.copy()}); + } catch (const std::exception& ex) { + vlog( + klog.debug, + "Encountered exception when persisting usage data to disk: {}", + ex); + } + co_await _gate.close(); +} + +ss::future<> usage_manager::accounting_fiber::close_window() { + const auto now = ss::lowres_system_clock::now(); + _buckets[_current_window].u = co_await _um.map_reduce0( + [](usage_manager& um) { return um.sample(); }, + _buckets[_current_window].u, + [](const usage& acc, const usage& x) { return acc + x; }); + _buckets[_current_window].end = epoch_time_secs(now); + _current_window = (_current_window + 1) % _buckets.size(); + _buckets[_current_window].reset(now); +} + +std::chrono::seconds usage_manager::accounting_fiber::reset_state( + fragmented_vector buckets) { + /// called after restart to determine which bucket is the 'current' bucket + auto last_window_delta = std::chrono::seconds(0); + _current_window = 0; + if (!buckets.empty()) { + std::optional open_index; + for (size_t i = 0; i < buckets.size(); ++i) { + /// There will always be exactly 1 open_window in the result set + if (buckets[i].is_open()) { + vassert(!open_index, "Data serialization was incorrect"); + open_index = i; + break; + } + } + vassert(open_index, "Data serialization was incorrect"); + /// Optimization to begin picking up if wall time is within + /// window interval + const auto begin = std::chrono::seconds(buckets[*open_index].begin); + const auto now_ts = ss::lowres_system_clock::now(); + const auto now = std::chrono::seconds(epoch_time_secs(now_ts)); + const auto delta = now - begin; + if (begin > now || delta < _usage_window_width_interval) { + /// Clock drift has resulted in the later time appearing before the + /// beginning timestamp or time delta is within interval, continue + /// from open bucket + _current_window = *open_index; + last_window_delta = delta; + } else { + /// Close window and open a new one + buckets[*open_index].end = epoch_time_secs(now_ts); + _current_window = (*open_index + 1) % buckets.size(); + buckets[_current_window].reset(now_ts); + } + } + _buckets = std::move(buckets); + return last_window_delta; +} + +usage_manager::usage_manager(ss::sharded& storage) + : _usage_enabled(config::shard_local_cfg().enable_usage.bind()) + , _usage_num_windows(config::shard_local_cfg().usage_num_windows.bind()) + , _usage_window_width_interval( + config::shard_local_cfg().usage_window_width_interval_sec.bind()) + , _usage_disk_persistance_interval( + config::shard_local_cfg().usage_disk_persistance_interval_sec.bind()) + , _storage(storage) { + _usage_enabled.watch([this] { (void)reset(); }); + _usage_num_windows.watch([this] { (void)reset(); }); + _usage_window_width_interval.watch([this] { (void)reset(); }); + _usage_disk_persistance_interval.watch([this] { (void)reset(); }); +} + +ss::future<> usage_manager::reset() { + oncore_debug_verify(_verify_shard); + try { + auto h = _background_gate.hold(); + auto u = _background_mutex.get_units(); + if (_accounting_fiber) { + /// Deallocate the accounting_fiber if the feature is disabled, + /// otherwise it will keep in memory the number of configured + /// historical buckets + auto accounting_fiber = std::exchange(_accounting_fiber, nullptr); + co_await accounting_fiber->stop(); + } + co_await start(); + } catch (ss::gate_closed_exception&) { + // shutting down + } +} + +ss::future<> usage_manager::start() { + if (ss::this_shard_id() != usage_manager_main_shard) { + co_return; /// Async work only occurs on shard-0 + } + if (_accounting_fiber) { + co_return; /// Double start called, do-nothing + } + if (!_usage_enabled()) { + co_return; /// Feature is disabled in config + } + _accounting_fiber = std::make_unique( + this->container(), + _storage, + _usage_num_windows(), + _usage_window_width_interval(), + _usage_disk_persistance_interval()); + + co_await _accounting_fiber->start(); + + /// Reset all state + co_await container().invoke_on_all( + [](usage_manager& um) { (void)um.sample(); }); +} + +ss::future<> usage_manager::stop() { + co_await _background_gate.close(); + if (!_accounting_fiber) { + co_return; + } + /// Logic could only possibly execute on core-0 since no other cores would + /// initialize a local _accounting_fiber + co_await _accounting_fiber->stop(); +} + +std::vector usage_manager::get_usage_stats() const { + if (ss::this_shard_id() != usage_manager_main_shard) { + throw std::runtime_error( + "Attempt to query results of " + "kafka::usage_manager off the main accounting shard"); + } + if (!_accounting_fiber) { + return {}; // no stats + } + return _accounting_fiber->get_usage_stats(); +} + +} // namespace kafka diff --git a/src/v/kafka/server/usage_manager.h b/src/v/kafka/server/usage_manager.h new file mode 100644 index 0000000000000..9bf8cc0e76070 --- /dev/null +++ b/src/v/kafka/server/usage_manager.h @@ -0,0 +1,169 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once +#include "bytes/oncore.h" +#include "config/property.h" +#include "storage/kvstore.h" +#include "utils/fragmented_vector.h" +#include "utils/mutex.h" + +#include +#include + +namespace kafka { + +/// Main structure of statistics that are being accounted for. These are +/// periodically serialized to disk, hence why the struct inherits from the +/// serde::envelope +struct usage + : serde::envelope, serde::compat_version<0>> { + uint64_t bytes_sent{0}; + uint64_t bytes_received{0}; + uint64_t bytes_cloud_storage{0}; + usage operator+(const usage&) const; + auto serde_fields() { + return std::tie(bytes_sent, bytes_received, bytes_cloud_storage); + } +}; + +struct usage_window + : serde::envelope, serde::compat_version<0>> { + uint64_t begin{0}; + uint64_t end{0}; + usage u; + + bool is_uninitialized() const { return begin == 0 && end == 0; } + bool is_open() const { return begin != 0 && end == 0; } + void reset(ss::lowres_system_clock::time_point now); + auto serde_fields() { return std::tie(begin, end, u); } +}; + +/// Class that manages all usage statistics. Usage stats are more accurate +/// representation of node usage. This class maintains these stats windowed, +/// a static number of windows that last some predefined interval, all of which +/// are options that can be configured at runtime. +/// +/// Periodically these stats are written to disk using the kvstore at a 5min +/// interval, that way they are somewhat durable in the event of a node crash, +/// or restart. +class usage_manager : public ss::peering_sharded_service { +public: + static constexpr ss::shard_id usage_manager_main_shard{0}; + + class accounting_fiber { + public: + accounting_fiber( + ss::sharded& um, + ss::sharded& storage, + size_t usage_num_windows, + std::chrono::seconds usage_window_width_interval, + std::chrono::seconds usage_disk_persistance_interval); + + /// Starts a fiber that manage window interval evolution and disk + /// persistance + ss::future<> start(); + + /// Stops the fiber and flushes current in memory results to disk + ss::future<> stop(); + + /// Returns aggregate of all \ref usage stats across cores + std::vector get_usage_stats() const; + + private: + std::chrono::seconds + reset_state(fragmented_vector buckets); + ss::future<> close_window(); + + private: + size_t _usage_num_windows; + std::chrono::seconds _usage_window_width_interval; + std::chrono::seconds _usage_disk_persistance_interval; + + /// Timers for controlling window closure and disk persistance + ss::timer _timer; + ss::timer _persist_disk_timer; + + ss::gate _gate; + size_t _current_window{0}; + fragmented_vector _buckets; + storage::kvstore& _kvstore; + ss::sharded& _um; + }; + + /// Class constructor + /// + /// Context is to be in a sharded service, will grab \ref usage_num_windows + /// and \ref usage_window_sec configuration parameters from cluster config + explicit usage_manager(ss::sharded& storage); + + /// Allocates and starts the accounting fiber + ss::future<> start(); + + /// Safely shuts down accounting fiber and deallocates it + ss::future<> stop(); + + /// Adds bytes to current open window + /// + /// Should be called at the kafka layer to account for bytes sent via kafka + /// port + void add_bytes_sent(size_t sent) { _current_bucket.bytes_sent += sent; } + + /// Adds bytes received to current open window + /// + /// Should be called at the kafka layer to account for bytes received via + /// kafka port + void add_bytes_recv(size_t recv) { _current_bucket.bytes_received += recv; } + + /// Obtain all current stats - for all shards + /// + std::vector get_usage_stats() const; + + /// Obtain all current stats - for 'this' shard, resets window + /// + usage sample() { + usage u{}; + std::swap(_current_bucket, u); + return u; + } + +private: + /// Called when config options are modified + ss::future<> reset(); + + expression_in_debug_mode(oncore _verify_shard); + +private: + /// Config bindings, usage is visibility::user, others ::tunable + config::binding _usage_enabled; + config::binding _usage_num_windows; + config::binding _usage_window_width_interval; + config::binding _usage_disk_persistance_interval; + + ss::sharded& _storage; + + /// Per-core metric, shard-0 aggregates these values across shards + usage _current_bucket; + mutex _background_mutex; + ss::gate _background_gate; + + /// Valid on core-0 when usage_enabled() == true + std::unique_ptr _accounting_fiber; +}; + +/// Bytes accounted for by the kafka::usage_manager will not take into +/// consideration data batches via produce/fetch requests to these topics +static const auto usage_excluded_topics = std::to_array( + {model::topic("_schemas"), + model::topic("__audit"), + model::topic("__redpanda_e2e_probe")}); + +} // namespace kafka