Skip to content

Commit

Permalink
[fix](memtracker) Introduce orphan mem tracker to verify memory track…
Browse files Browse the repository at this point in the history
…ing accuracy (apache#12794)

The mem hook consumes the orphan tracker by default. If the thread does not attach other trackers, by default all consumption will be passed to the process tracker through the orphan tracker.

In real time, consumption of all other trackers + orphan tracker consumption = process tracker consumption.

Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", and the consumption of the orphan mem tracker is close to 0, but greater than 0.
  • Loading branch information
xinyiZzz authored and Yijia Su committed Oct 8, 2022
1 parent 0976c4e commit f29a5c4
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 54 deletions.
20 changes: 15 additions & 5 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@ class ExecEnv {
}

std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return _process_mem_tracker; }
MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; }
void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
_process_mem_tracker = tracker;
_process_mem_tracker_raw = tracker.get();
void set_global_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& process_tracker,
const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) {
_process_mem_tracker = process_tracker;
_orphan_mem_tracker = orphan_tracker;
_orphan_mem_tracker_raw = orphan_tracker.get();
}
std::shared_ptr<MemTracker> allocator_cache_mem_tracker() {
return _allocator_cache_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
Expand Down Expand Up @@ -215,8 +218,15 @@ class ExecEnv {
// The ancestor for all trackers. Every tracker is visible from the process down.
// Not limit total memory by process tracker, and it's just used to track virtual memory of process.
std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
// tcmalloc/jemalloc allocator cache tracker, Including thread cache, free heap, etc.
std::shared_ptr<MemTracker> _allocator_cache_mem_tracker;
MemTrackerLimiter* _process_mem_tracker_raw;
// The default tracker consumed by mem hook. If the thread does not attach other trackers,
// by default all consumption will be passed to the process tracker through the orphan tracker.
// In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`.
// Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership",
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw;
// The ancestor for all querys tracker.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor for all load tracker.
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ Status ExecEnv::_init_mem_tracker() {
}
_process_mem_tracker =
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
_process_mem_tracker_raw = _process_mem_tracker.get();
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ MemPool::~MemPool() {
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
if (_mem_tracker) _mem_tracker->release(total_bytes_released);
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
}
Expand All @@ -89,7 +89,7 @@ void MemPool::free_all() {
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
if (_mem_tracker) _mem_tracker->release(total_bytes_released);
chunks_.clear();
next_chunk_size_ = INITIAL_CHUNK_SIZE;
Expand Down Expand Up @@ -150,7 +150,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) {
// Allocate a new chunk. Return early if allocate fails.
Chunk chunk;
RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk));
THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker_raw());
THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw());
if (_mem_tracker) _mem_tracker->consume(chunk_size);
ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
// Put it before the first free chunk. If no free chunks, it goes at the end.
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/mem_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class MemPool {
void reset_peak() {
if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
peak_allocated_bytes_ = total_allocated_bytes_;
}
}
Expand Down
18 changes: 10 additions & 8 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile) {
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
}

DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr);
_label = fmt::format(
"{} | {}", label,
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label());
_bind_group_num =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num();
DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr);
MemTrackerLimiter* parent =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw();
_label = fmt::format("[Observer] {} | {}", label,
parent->label() == "Orphan" ? "Process" : parent->label());
_bind_group_num = parent->label() == "Orphan"
? ExecEnv::GetInstance()->process_mem_tracker()->group_num()
: parent->group_num();
{
std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
_tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
Expand Down Expand Up @@ -119,8 +121,8 @@ std::shared_ptr<MemTracker> MemTracker::get_global_mem_tracker(const std::string
if (global_mem_trackers.find(label) != global_mem_trackers.end()) {
return global_mem_trackers[label];
} else {
global_mem_trackers.emplace(label,
std::make_shared<MemTracker>(fmt::format("[Global]{}", label)));
global_mem_trackers.emplace(
label, std::make_shared<MemTracker>(fmt::format("[Global] {}", label)));
return global_mem_trackers[label];
}
}
Expand Down
23 changes: 21 additions & 2 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
_label = label;
_limit = byte_limit;
_group_num = GetCurrentTimeMicros() % 1000;
_parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
if (parent || label == "Process") {
_parent = parent;
} else if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label() ==
"Orphan") {
_parent = ExecEnv::GetInstance()->process_mem_tracker();
} else {
_parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
}
DCHECK(_parent || label == "Process");

// Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors
Expand Down Expand Up @@ -70,6 +77,18 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_label == "Process") doris::thread_context_ptr._init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
consume(_untracked_mem.exchange(0));
#ifndef BE_TEST
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition.
// the first layer: process;
// the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.);
// the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.).
if (_parent->parent()->label() == "Process") {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
}
#endif

if (_parent) {
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
Expand Down Expand Up @@ -260,7 +279,7 @@ std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
// The limit of the current tracker and parents is less than 0, the consume will not fail,
// and the current process memory has no excess limit.
detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg);
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw();
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker().get();
}
auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail);
if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class MemTrackerLimiter final : public MemTracker {
"alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(),
MemInfo::mem_limit_str(), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg);
ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage(err_msg);
return err_msg;
}

Expand Down
19 changes: 10 additions & 9 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
_limiter_tracker = mem_tracker;
_task_id_stack.push_back(task_id);
_fragment_instance_id_stack.push_back(fragment_instance_id);
_limiter_tracker_stack.push_back(mem_tracker);
_limiter_tracker_raw = mem_tracker.get();
}

void ThreadMemTrackerMgr::detach_limiter_tracker() {
DCHECK(!_limiter_tracker_stack.empty());
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
_limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
_limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
_task_id_stack.pop_back();
_fragment_instance_id_stack.pop_back();
_limiter_tracker_stack.pop_back();
_limiter_tracker_raw = _limiter_tracker_stack.back().get();
}

void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
if (_fragment_instance_id != TUniqueId()) {
if (_fragment_instance_id_stack.back() != TUniqueId()) {
ExecEnv::GetInstance()->fragment_mgr()->cancel(
_fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
_fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_details);
}
}
Expand Down
35 changes: 22 additions & 13 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ThreadMemTrackerMgr {

// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size);
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
}

// After thread initialization, calling `init` again must call `clear_untracked_mems` first
Expand Down Expand Up @@ -81,9 +81,11 @@ class ThreadMemTrackerMgr {
template <bool CheckLimit>
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }

std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
return _limiter_tracker_stack.back();
}
MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; }

void set_check_limit(bool check_limit) { _check_limit = check_limit; }
Expand All @@ -98,8 +100,8 @@ class ThreadMemTrackerMgr {
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
std::to_string(_untracked_mem), _task_id, _limiter_tracker->log_usage(1),
fmt::to_string(consumer_tracker_buf));
std::to_string(_untracked_mem), _task_id_stack.back(),
_limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf));
}

private:
Expand All @@ -115,7 +117,8 @@ class ThreadMemTrackerMgr {
int64_t old_untracked_mem = 0;
std::string failed_msg = std::string();

std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
// _limiter_tracker_stack[0] = orphan_mem_tracker
std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
MemTrackerLimiter* _limiter_tracker_raw;
std::vector<MemTracker*> _consumer_tracker_stack;

Expand All @@ -124,16 +127,22 @@ class ThreadMemTrackerMgr {
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
std::vector<std::string> _task_id_stack;
std::vector<TUniqueId> _fragment_instance_id_stack;
ExceedCallBack _cb_func = nullptr;
};

inline void ThreadMemTrackerMgr::init() {
DCHECK(_consumer_tracker_stack.empty());
_task_id = "";
_limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
_limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
if (_limiter_tracker_stack.size() == 0) {
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
}
_check_limit = true;
}

Expand Down Expand Up @@ -173,7 +182,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
old_untracked_mem = _untracked_mem;
DCHECK(_limiter_tracker);
DCHECK(_limiter_tracker_raw);
if (CheckLimit) {
#ifndef BE_TEST
// When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker.
Expand All @@ -183,7 +192,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
// so disable it and try to open it when memory tracking is not on time.
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
// _limiter_tracker->label() != "Process");
// _limiter_tracker_raw->label() != "Process");
#endif
if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
Expand Down
9 changes: 4 additions & 5 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,13 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
bool has_query_mem_tracker = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1;
if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) {
if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(),
TUnit::BYTES)
<< PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(),
TUnit::BYTES)
<< ". Using process memory limit instead";
bytes_limit = ExecEnv::GetInstance()->process_mem_tracker_raw()->limit();
bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit();
}
auto mem_tracker_counter = ADD_COUNTER(&_profile, "MemoryLimit", TUnit::BYTES);
mem_tracker_counter->set(bytes_limit);
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
// The process tracker print log usage interval is 1s to avoid a large number of tasks being
// canceled when the process exceeds the mem limit, resulting in too many duplicate logs.
doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage();
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
sleep(1);
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/common/pod_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PODArrayBase : private boost::noncopyable,
inline void reset_peak() {
if (UNLIKELY(c_end - c_end_peak > 65536)) {
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
c_end_peak = c_end;
}
}
Expand All @@ -127,7 +127,7 @@ class PODArrayBase : private boost::noncopyable,
template <typename... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams&&... allocator_params) {
THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
c_start = c_end = c_end_peak =
reinterpret_cast<char*>(TAllocator::alloc(
bytes, std::forward<TAllocatorParams>(allocator_params)...)) +
Expand All @@ -144,7 +144,7 @@ class PODArrayBase : private boost::noncopyable,

TAllocator::free(c_start - pad_left, allocated_bytes());
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}

template <typename... TAllocatorParams>
Expand All @@ -157,7 +157,7 @@ class PODArrayBase : private boost::noncopyable,
unprotect();

THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(),
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

ptrdiff_t end_diff = c_end - c_start;

Expand Down
Loading

0 comments on commit f29a5c4

Please sign in to comment.