Skip to content

Commit

Permalink
Add a method to get doris current memory usage (apache#6979)
Browse files Browse the repository at this point in the history
Add a check of the whole process's memory usage when TryConsume is called
  • Loading branch information
yangzhg committed Nov 24, 2021
1 parent ad0d2b8 commit e2d3d01
Show file tree
Hide file tree
Showing 18 changed files with 147 additions and 85 deletions.
4 changes: 3 additions & 1 deletion be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ void HashTable::resize_buckets(int64_t num_buckets) {

int64_t old_num_buckets = _num_buckets;
int64_t delta_bytes = (num_buckets - old_num_buckets) * sizeof(Bucket);
if (!_mem_tracker->TryConsume(delta_bytes)) {
Status st = _mem_tracker->TryConsume(delta_bytes);
WARN_IF_ERROR(st, "resize bucket failed");
if (!st) {
mem_limit_exceeded(delta_bytes);
return;
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/exec/partitioned_hash_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ Status PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));

int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
if (UNLIKELY(!tracker->TryConsume(mem_usage))) {
Status st = tracker->TryConsume(mem_usage);
WARN_IF_ERROR(st, "PartitionedHashTableCtx::ExprValuesCache failed");
if (UNLIKELY(!st)) {
capacity_ = 0;
string details = Substitute(
"PartitionedHashTableCtx::ExprValuesCache failed to allocate $0 bytes.", mem_usage);
Expand Down
23 changes: 15 additions & 8 deletions be/src/runtime/buffered_block_mgr2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
}
int buffers_needed = BitUtil::ceil(size, max_block_size());
unique_lock<mutex> lock(_lock);

if (size < max_block_size() && _mem_tracker->TryConsume(size)) {
Status st = _mem_tracker->TryConsume(size);
WARN_IF_ERROR(st, "consume failed");
if (size < max_block_size() && st) {
// For small allocations (less than a block size), just let the allocation through.
client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
// client->_tracker->Consume(size);
Expand All @@ -335,8 +336,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
if (available_buffers(client) + client->_num_tmp_reserved_buffers < buffers_needed) {
return false;
}

if (_mem_tracker->TryConsume(size)) {
st = _mem_tracker->TryConsume(size);
WARN_IF_ERROR(st, "consume failed");
if (st) {
// There was still unallocated memory, don't need to recycle allocated blocks.
client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
// client->_tracker->Consume(size);
Expand Down Expand Up @@ -393,7 +395,9 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {

DCHECK_GE(buffers_acquired * max_block_size(), size);
_mem_tracker->Release(buffers_acquired * max_block_size());
if (!_mem_tracker->TryConsume(size)) {
st = _mem_tracker->TryConsume(size);
WARN_IF_ERROR(st, "consume failed");
if (!st) {
return false;
}
client->_tracker->ConsumeLocal(size, client->_query_tracker.get());
Expand Down Expand Up @@ -465,7 +469,9 @@ Status BufferedBlockMgr2::get_new_block(Client* client, Block* unpin_block, Bloc

if (len > 0 && len < _max_block_size) {
DCHECK(unpin_block == nullptr);
if (client->_tracker->TryConsume(len)) {
Status st = client->_tracker->TryConsume(len);
WARN_IF_ERROR(st, "get_new_block failed");
if (st) {
// TODO: Have a cache of unused blocks of size 'len' (0, _max_block_size)
uint8_t* buffer = new uint8_t[len];
// Descriptors for non-I/O sized buffers are deleted when the block is deleted.
Expand Down Expand Up @@ -1088,9 +1094,10 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) {
Status BufferedBlockMgr2::find_buffer(unique_lock<mutex>& lock, BufferDescriptor** buffer_desc) {
*buffer_desc = nullptr;

Status st = _mem_tracker->TryConsume(_max_block_size);
WARN_IF_ERROR(st, "try to allocate a new buffer failed");
// First, try to allocate a new buffer.
if (_free_io_buffers.size() < _block_write_threshold &&
_mem_tracker->TryConsume(_max_block_size)) {
if (_free_io_buffers.size() < _block_write_threshold && st) {
uint8_t* new_buffer = new uint8_t[_max_block_size];
*buffer_desc = _obj_pool.add(new BufferDescriptor(new_buffer, _max_block_size));
(*buffer_desc)->all_buffers_it =
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/bufferpool/reservation_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase)
if (GetParentMemTracker() == nullptr) {
// At the topmost link, which may be a MemTracker with a limit, we need to use
// TryConsume() to check the limit.
return mem_tracker_->TryConsume(reservation_increase);
Status st = mem_tracker_->TryConsume(reservation_increase);
WARN_IF_ERROR(st, "TryConsumeFromMemTracker failed");
return st.ok();
} else {
// For lower links, there shouldn't be a limit to enforce, so we just need to
// update the consumption of the linked MemTracker since the reservation is
Expand Down
76 changes: 40 additions & 36 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::N
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "",
mem_consumption, Labels({{"type", "query"}}));
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", mem_consumption,
Labels({{"type", "query"}}));

Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) {
return env->_init(store_paths);
Expand All @@ -95,20 +95,20 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_pool_mem_trackers = new PoolMemTrackerRegistry();
_thread_mgr = new ThreadResourceMgr();
_scan_thread_pool = new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
config::doris_scanner_thread_pool_queue_size);

ThreadPoolBuilder("LimitedScanThreadPool")
.set_min_threads(1)
.set_min_threads(1)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
.build(&_limited_scan_thread_pool);
.build(&_limited_scan_thread_pool);

ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(1)
.set_max_threads(config::send_batch_thread_pool_thread_num)
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool);

_etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size,
config::etl_thread_pool_queue_size);
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
Expand Down Expand Up @@ -158,34 +158,38 @@ Status ExecEnv::_init_mem_tracker() {
int64_t global_memory_limit_bytes = 0;
bool is_percent = false;
std::stringstream ss;
global_memory_limit_bytes = ParseUtil::parse_mem_spec(config::mem_limit, -1, &is_percent);
global_memory_limit_bytes =
ParseUtil::parse_mem_spec(config::mem_limit, -1, MemInfo::physical_mem(), &is_percent);
if (global_memory_limit_bytes <= 0) {
ss << "Failed to parse mem limit from '" + config::mem_limit + "'.";
return Status::InternalError(ss.str());
}

if (global_memory_limit_bytes > MemInfo::physical_mem()) {
LOG(WARNING) << "Memory limit " << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
LOG(WARNING) << "Memory limit "
<< PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
<< " exceeds physical memory of "
<< PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES)
<< ". Using physical memory instead";
global_memory_limit_bytes = MemInfo::physical_mem();
}
_mem_tracker = MemTracker::CreateTracker(global_memory_limit_bytes, "Process", MemTracker::GetRootTracker(),
false, false, MemTrackerLevel::OVERVIEW);
REGISTER_HOOK_METRIC(query_mem_consumption, [this]() {
return _mem_tracker->consumption();
});
LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
<< ", origin config value: " << config::mem_limit;
_mem_tracker = MemTracker::CreateTracker(global_memory_limit_bytes, "Process",
MemTracker::GetRootTracker(), false, false,
MemTrackerLevel::OVERVIEW);
REGISTER_HOOK_METRIC(query_mem_consumption, [this]() { return _mem_tracker->consumption(); });
LOG(INFO) << "Using global memory limit: "
<< PrettyPrinter::print(global_memory_limit_bytes, TUnit::BYTES)
<< ", origin config value: " << config::mem_limit;

// 2. init buffer pool
if (!BitUtil::IsPowerOf2(config::min_buffer_size)) {
ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size;
return Status::InternalError(ss.str());
}

int64_t buffer_pool_limit = ParseUtil::parse_mem_spec(config::buffer_pool_limit, global_memory_limit_bytes, &is_percent);
int64_t buffer_pool_limit =
ParseUtil::parse_mem_spec(config::buffer_pool_limit, global_memory_limit_bytes,
MemInfo::physical_mem(), &is_percent);
if (buffer_pool_limit <= 0) {
ss << "Invalid config buffer_pool_limit value, must be a percentage or "
"positive bytes value or percentage: "
Expand All @@ -201,7 +205,8 @@ Status ExecEnv::_init_mem_tracker() {
}

int64_t clean_pages_limit =
ParseUtil::parse_mem_spec(config::buffer_pool_clean_pages_limit, buffer_pool_limit, &is_percent);
ParseUtil::parse_mem_spec(config::buffer_pool_clean_pages_limit, buffer_pool_limit,
MemInfo::physical_mem(), &is_percent);
if (clean_pages_limit <= 0) {
ss << "Invalid buffer_pool_clean_pages_limit value, must be a percentage or "
"positive bytes value or percentage: "
Expand All @@ -213,22 +218,25 @@ Status ExecEnv::_init_mem_tracker() {
clean_pages_limit = clean_pages_limit / 2;
}
_init_buffer_pool(config::min_buffer_size, buffer_pool_limit, clean_pages_limit);
LOG(INFO) << "Buffer pool memory limit: " << PrettyPrinter::print(buffer_pool_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_limit
<< ". clean pages limit: " << PrettyPrinter::print(clean_pages_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_clean_pages_limit;
LOG(INFO) << "Buffer pool memory limit: "
<< PrettyPrinter::print(buffer_pool_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_limit
<< ". clean pages limit: " << PrettyPrinter::print(clean_pages_limit, TUnit::BYTES)
<< ", origin config value: " << config::buffer_pool_clean_pages_limit;

// 3. init storage page cache
int64_t storage_cache_limit =
ParseUtil::parse_mem_spec(config::storage_page_cache_limit, global_memory_limit_bytes, &is_percent);
ParseUtil::parse_mem_spec(config::storage_page_cache_limit, global_memory_limit_bytes,
MemInfo::physical_mem(), &is_percent);
while (!is_percent && storage_cache_limit > global_memory_limit_bytes / 2) {
// Reason same as buffer_pool_limit
storage_cache_limit = storage_cache_limit / 2;
}
int32_t index_page_cache_percentage = config::index_page_cache_percentage;
StoragePageCache::create_global_cache(storage_cache_limit, index_page_cache_percentage);
LOG(INFO) << "Storage page cache memory limit: " << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::storage_page_cache_limit;
LOG(INFO) << "Storage page cache memory limit: "
<< PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::storage_page_cache_limit;

SegmentLoader::create_global_instance(config::segment_cache_capacity);

Expand All @@ -250,21 +258,17 @@ void ExecEnv::_init_buffer_pool(int64_t min_page_size, int64_t capacity,
}

void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size, [this]() {
return _scan_thread_pool->get_queue_size();
});
REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size,
[this]() { return _scan_thread_pool->get_queue_size(); });

REGISTER_HOOK_METRIC(etl_thread_pool_queue_size, [this]() {
return _etl_thread_pool->get_queue_size();
});
REGISTER_HOOK_METRIC(etl_thread_pool_queue_size,
[this]() { return _etl_thread_pool->get_queue_size(); });

REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, [this]() {
return _send_batch_thread_pool->num_threads();
});
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return _send_batch_thread_pool->num_threads(); });

REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, [this]() {
return _send_batch_thread_pool->get_queue_size();
});
REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
[this]() { return _send_batch_thread_pool->get_queue_size(); });
}

void ExecEnv::_deregister_metrics() {
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) {

chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
if (check_limits) {
if (!mem_tracker_->TryConsume(chunk_size)) return false;
Status st = mem_tracker_->TryConsume(chunk_size);
WARN_IF_ERROR(st, "try to allocate a new buffer failed");
if (!st) return false;
} else {
mem_tracker_->Consume(chunk_size);
}
Expand Down
27 changes: 17 additions & 10 deletions be/src/runtime/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "common/status.h"
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
Expand Down Expand Up @@ -166,11 +167,16 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
/// other callers that may not tolerate allocation failures have a better chance
/// of success. Returns Status::OK() if the consumption was successfully updated,
/// otherwise a MemoryLimitExceeded status describing the failure.
WARN_UNUSED_RESULT
bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
Status TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
// DCHECK_GE(bytes, 0);
if (bytes <= 0) {
Release(-bytes);
return true;
return Status::OK();
}
if (MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
return Status::MemoryLimitExceeded(fmt::format(
"{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}",
label_, bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
}
// if (UNLIKELY(bytes == 0)) return true;
// if (UNLIKELY(bytes < 0)) return false; // needed in RELEASE, hits DCHECK in DEBUG
Expand All @@ -189,26 +195,27 @@ class MemTracker : public std::enable_shared_from_this<MemTracker> {
while (true) {
if (LIKELY(tracker->consumption_->try_add(bytes, limit))) break;

VLOG_RPC << "TryConsume failed, bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
<< " limit=" << limit << " attempting to GC";
if (UNLIKELY(tracker->GcMemory(limit - bytes))) {
DCHECK_GE(i, 0);
// Failed for this mem tracker. Roll back the ones that succeeded.
for (int j = all_trackers_.size() - 1; j > i; --j) {
all_trackers_[j]->consumption_->add(-bytes);
}
return false;
return Status::MemoryLimitExceeded(fmt::format(
"{}: TryConsume failed, bytes={} consumption={} imit={} "
"attempting to GC",
tracker->label(), bytes, tracker->consumption_->current_value(),
limit));
}
VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
<< " limit=" << limit;
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
<< " limit=" << limit;
}
}
}
// Everyone succeeded, return.
DCHECK_EQ(i, -1);
return true;
return Status::OK();
}

/// Decreases consumption of this tracker and its ancestors by 'bytes'.
Expand Down
4 changes: 4 additions & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ int main(int argc, char** argv) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
#endif

#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_current_mem();
#endif
sleep(10);
}

Expand Down
13 changes: 11 additions & 2 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
#include <iostream>
#include <sstream>

#include "common/config.h"
#include "gutil/strings/split.h"
#include "util/cgroup_util.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/string_parser.hpp"

namespace doris {

bool MemInfo::_s_initialized = false;
int64_t MemInfo::_s_physical_mem = -1;
int64_t MemInfo::_s_mem_limit = -1;
size_t MemInfo::_s_current_mem = 0;

void MemInfo::init() {
// Read from /proc/meminfo
Expand Down Expand Up @@ -79,16 +83,21 @@ void MemInfo::init() {
LOG(WARNING) << "Could not determine amount of physical memory on this machine.";
}

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
_s_initialized = true;
}

std::string MemInfo::debug_string() {
DCHECK(_s_initialized);
CGroupUtil util;
std::stringstream stream;
stream << "Mem Info: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << std::endl;
stream << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES)
<< std::endl;
stream << "Memory Limt: " << PrettyPrinter::print(_s_mem_limit, TUnit::BYTES) << std::endl;
stream << "Current Usage: " << PrettyPrinter::print(_s_current_mem, TUnit::BYTES) << std::endl;
stream << "CGroup Info: " << util.debug_string() << std::endl;
return stream.str();
}
Expand Down

0 comments on commit e2d3d01

Please sign in to comment.