Skip to content

Commit

Permalink
ncement] Add datacache memory tracker to trace the datacache memory u…
Browse files Browse the repository at this point in the history
…sage. (backport StarRocks#38884)

Signed-off-by: Gavin <yangguansuo@starrocks.com>
  • Loading branch information
GavinMar committed Jan 19, 2024
1 parent d57a092 commit c478845
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 10 deletions.
18 changes: 16 additions & 2 deletions be/src/block_cache/block_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ namespace starrocks {

namespace fs = std::filesystem;

// The cachelib doesn't support a item (key+valueu+attribute) larger than 4 MB without chain.
// So, we check and limit the block_size configured by users to avoid unexpected errors.
// For starcache, in theory we doesn't have a hard limitation for block size, but a very large
// block_size may cause heavy read amplification. So, we also limit it to 2 MB as an empirical value.
const size_t BlockCache::MAX_BLOCK_SIZE = 2 * 1024 * 1024;

BlockCache* BlockCache::instance() {
static BlockCache cache;
return &cache;
Expand All @@ -57,7 +63,7 @@ Status BlockCache::init(const CacheOptions& options) {
}
}
}
_block_size = options.block_size;
_block_size = std::min(options.block_size, MAX_BLOCK_SIZE);
#ifdef WITH_CACHELIB
if (options.engine == "cachelib") {
_kv_cache = std::make_unique<CacheLibWrapper>();
Expand All @@ -74,7 +80,10 @@ Status BlockCache::init(const CacheOptions& options) {
LOG(ERROR) << "unsupported block cache engine: " << options.engine;
return Status::NotSupported("unsupported block cache engine");
}
return _kv_cache->init(options);

RETURN_IF_ERROR(_kv_cache->init(options));
_initialized.store(true, std::memory_order_relaxed);
return Status::OK();
}

Status BlockCache::write_cache(const CacheKey& cache_key, off_t offset, const IOBuffer& buffer, size_t ttl_seconds,
Expand Down Expand Up @@ -136,9 +145,14 @@ Status BlockCache::remove_cache(const CacheKey& cache_key, off_t offset, size_t
return _kv_cache->remove_cache(block_key);
}

const DataCacheMetrics BlockCache::cache_metrics() const {
return _kv_cache->cache_metrics();
}

Status BlockCache::shutdown() {
Status st = _kv_cache->shutdown();
_kv_cache = nullptr;
_initialized.store(false, std::memory_order_relaxed);
return st;
}

Expand Down
7 changes: 7 additions & 0 deletions be/src/block_cache/block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,25 @@ class BlockCache {
// Remove data from cache. The offset and size must be aligned by block size
Status remove_cache(const CacheKey& cache_key, off_t offset, size_t size);

const DataCacheMetrics cache_metrics() const;

// Shutdown the cache instance to save some state meta
Status shutdown();

size_t block_size() const { return _block_size; }

bool is_initialized() { return _initialized.load(std::memory_order_relaxed); }

static const size_t MAX_BLOCK_SIZE;

private:
#ifndef BE_TEST
BlockCache() = default;
#endif

size_t _block_size = 0;
std::unique_ptr<KvCache> _kv_cache;
std::atomic<bool> _initialized = false;
};

} // namespace starrocks
6 changes: 6 additions & 0 deletions be/src/block_cache/cachelib_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ std::unordered_map<std::string, double> CacheLibWrapper::cache_stats() {
return navy_stats;
}

const DataCacheMetrics CacheLibWrapper::cache_metrics() {
// not implemented
DataCacheMetrics metrics{};
return metrics;
}

Status CacheLibWrapper::shutdown() {
if (_cache) {
_dump_cache_stats();
Expand Down
2 changes: 2 additions & 0 deletions be/src/block_cache/cachelib_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class CacheLibWrapper : public KvCache {

std::unordered_map<std::string, double> cache_stats() override;

const DataCacheMetrics cache_metrics() override;

Status shutdown() override;

private:
Expand Down
5 changes: 4 additions & 1 deletion be/src/block_cache/kv_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
#include "block_cache/cache_options.h"
#include "block_cache/io_buffer.h"
#include "common/status.h"
#include "starcache/star_cache.h"

namespace starrocks {

using DataCacheMetrics = starcache::CacheMetrics;

class KvCache {
public:
virtual ~KvCache() = default;
Expand All @@ -37,7 +40,7 @@ class KvCache {
// Remove data from cache. The offset must be aligned by block size
virtual Status remove_cache(const std::string& key) = 0;

virtual std::unordered_map<std::string, double> cache_stats() = 0;
virtual const DataCacheMetrics cache_metrics() = 0;

virtual Status shutdown() = 0;
};
Expand Down
6 changes: 2 additions & 4 deletions be/src/block_cache/starcache_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ Status StarCacheWrapper::remove_cache(const std::string& key) {
return Status::OK();
}

std::unordered_map<std::string, double> StarCacheWrapper::cache_stats() {
// TODO: fill some statistics information
std::unordered_map<std::string, double> stats;
return stats;
const DataCacheMetrics StarCacheWrapper::cache_metrics() {
return _cache->metrics();
}

Status StarCacheWrapper::shutdown() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/block_cache/starcache_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class StarCacheWrapper : public KvCache {

Status remove_cache(const std::string& key) override;

std::unordered_map<std::string, double> cache_stats() override;
const DataCacheMetrics cache_metrics() override;

Status shutdown() override;

Expand Down
18 changes: 16 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <gperftools/malloc_extension.h>
#endif

#include "block_cache/block_cache.h"
#include "column/column_helper.h"
#include "column/column_pool.h"
#include "common/config.h"
Expand Down Expand Up @@ -150,6 +151,7 @@ void gc_memory(void* arg_this) {
* 3. max io util of all disks
* 4. max network send bytes rate
* 5. max network receive bytes rate
* 6. datacache memory usage
*/
void calculate_metrics(void* arg_this) {
int64_t last_ts = -1L;
Expand Down Expand Up @@ -206,18 +208,30 @@ void calculate_metrics(void* arg_this) {
&lst_net_receive_bytes);
}

// update datacache mem_tracker
auto datacache_mem_tracker = ExecEnv::GetInstance()->datacache_mem_tracker();
int64_t datacache_mem_bytes = 0;
BlockCache* block_cache = BlockCache::instance();
if (block_cache->is_initialized()) {
auto datacache_metrics = block_cache->cache_metrics();
datacache_mem_bytes = datacache_metrics.mem_used_bytes + datacache_metrics.meta_used_bytes;
}
datacache_mem_tracker->set(datacache_mem_bytes);

auto* mem_metrics = StarRocksMetrics::instance()->system_metrics()->memory_metrics();

LOG(INFO) << fmt::format(
"Current memory statistics: process({}), query_pool({}), load({}), "
"metadata({}), compaction({}), schema_change({}), column_pool({}), "
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({})",
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({}), "
"datacache({})",
mem_metrics->process_mem_bytes.value(), mem_metrics->query_mem_bytes.value(),
mem_metrics->load_mem_bytes.value(), mem_metrics->metadata_mem_bytes.value(),
mem_metrics->compaction_mem_bytes.value(), mem_metrics->schema_change_mem_bytes.value(),
mem_metrics->column_pool_mem_bytes.value(), mem_metrics->storage_page_cache_mem_bytes.value(),
mem_metrics->update_mem_bytes.value(), mem_metrics->chunk_allocator_mem_bytes.value(),
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value());
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(),
datacache_mem_bytes);

nap_sleep(15, [daemon] { return daemon->stopped(); });
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ void mem_tracker_handler(MemTracker* mem_tracker, const WebPageHandler::Argument
} else if (iter->second == "consistency") {
start_mem_tracker = ExecEnv::GetInstance()->consistency_mem_tracker();
cur_level = 2;
} else if (iter->second == "datacache") {
start_mem_tracker = ExecEnv::GetInstance()->datacache_mem_tracker();
cur_level = 2;
} else {
start_mem_tracker = mem_tracker;
cur_level = 1;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ Status ExecEnv::init_mem_tracker() {
_clone_mem_tracker = regist_tracker(-1, "clone", process_mem_tracker());
int64_t consistency_mem_limit = calc_max_consistency_memory(process_mem_tracker()->limit());
_consistency_mem_tracker = regist_tracker(consistency_mem_limit, "consistency", process_mem_tracker());
_datacache_mem_tracker = regist_tracker(-1, "datacache", _process_mem_tracker.get());
_replication_mem_tracker = regist_tracker(-1, "replication", _process_mem_tracker.get());

MemChunkAllocator::init_instance(_chunk_allocator_mem_tracker.get(), config::chunk_reserved_bytes_limit);
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class ExecEnv {
MemTracker* clone_mem_tracker() { return _clone_mem_tracker.get(); }
MemTracker* consistency_mem_tracker() { return _consistency_mem_tracker.get(); }
MemTracker* replication_mem_tracker() { return _replication_mem_tracker.get(); }
MemTracker* datacache_mem_tracker() { return _datacache_mem_tracker.get(); }
std::vector<std::shared_ptr<MemTracker>>& mem_trackers() { return _mem_trackers; }

PriorityThreadPool* thread_pool() { return _thread_pool; }
Expand Down Expand Up @@ -314,6 +315,9 @@ class ExecEnv {

std::shared_ptr<MemTracker> _replication_mem_tracker;

// The memory used for datacache
std::shared_ptr<MemTracker> _datacache_mem_tracker;

std::vector<std::shared_ptr<MemTracker>> _mem_trackers;

PriorityThreadPool* _thread_pool = nullptr;
Expand Down
2 changes: 2 additions & 0 deletions be/src/util/system_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ void SystemMetrics::_install_memory_metrics(MetricRegistry* registry) {
registry->register_metric("chunk_allocator_mem_bytes", &_memory_metrics->chunk_allocator_mem_bytes);
registry->register_metric("clone_mem_bytes", &_memory_metrics->clone_mem_bytes);
registry->register_metric("consistency_mem_bytes", &_memory_metrics->consistency_mem_bytes);
registry->register_metric("datacache_mem_bytes", &_memory_metrics->datacache_mem_bytes);

registry->register_metric("total_column_pool_bytes", &_memory_metrics->column_pool_total_bytes);
registry->register_metric("local_column_pool_bytes", &_memory_metrics->column_pool_local_bytes);
Expand Down Expand Up @@ -361,6 +362,7 @@ void SystemMetrics::_update_memory_metrics() {
SET_MEM_METRIC_VALUE(clone_mem_tracker, clone_mem_bytes)
SET_MEM_METRIC_VALUE(column_pool_mem_tracker, column_pool_mem_bytes)
SET_MEM_METRIC_VALUE(consistency_mem_tracker, consistency_mem_bytes)
SET_MEM_METRIC_VALUE(datacache_mem_tracker, datacache_mem_bytes)
#undef SET_MEM_METRIC_VALUE

#define UPDATE_COLUMN_POOL_METRIC(var, type) \
Expand Down
1 change: 1 addition & 0 deletions be/src/util/system_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class MemoryMetrics {
METRIC_DEFINE_INT_GAUGE(chunk_allocator_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(clone_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(consistency_mem_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_GAUGE(datacache_mem_bytes, MetricUnit::BYTES);

// column pool metrics.
METRIC_DEFINE_INT_GAUGE(column_pool_total_bytes, MetricUnit::BYTES);
Expand Down

0 comments on commit c478845

Please sign in to comment.