Skip to content

Commit

Permalink
[Enhancement] Add datacache memory tracker to trace the datacache mem…
Browse files Browse the repository at this point in the history
…ory usage. (#38884)

Signed-off-by: Gavin <yangguansuo@starrocks.com>
(cherry picked from commit 729b4ed)

# Conflicts:
#	be/src/block_cache/block_cache.h
#	be/src/http/action/memory_metrics_action.cpp
#	be/src/runtime/exec_env.cpp
#	be/src/script/script.cpp
  • Loading branch information
GavinMar authored and mergify[bot] committed Jan 19, 2024
1 parent a2ca674 commit 2e09dda
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 3 deletions.
5 changes: 4 additions & 1 deletion be/src/block_cache/block_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ Status BlockCache::init(const CacheOptions& options) {
LOG(ERROR) << "unsupported block cache engine: " << options.engine;
return Status::NotSupported("unsupported block cache engine");
}
return _kv_cache->init(options);
RETURN_IF_ERROR(_kv_cache->init(options));
_initialized.store(true, std::memory_order_relaxed);
return Status::OK();
}

Status BlockCache::write_cache(const CacheKey& cache_key, off_t offset, const IOBuffer& buffer, size_t ttl_seconds,
Expand Down Expand Up @@ -139,6 +141,7 @@ Status BlockCache::remove_cache(const CacheKey& cache_key, off_t offset, size_t
Status BlockCache::shutdown() {
Status st = _kv_cache->shutdown();
_kv_cache = nullptr;
_initialized.store(false, std::memory_order_relaxed);
return st;
}

Expand Down
8 changes: 8 additions & 0 deletions be/src/block_cache/block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,21 @@ class BlockCache {

size_t block_size() const { return _block_size; }

<<<<<<< HEAD
=======
bool is_initialized() { return _initialized.load(std::memory_order_relaxed); }

static const size_t MAX_BLOCK_SIZE;

>>>>>>> 729b4edd95 ([Enhancement] Add datacache memory tracker to trace the datacache memory usage. (#38884))
private:
#ifndef BE_TEST
BlockCache() = default;
#endif

size_t _block_size = 0;
std::unique_ptr<KvCache> _kv_cache;
std::atomic<bool> _initialized = false;
};

} // namespace starrocks
17 changes: 15 additions & 2 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <gperftools/malloc_extension.h>
#endif

#include "block_cache/block_cache.h"
#include "column/column_helper.h"
#include "column/column_pool.h"
#include "common/config.h"
Expand Down Expand Up @@ -150,6 +151,7 @@ void gc_memory(void* arg_this) {
* 3. max io util of all disks
* 4. max network send bytes rate
* 5. max network receive bytes rate
* 6. datacache memory usage
*/
void calculate_metrics(void* arg_this) {
int64_t last_ts = -1L;
Expand Down Expand Up @@ -206,18 +208,29 @@ void calculate_metrics(void* arg_this) {
&lst_net_receive_bytes);
}

// update datacache mem_tracker
auto datacache_mem_tracker = GlobalEnv::GetInstance()->datacache_mem_tracker();
int64_t datacache_mem_bytes = 0;
BlockCache* block_cache = BlockCache::instance();
if (block_cache->is_initialized()) {
auto datacache_metrics = block_cache->cache_metrics();
datacache_mem_bytes = datacache_metrics.mem_used_bytes + datacache_metrics.meta_used_bytes;
}
datacache_mem_tracker->set(datacache_mem_bytes);

auto* mem_metrics = StarRocksMetrics::instance()->system_metrics()->memory_metrics();

LOG(INFO) << fmt::format(
"Current memory statistics: process({}), query_pool({}), load({}), "
"metadata({}), compaction({}), schema_change({}), column_pool({}), "
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({})",
"page_cache({}), update({}), chunk_allocator({}), clone({}), consistency({}), "
"datacache({})",
mem_metrics->process_mem_bytes.value(), mem_metrics->query_mem_bytes.value(),
mem_metrics->load_mem_bytes.value(), mem_metrics->metadata_mem_bytes.value(),
mem_metrics->compaction_mem_bytes.value(), mem_metrics->schema_change_mem_bytes.value(),
mem_metrics->column_pool_mem_bytes.value(), mem_metrics->storage_page_cache_mem_bytes.value(),
mem_metrics->update_mem_bytes.value(), mem_metrics->chunk_allocator_mem_bytes.value(),
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value());
mem_metrics->clone_mem_bytes.value(), mem_metrics->consistency_mem_bytes.value(), datacache_mem_bytes);

nap_sleep(15, [daemon] { return daemon->stopped(); });
}
Expand Down
99 changes: 99 additions & 0 deletions be/src/http/action/memory_metrics_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/memory_metrics_action.h"

#include <runtime/exec_env.h>
#include <runtime/mem_tracker.h>

#include "common/tracer.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"

namespace starrocks {

void MemoryMetricsAction::handle(HttpRequest* req) {
LOG(INFO) << "Start collect memory metrics.";
auto scoped_span = trace::Scope(Tracer::Instance().start_trace("http_handle_memory_metrics"));
MemTracker* process_mem_tracker = GlobalEnv::GetInstance()->process_mem_tracker();
std::stringstream result;
std::vector<std::string> metric_labels_to_print = {"process",
"query_pool",
"load",
"metadata",
"tablet_metadata",
"rowset_metadata",
"segment_metadata",
"column_metadata",
"tablet_schema",
"segment_zonemap",
"short_key_index",
"column_zonemap_index",
"ordinal_index",
"bitmap_index",
"bloom_filter_index",
"compaction",
"schema_change",
"column_pool",
"page_cache",
"datacache",
"update",
"chunk_allocator",
"clone",
"consistency",
"rowset_update_state",
"index_cache",
"del_vec_cache",
"compaction_state"};
result << "[";
getMemoryMetricTree(process_mem_tracker, result, process_mem_tracker->consumption(), metric_labels_to_print);
result << ",";
getMemoryMetricTree(GlobalEnv::GetInstance()->metadata_mem_tracker(), result, process_mem_tracker->consumption(),
metric_labels_to_print);
result << ",";
getMemoryMetricTree(GlobalEnv::GetInstance()->update_mem_tracker(), result, process_mem_tracker->consumption(),
metric_labels_to_print);
result << "]";
req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
LOG(INFO) << "End collect memory metrics. " << result.str();

HttpChannel::send_reply(req, result.str());
}
void MemoryMetricsAction::getMemoryMetricTree(MemTracker* memTracker, std::stringstream& result, int64_t total_size,
std::vector<std::string> metric_labels_to_print) {
result << "{";
result << R"("name":")" << memTracker->label() << "\",";
result << R"("size":")" << memTracker->consumption() << "\",";
result << R"("percent":")" << std::setprecision(3)
<< static_cast<double>(memTracker->consumption()) / total_size * 100 << "%\",";
result << "\"child\":[";
for (const auto& child : memTracker->getChild()) {
if (find(metric_labels_to_print.begin(), metric_labels_to_print.end(), child->label()) ==
metric_labels_to_print.end()) {
break;
}
if (child != memTracker->getChild().front()) {
result << ",";
}
getMemoryMetricTree(child, result, total_size, metric_labels_to_print);
}

result << "]}";
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ void mem_tracker_handler(MemTracker* mem_tracker, const WebPageHandler::Argument
} else if (iter->second == "consistency") {
start_mem_tracker = ExecEnv::GetInstance()->consistency_mem_tracker();
cur_level = 2;
} else if (iter->second == "datacache") {
start_mem_tracker = GlobalEnv::GetInstance()->datacache_mem_tracker();
cur_level = 2;
} else {
start_mem_tracker = mem_tracker;
cur_level = 1;
Expand Down
138 changes: 138 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,145 @@ bool ExecEnv::is_init() {
return _is_init;
}

<<<<<<< HEAD
Status ExecEnv::_init(const std::vector<StorePath>& store_paths, bool as_cn) {
=======
Status GlobalEnv::init() {
RETURN_IF_ERROR(_init_mem_tracker());
_is_init = true;
return Status::OK();
}

Status GlobalEnv::_init_mem_tracker() {
int64_t bytes_limit = 0;
std::stringstream ss;
// --mem_limit="" means no memory limit
bytes_limit = ParseUtil::parse_mem_spec(config::mem_limit, MemInfo::physical_mem());
// use 90% of mem_limit as the soft mem limit of BE
bytes_limit = bytes_limit * 0.9;
if (bytes_limit <= 0) {
ss << "Failed to parse mem limit from '" + config::mem_limit + "'.";
return Status::InternalError(ss.str());
}

if (bytes_limit > MemInfo::physical_mem()) {
LOG(WARNING) << "Memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds physical memory of " << PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES)
<< ". Using physical memory instead";
bytes_limit = MemInfo::physical_mem();
}

if (bytes_limit <= 0) {
ss << "Invalid mem limit: " << bytes_limit;
return Status::InternalError(ss.str());
}

_process_mem_tracker = regist_tracker(MemTracker::PROCESS, bytes_limit, "process");
int64_t query_pool_mem_limit =
calc_max_query_memory(_process_mem_tracker->limit(), config::query_max_memory_limit_percent);
_query_pool_mem_tracker =
regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit, "query_pool", this->process_mem_tracker());
_connector_scan_pool_mem_tracker =
regist_tracker(MemTracker::QUERY_POOL, query_pool_mem_limit * config::connector_scan_use_query_mem_ratio,
"query_pool/connector_scan", nullptr);

int64_t load_mem_limit = calc_max_load_memory(_process_mem_tracker->limit());
_load_mem_tracker = regist_tracker(MemTracker::LOAD, load_mem_limit, "load", process_mem_tracker());

// Metadata statistics memory statistics do not use new mem statistics framework with hook
_metadata_mem_tracker = regist_tracker(-1, "metadata", nullptr);

_tablet_metadata_mem_tracker = regist_tracker(-1, "tablet_metadata", _metadata_mem_tracker.get());
_rowset_metadata_mem_tracker = regist_tracker(-1, "rowset_metadata", _metadata_mem_tracker.get());
_segment_metadata_mem_tracker = regist_tracker(-1, "segment_metadata", _metadata_mem_tracker.get());
_column_metadata_mem_tracker = regist_tracker(-1, "column_metadata", _metadata_mem_tracker.get());

_tablet_schema_mem_tracker = regist_tracker(-1, "tablet_schema", _tablet_metadata_mem_tracker.get());
_segment_zonemap_mem_tracker = regist_tracker(-1, "segment_zonemap", _segment_metadata_mem_tracker.get());
_short_key_index_mem_tracker = regist_tracker(-1, "short_key_index", _segment_metadata_mem_tracker.get());
_column_zonemap_index_mem_tracker = regist_tracker(-1, "column_zonemap_index", _column_metadata_mem_tracker.get());
_ordinal_index_mem_tracker = regist_tracker(-1, "ordinal_index", _column_metadata_mem_tracker.get());
_bitmap_index_mem_tracker = regist_tracker(-1, "bitmap_index", _column_metadata_mem_tracker.get());
_bloom_filter_index_mem_tracker = regist_tracker(-1, "bloom_filter_index", _column_metadata_mem_tracker.get());

int64_t compaction_mem_limit = calc_max_compaction_memory(_process_mem_tracker->limit());
_compaction_mem_tracker = regist_tracker(compaction_mem_limit, "compaction", _process_mem_tracker.get());
_schema_change_mem_tracker = regist_tracker(-1, "schema_change", _process_mem_tracker.get());
_column_pool_mem_tracker = regist_tracker(-1, "column_pool", _process_mem_tracker.get());
_page_cache_mem_tracker = regist_tracker(-1, "page_cache", _process_mem_tracker.get());
int32_t update_mem_percent = std::max(std::min(100, config::update_memory_limit_percent), 0);
_update_mem_tracker = regist_tracker(bytes_limit * update_mem_percent / 100, "update", nullptr);
_chunk_allocator_mem_tracker = regist_tracker(-1, "chunk_allocator", _process_mem_tracker.get());
_clone_mem_tracker = regist_tracker(-1, "clone", _process_mem_tracker.get());
int64_t consistency_mem_limit = calc_max_consistency_memory(_process_mem_tracker->limit());
_consistency_mem_tracker = regist_tracker(consistency_mem_limit, "consistency", _process_mem_tracker.get());
_datacache_mem_tracker = regist_tracker(-1, "datacache", _process_mem_tracker.get());
_replication_mem_tracker = regist_tracker(-1, "replication", _process_mem_tracker.get());

MemChunkAllocator::init_instance(_chunk_allocator_mem_tracker.get(), config::chunk_reserved_bytes_limit);

SetMemTrackerForColumnPool op(_column_pool_mem_tracker);
ForEach<ColumnPoolList>(op);
_init_storage_page_cache(); // TODO: move to StorageEngine
return Status::OK();
}

void GlobalEnv::_reset_tracker() {
for (auto iter = _mem_trackers.rbegin(); iter != _mem_trackers.rend(); ++iter) {
iter->reset();
}
}

void GlobalEnv::_init_storage_page_cache() {
int64_t storage_cache_limit = get_storage_page_cache_size();
storage_cache_limit = check_storage_page_cache_size(storage_cache_limit);
StoragePageCache::create_global_cache(page_cache_mem_tracker(), storage_cache_limit);
}

int64_t GlobalEnv::get_storage_page_cache_size() {
std::lock_guard<std::mutex> l(*config::get_mstring_conf_lock());
int64_t mem_limit = MemInfo::physical_mem();
if (process_mem_tracker()->has_limit()) {
mem_limit = process_mem_tracker()->limit();
}
return ParseUtil::parse_mem_spec(config::storage_page_cache_limit, mem_limit);
}

int64_t GlobalEnv::check_storage_page_cache_size(int64_t storage_cache_limit) {
if (storage_cache_limit > MemInfo::physical_mem()) {
LOG(WARNING) << "Config storage_page_cache_limit is greater than memory size, config="
<< config::storage_page_cache_limit << ", memory=" << MemInfo::physical_mem();
}
if (!config::disable_storage_page_cache) {
if (storage_cache_limit < kcacheMinSize) {
LOG(WARNING) << "Storage cache limit is too small, use default size.";
storage_cache_limit = kcacheMinSize;
}
LOG(INFO) << "Set storage page cache size " << storage_cache_limit;
}
return storage_cache_limit;
}

template <class... Args>
std::shared_ptr<MemTracker> GlobalEnv::regist_tracker(Args&&... args) {
auto mem_tracker = std::make_shared<MemTracker>(std::forward<Args>(args)...);
_mem_trackers.emplace_back(mem_tracker);
return mem_tracker;
}

int64_t GlobalEnv::calc_max_query_memory(int64_t process_mem_limit, int64_t percent) {
if (process_mem_limit <= 0) {
// -1 means no limit
return -1;
}
if (percent < 0 || percent > 100) {
percent = 90;
}
return process_mem_limit * percent / 100;
}

Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
>>>>>>> 729b4edd95 ([Enhancement] Add datacache memory tracker to trace the datacache memory usage. (#38884))
_store_paths = store_paths;
_external_scan_context_mgr = new ExternalScanContextMgr(this);
_metrics = StarRocksMetrics::instance()->metrics();
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class ExecEnv {
MemTracker* clone_mem_tracker() { return _clone_mem_tracker.get(); }
MemTracker* consistency_mem_tracker() { return _consistency_mem_tracker.get(); }
MemTracker* replication_mem_tracker() { return _replication_mem_tracker.get(); }
MemTracker* datacache_mem_tracker() { return _datacache_mem_tracker.get(); }
std::vector<std::shared_ptr<MemTracker>>& mem_trackers() { return _mem_trackers; }

PriorityThreadPool* thread_pool() { return _thread_pool; }
Expand Down Expand Up @@ -314,6 +315,9 @@ class ExecEnv {

std::shared_ptr<MemTracker> _replication_mem_tracker;

// The memory used for datacache
std::shared_ptr<MemTracker> _datacache_mem_tracker;

std::vector<std::shared_ptr<MemTracker>> _mem_trackers;

PriorityThreadPool* _thread_pool = nullptr;
Expand Down

0 comments on commit 2e09dda

Please sign in to comment.