Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct TrackerGroup {
// Multiple groups are used to reduce the impact of locks.
static std::vector<TrackerGroup> mem_tracker_pool(1000);

NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile, bool is_limiter) {
NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile) {
if (profile == nullptr) {
_consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
} else {
Expand All @@ -58,30 +58,24 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile,
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
}

_is_limiter = is_limiter;
if (!_is_limiter) {
if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
_label = fmt::format(
"{} | {}", label,
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
} else {
_label = label + " | ";
}

_bind_group_num =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
{
std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
_tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
mem_tracker_pool[_bind_group_num].trackers.end(), this);
}
if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
_label = fmt::format(
"{} | {}", label,
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
} else {
_label = label;
_label = label + " | ";
}

_bind_group_num = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
{
std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
_tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
mem_tracker_pool[_bind_group_num].trackers.end(), this);
}
}

NewMemTracker::~NewMemTracker() {
if (!_is_limiter) {
if (_bind_group_num != -1) {
std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
if (_tracker_group_it != mem_tracker_pool[_bind_group_num].trackers.end()) {
mem_tracker_pool[_bind_group_num].trackers.erase(_tracker_group_it);
Expand All @@ -102,8 +96,9 @@ NewMemTracker::Snapshot NewMemTracker::make_snapshot(size_t level) const {
return snapshot;
}

void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* snapshots, size_t level,
int64_t group_num, std::string related_label) {
void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* snapshots,
size_t level, int64_t group_num,
std::string related_label) {
std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
for (auto tracker : mem_tracker_pool[group_num].trackers) {
if (split(tracker->label(), " | ")[1] == related_label) {
Expand Down
8 changes: 3 additions & 5 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ class NewMemTracker {
};

// Creates and adds the tracker to the mem_tracker_pool.
NewMemTracker(const std::string& label = std::string(), RuntimeProfile* profile = nullptr,
bool is_limiter = false);
NewMemTracker(const std::string& label, RuntimeProfile* profile = nullptr);
// For MemTrackerLimiter
NewMemTracker() { _bind_group_num = -1; }

~NewMemTracker();

Expand Down Expand Up @@ -93,9 +94,6 @@ class NewMemTracker {
// Tracker is located in group num in mem_tracker_pool
int64_t _bind_group_num;

// Whether is a MemTrackerLimiter
bool _is_limiter;

// Iterator into mem_tracker_pool for this object. Stored to have O(1) remove.
std::list<NewMemTracker*>::iterator _tracker_group_it;
};
Expand Down
70 changes: 56 additions & 14 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,14 @@ namespace doris {

MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label,
const std::shared_ptr<MemTrackerLimiter>& parent,
RuntimeProfile* profile)
: NewMemTracker(label, profile, true) {
RuntimeProfile* profile) {
DCHECK_GE(byte_limit, -1);
if (profile == nullptr) {
_consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
} else {
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
}
_label = label;
_limit = byte_limit;
_group_num = GetCurrentTimeMicros() % 1000;
_parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
Expand Down Expand Up @@ -143,9 +148,9 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {

Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
if (UNLIKELY(gc_memory(_limit - bytes))) {
return Status::MemoryLimitExceeded(
fmt::format("label={}, limit={}, used={}, failed consume size={}", label(), _limit,
_consumption->current_value(), bytes));
return Status::MemoryLimitExceeded(fmt::format(
"need_size={}, exceeded_tracker={}, limit={}, peak_used={}, current_used={}", bytes,
label(), _limit, _consumption->value(), _consumption->current_value()));
}
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << _consumption->current_value() << " limit=" << _limit;
Expand Down Expand Up @@ -218,20 +223,19 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
return join(usage_strings, "\n");
}

Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) {
DCHECK(_limit != -1);
std::string detail = fmt::format(
"{}, failed mem consume:<consume_size={}, mem_limit={}, mem_used={}, tracker_label={}, "
"in backend={} free memory left={}. details mem usage see be.INFO.",
msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES), _limit,
_consumption->current_value(), _label, BackendOptions::get_localhost(),
PrettyPrinter::print(ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity(),
TUnit::BYTES));
"{}, backend={} memory used={}, free memory left={}. If is query, can change the limit "
"by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(),
PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::mem_limit() - PerfCounters::get_vm_rss(), TUnit::BYTES));
Status status = Status::MemoryLimitExceeded(detail);

// only print the tracker log_usage in be log.
if (_print_log_usage) {
if (ExecEnv::GetInstance()->new_process_mem_tracker()->spare_capacity() < failed_consume_size) {
if (_label == "Process") {
// Dumping the process MemTracker is expensive. Limiting the recursive depth to two
// levels limits the level of detail to a one-line summary for each query MemTracker.
detail += "\n" + ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
Expand All @@ -245,6 +249,44 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t fai
return status;
}

// Builds a "memory limit exceeded" status for this tracker.
//
// Walks _limited_ancestors in order and picks the first entry that has a limit and whose
// limit is smaller than its peak consumption plus `failed_consume_size`. The detail text
// for that entry is handed to its mem_limit_exceeded_log(), whose Status is returned.
//
//   msg                  caller-supplied text, embedded in the "executing:<...>" section.
//   failed_consume_size  size of the request that failed; when it is 0 the message omits
//                        the consumed_tracker/need_size fields.
//
// If no entry qualifies, a MemoryLimitExceeded status with a fixed text is returned.
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size) {
    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
    DCHECK(!_limited_ancestors.empty());
    const bool report_request_size = failed_consume_size != 0;
    for (const auto& ancestor : _limited_ancestors) {
        // Skip entries without a limit, and entries that still have enough headroom.
        if (!ancestor->has_limit()) {
            continue;
        }
        if (ancestor->limit() >= ancestor->peak_consumption() + failed_consume_size) {
            continue;
        }
        const std::string report =
                report_request_size
                        ? fmt::format(
                                  "memory limit exceeded:<consumed_tracker={}, need_size={}, "
                                  "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>, "
                                  "executing:<{}>",
                                  _label, PrettyPrinter::print(failed_consume_size, TUnit::BYTES),
                                  ancestor->label(), ancestor->limit(),
                                  ancestor->peak_consumption(), ancestor->consumption(), msg)
                        : fmt::format(
                                  "memory limit exceeded:<exceeded_tracker={}, limit={}, peak_used={}, "
                                  "current_used={}>, executing:<{}>",
                                  ancestor->label(), ancestor->limit(),
                                  ancestor->peak_consumption(), ancestor->consumption(), msg);
        return ancestor->mem_limit_exceeded_log(report);
    }
    return Status::MemoryLimitExceeded("no mem tracker exceed limit");
}

// Builds a "memory limit exceeded" status from an already-failed try-consume result.
//
//   msg                    caller-supplied text, embedded in the "executing:<...>" section.
//   failed_tracker         tracker whose mem_limit_exceeded_log() produces the returned
//                          Status; it is dereferenced unconditionally.
//   failed_try_consume_st  status of the failed consume; its error message is embedded
//                          after this tracker's label.
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
                                             MemTrackerLimiter* failed_tracker,
                                             Status failed_try_consume_st) {
    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
    const auto& failure_reason = failed_try_consume_st.get_error_msg();
    return failed_tracker->mem_limit_exceeded_log(
            fmt::format("memory limit exceeded:<consumed_tracker={}, {}>, executing:<{}>", _label,
                        failure_reason, msg));
}

Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_alloc_size) {
Status rt = mem_limit_exceeded(msg, failed_alloc_size);
Expand Down
29 changes: 22 additions & 7 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,15 @@ class MemTrackerLimiter final : public NewMemTracker {
Status try_gc_memory(int64_t bytes);

public:
// up to (but not including) end_tracker.
// This happens when we want to update tracking on a particular mem tracker but the consumption
// against the limit recorded in one of its ancestors already happened.
// It is used to revise the mem tracker consumption.
// If memory is allocated and freed in different places, the consumption value of the mem tracker will be inaccurate.
// But the consumption value of the process mem tracker is not affected.
void consumption_revise(int64_t bytes) {
// Debug-build guard: this must not be called on the tracker labeled "Process".
DCHECK(_label != "Process");
// Adds `bytes` to this tracker's own consumption counter; nothing else is updated here.
_consumption->add(bytes);
}
void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker);

void enable_print_log_usage() { _print_log_usage = true; }

// Logs the usage of this tracker limiter and optionally its children (recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
Expand All @@ -143,9 +145,11 @@ class MemTrackerLimiter final : public NewMemTracker {
// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
// If 'state' is non-nullptr, logs the error to 'state'.
Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size);
Status mem_limit_exceeded(RuntimeState* state, const std::string& msg = std::string(),
int64_t failed_consume_size = -1);
Status mem_limit_exceeded(const std::string& msg, int64_t failed_consume_size = 0);
Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker,
Status failed_try_consume_st);
Status mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_consume_size = 0);

std::string debug_string() {
std::stringstream msg;
Expand Down Expand Up @@ -186,6 +190,8 @@ class MemTrackerLimiter final : public NewMemTracker {
const std::list<MemTrackerLimiter*>& trackers,
int64_t* logged_consumption);

Status mem_limit_exceeded_log(const std::string& msg);

private:
// Limit on memory consumption, in bytes. If _limit == -1, there is no consumption limit. Used in log_usage.
int64_t _limit;
Expand Down Expand Up @@ -253,6 +259,15 @@ inline void MemTrackerLimiter::consume_cache(int64_t bytes) {
}
}

// Adds `bytes` to the consumption counter of each entry of _all_ancestors, in order,
// stopping before the first entry whose label equals end_tracker's label (that entry and
// everything after it are left untouched). Entries are matched by label, not by address.
// A zero `bytes` is a no-op. `end_tracker` must be non-null (checked with DCHECK).
inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
    DCHECK(end_tracker);
    if (bytes != 0) {
        for (auto& ancestor : _all_ancestors) {
            if (ancestor->label() == end_tracker->label()) {
                break;
            }
            ancestor->_consumption->add(bytes);
        }
    }
}

inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
if (bytes <= 0) {
release(-bytes);
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/memory/mem_tracker_task_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers,
// the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
// the negative number of the current value of consume.
it->second->parent()->consumption_revise(-it->second->consumption());
it->second->parent()->consume_local(
-it->second->consumption(),
ExecEnv::GetInstance()->new_process_mem_tracker().get());
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, PeakUsed={}",
it->first, it->second->limit(), it->second->peak_consumption());
Expand Down
16 changes: 5 additions & 11 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,15 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
}
}

void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) {
void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) {
if (_cb_func != nullptr) {
_cb_func();
}
if (is_attach_query()) {
std::string cancel_msg;
if (!_consumer_tracker_stack.empty()) {
cancel_msg = fmt::format(
"exec node:<name={}>, can change the limit by `set exec_mem_limit=xxx`",
_consumer_tracker_stack[-1]->label());
} else {
cancel_msg = "exec node:unknown, can change the limit by `set exec_mem_limit=xxx`";
}
auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg, failed_consume_size);
exceeded_cancel_task(st.to_string());
auto st = _limiter_tracker->mem_limit_exceeded(fmt::format("exec node:<{}>", ""),
_limiter_tracker->parent().get(),
failed_try_consume_st);
exceeded_cancel_task(st.get_error_msg());
_check_limit = false; // Make sure it will only be canceled once
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class ThreadMemTrackerMgr {
// If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);

void exceeded(int64_t failed_consume_size);
void exceeded(Status failed_try_consume_st);

private:
// Cache untracked mem, only update to _untracked_mems when switching mem tracker.
Expand Down Expand Up @@ -191,7 +191,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker->consume(_untracked_mem);
exceeded(_untracked_mem);
exceeded(st);
}
} else {
_limiter_tracker->consume(_untracked_mem);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
}

_new_instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
bytes_limit, "RuntimeState:instance:" + print_id(_fragment_instance_id),
_new_query_mem_tracker);
-1, "RuntimeState:instance:" + print_id(_fragment_instance_id),
_new_query_mem_tracker, &_profile);

/*
// TODO: this is a stopgap until we implement ExprContext
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ class StopCheckThreadMemTrackerLimit {
doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size, tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size, tracker)
#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
return doris::thread_context() \
->_thread_mem_tracker_mgr->limiter_mem_tracker() \
->mem_limit_exceeded(state, msg, ##__VA_ARGS__);
#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
return doris::thread_context() \
->_thread_mem_tracker_mgr->limiter_mem_tracker() \
->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", msg), \
##__VA_ARGS__);
} // namespace doris
1 change: 1 addition & 0 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ int main(int argc, char** argv) {

// 1s clear the expired task mem tracker, a query mem tracker is about 57 bytes.
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage();
sleep(1);
}

Expand Down