Skip to content

Commit

Permalink
[Enhancement] Add datacache memory tracker to trace the datacache mem…
Browse files Browse the repository at this point in the history
…ory usage. (backport #38884) (#39569)

Signed-off-by: Gavin <yangguansuo@starrocks.com>
  • Loading branch information
GavinMar committed Jan 22, 2024
1 parent a7e7c29 commit af619c5
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 9 deletions.
9 changes: 8 additions & 1 deletion be/src/block_cache/block_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ Status BlockCache::init(const CacheOptions& options) {
LOG(ERROR) << "unsupported block cache engine: " << options.engine;
return Status::NotSupported("unsupported block cache engine");
}
return _kv_cache->init(options);
RETURN_IF_ERROR(_kv_cache->init(options));
_initialized.store(true, std::memory_order_relaxed);
return Status::OK();
}

Status BlockCache::write_buffer(const CacheKey& cache_key, off_t offset, const IOBuffer& buffer,
Expand Down Expand Up @@ -166,9 +168,14 @@ void BlockCache::record_read_cache(size_t size, int64_t lateny_us) {
_kv_cache->record_read_cache(size, lateny_us);
}

const DataCacheMetrics BlockCache::cache_metrics() const {
return _kv_cache->cache_metrics();
}

Status BlockCache::shutdown() {
Status st = _kv_cache->shutdown();
_kv_cache = nullptr;
_initialized.store(false, std::memory_order_relaxed);
return st;
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/block_cache/block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ class BlockCache {

void record_read_cache(size_t size, int64_t lateny_us);

const DataCacheMetrics cache_metrics() const;

// Shutdown the cache instance to save some state meta
Status shutdown();

size_t block_size() const { return _block_size; }

bool is_initialized() { return _initialized.load(std::memory_order_relaxed); }

static const size_t MAX_BLOCK_SIZE;

private:
Expand All @@ -75,6 +79,7 @@ class BlockCache {

size_t _block_size = 0;
std::unique_ptr<KvCache> _kv_cache;
std::atomic<bool> _initialized = false;
};

} // namespace starrocks
6 changes: 6 additions & 0 deletions be/src/block_cache/cachelib_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ std::unordered_map<std::string, double> CacheLibWrapper::cache_stats() {
return navy_stats;
}

const DataCacheMetrics CacheLibWrapper::cache_metrics() {
// not implemented
DataCacheMetrics metrics{};
return metrics;
}

Status CacheLibWrapper::write_object(const std::string& key, const void* ptr, size_t size,
std::function<void()> deleter, CacheHandle* handle, WriteCacheOptions* options) {
return Status::NotSupported("not supported write object in cachelib");
Expand Down
2 changes: 2 additions & 0 deletions be/src/block_cache/cachelib_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class CacheLibWrapper : public KvCache {

std::unordered_map<std::string, double> cache_stats() override;

const DataCacheMetrics cache_metrics() override;

void record_read_remote(size_t size, int64_t lateny_us) override;

void record_read_cache(size_t size, int64_t lateny_us) override;
Expand Down
5 changes: 4 additions & 1 deletion be/src/block_cache/kv_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "block_cache/cache_options.h"
#include "block_cache/io_buffer.h"
#include "common/status.h"
#include "starcache/star_cache.h"

using DataCacheMetrics = starcache::CacheMetrics;

namespace starrocks {

Expand All @@ -44,7 +47,7 @@ class KvCache {
// Remove data from cache. The offset must be aligned by block size
virtual Status remove(const std::string& key) = 0;

virtual std::unordered_map<std::string, double> cache_stats() = 0;
virtual const DataCacheMetrics cache_metrics() = 0;

virtual void record_read_remote(size_t size, int64_t lateny_us) = 0;

Expand Down
6 changes: 2 additions & 4 deletions be/src/block_cache/starcache_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,8 @@ Status StarCacheWrapper::remove(const std::string& key) {
return Status::OK();
}

std::unordered_map<std::string, double> StarCacheWrapper::cache_stats() {
// TODO: fill some statistics information
std::unordered_map<std::string, double> stats;
return stats;
const DataCacheMetrics StarCacheWrapper::cache_metrics() {
return _cache->metrics();
}

void StarCacheWrapper::record_read_remote(size_t size, int64_t lateny_us) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/block_cache/starcache_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StarCacheWrapper : public KvCache {

Status remove(const std::string& key) override;

std::unordered_map<std::string, double> cache_stats() override;
const DataCacheMetrics cache_metrics() override;

void record_read_remote(size_t size, int64_t lateny_us) override;

Expand Down
17 changes: 15 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <gflags/gflags.h>

#include "block_cache/block_cache.h"
#include "column/column_helper.h"
#include "column/column_pool.h"
#include "common/config.h"
Expand Down Expand Up @@ -116,6 +117,7 @@ void gc_memory(void* arg_this) {
* 3. max io util of all disks
* 4. max network send bytes rate
* 5. max network receive bytes rate
* 6. datacache memory usage
*/
void calculate_metrics(void* arg_this) {
int64_t last_ts = -1L;
Expand Down Expand Up @@ -172,18 +174,29 @@ void calculate_metrics(void* arg_this) {
&lst_net_receive_bytes);
}

// update datacache mem_tracker
auto datacache_mem_tracker = GlobalEnv::GetInstance()->datacache_mem_tracker();
int64_t datacache_mem_bytes = 0;
BlockCache* block_cache = BlockCache::instance();
if (block_cache->is_initialized()) {
auto datacache_metrics = block_cache->cache_metrics();
datacache_mem_bytes = datacache_metrics.mem_used_bytes + datacache_metrics.meta_used_bytes;
}
datacache_mem_tracker->set(datacache_mem_bytes);

auto* mem_metrics = StarRocksMetrics::instance()->system_metrics()->memory_metrics();

LOG(INFO) << fmt::format(
"Current memory statistics: process({}), query_pool({}), load({}), "
"metadata({}), compaction({}), schema_change({}), column_pool({}), "
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({})",
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({}), "
"datacache({})",
mem_metrics->process_mem_bytes.value(), mem_metrics->query_mem_bytes.value(),
mem_metrics->load_mem_bytes.value(), mem_metrics->metadata_mem_bytes.value(),
mem_metrics->compaction_mem_bytes.value(), mem_metrics->schema_change_mem_bytes.value(),
mem_metrics->column_pool_mem_bytes.value(), mem_metrics->storage_page_cache_mem_bytes.value(),
mem_metrics->update_mem_bytes.value(), mem_metrics->chunk_allocator_mem_bytes.value(),
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value());
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes);

nap_sleep(15, [daemon] { return daemon->stopped(); });
}
Expand Down
1 change: 1 addition & 0 deletions be/src/http/action/memory_metrics_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void MemoryMetricsAction::handle(HttpRequest* req) {
"schema_change",
"column_pool",
"page_cache",
"datacache",
"update",
"chunk_allocator",
"clone",
Expand Down
3 changes: 3 additions & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ void mem_tracker_handler(MemTracker* mem_tracker, const WebPageHandler::Argument
} else if (iter->second == "consistency") {
start_mem_tracker = GlobalEnv::GetInstance()->consistency_mem_tracker();
cur_level = 2;
} else if (iter->second == "datacache") {
start_mem_tracker = GlobalEnv::GetInstance()->datacache_mem_tracker();
cur_level = 2;
} else {
start_mem_tracker = mem_tracker;
cur_level = 1;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ Status GlobalEnv::_init_mem_tracker() {
_clone_mem_tracker = regist_tracker(-1, "clone", _process_mem_tracker.get());
int64_t consistency_mem_limit = calc_max_consistency_memory(_process_mem_tracker->limit());
_consistency_mem_tracker = regist_tracker(consistency_mem_limit, "consistency", _process_mem_tracker.get());
_datacache_mem_tracker = regist_tracker(-1, "datacache", _process_mem_tracker.get());
_replication_mem_tracker = regist_tracker(-1, "replication", _process_mem_tracker.get());

MemChunkAllocator::init_instance(_chunk_allocator_mem_tracker.get(), config::chunk_reserved_bytes_limit);
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class GlobalEnv {
MemTracker* clone_mem_tracker() { return _clone_mem_tracker.get(); }
MemTracker* consistency_mem_tracker() { return _consistency_mem_tracker.get(); }
MemTracker* replication_mem_tracker() { return _replication_mem_tracker.get(); }
MemTracker* datacache_mem_tracker() { return _datacache_mem_tracker.get(); }
std::vector<std::shared_ptr<MemTracker>>& mem_trackers() { return _mem_trackers; }

int64_t get_storage_page_cache_size();
Expand Down Expand Up @@ -213,6 +214,9 @@ class GlobalEnv {

std::shared_ptr<MemTracker> _replication_mem_tracker;

// The memory used for datacache
std::shared_ptr<MemTracker> _datacache_mem_tracker;

std::vector<std::shared_ptr<MemTracker>> _mem_trackers;
};

Expand Down
1 change: 1 addition & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ void bind_exec_env(ForeignModule& m) {
REG_METHOD(GlobalEnv, clone_mem_tracker);
REG_METHOD(GlobalEnv, consistency_mem_tracker);
REG_METHOD(GlobalEnv, connector_scan_pool_mem_tracker);
REG_METHOD(GlobalEnv, datacache_mem_tracker);

// level 2
REG_METHOD(GlobalEnv, tablet_metadata_mem_tracker);
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ void SystemMetrics::_install_memory_metrics(MetricRegistry* registry) {
registry->register_metric("chunk_allocator_mem_bytes", &_memory_metrics->chunk_allocator_mem_bytes);
registry->register_metric("clone_mem_bytes", &_memory_metrics->clone_mem_bytes);
registry->register_metric("consistency_mem_bytes", &_memory_metrics->consistency_mem_bytes);
registry->register_metric("datacache_mem_bytes", &_memory_metrics->datacache_mem_bytes);

registry->register_metric("total_column_pool_bytes", &_memory_metrics->column_pool_total_bytes);
registry->register_metric("local_column_pool_bytes", &_memory_metrics->column_pool_local_bytes);
Expand Down Expand Up @@ -326,6 +327,7 @@ void SystemMetrics::_update_memory_metrics() {
SET_MEM_METRIC_VALUE(clone_mem_tracker, clone_mem_bytes)
SET_MEM_METRIC_VALUE(column_pool_mem_tracker, column_pool_mem_bytes)
SET_MEM_METRIC_VALUE(consistency_mem_tracker, consistency_mem_bytes)
SET_MEM_METRIC_VALUE(datacache_mem_tracker, datacache_mem_bytes)
#undef SET_MEM_METRIC_VALUE

#define UPDATE_COLUMN_POOL_METRIC(var, type) \
Expand Down
1 change: 1 addition & 0 deletions be/src/util/system_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class MemoryMetrics {
METRIC_DEFINE_INT_GAUGE(chunk_allocator_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(clone_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(consistency_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(datacache_mem_bytes, MetricUnit::BYTES);

// column pool metrics.
METRIC_DEFINE_INT_GAUGE(column_pool_total_bytes, MetricUnit::BYTES);
Expand Down

0 comments on commit af619c5

Please sign in to comment.