Skip to content

Commit

Permalink
add_orphan_tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 21, 2022
1 parent 3cfaae0 commit 55e7767
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 54 deletions.
20 changes: 15 additions & 5 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,17 @@ class ExecEnv {
}

std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return _process_mem_tracker; }
MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; }
void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
_process_mem_tracker = tracker;
_process_mem_tracker_raw = tracker.get();
void set_global_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& process_tracker,
const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) {
_process_mem_tracker = process_tracker;
_orphan_mem_tracker = orphan_tracker;
_orphan_mem_tracker_raw = orphan_tracker.get();
}
std::shared_ptr<MemTracker> allocator_cache_mem_tracker() {
return _allocator_cache_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
Expand Down Expand Up @@ -215,8 +218,15 @@ class ExecEnv {
// The ancestor for all trackers. Every tracker is visible from the process down.
// Not limit total memory by process tracker, and it's just used to track virtual memory of process.
std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
// tcmalloc/jemalloc allocator cache tracker, Including thread cache, free heap, etc.
std::shared_ptr<MemTracker> _allocator_cache_mem_tracker;
MemTrackerLimiter* _process_mem_tracker_raw;
// The default tracker consumed by mem hook. If the thread does not attach other trackers,
// by default all consumption will be passed to the process tracker through the orphan tracker.
// In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`.
// Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership",
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw;
// The ancestor for all querys tracker.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor for all load tracker.
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ Status ExecEnv::_init_mem_tracker() {
}
_process_mem_tracker =
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
_process_mem_tracker_raw = _process_mem_tracker.get();
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ MemPool::~MemPool() {
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
if (_mem_tracker) _mem_tracker->release(total_bytes_released);
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
}
Expand All @@ -89,7 +89,7 @@ void MemPool::free_all() {
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
if (_mem_tracker) _mem_tracker->release(total_bytes_released);
chunks_.clear();
next_chunk_size_ = INITIAL_CHUNK_SIZE;
Expand Down Expand Up @@ -150,7 +150,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) {
// Allocate a new chunk. Return early if allocate fails.
Chunk chunk;
RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk));
THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker_raw());
THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw());
if (_mem_tracker) _mem_tracker->consume(chunk_size);
ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
// Put it before the first free chunk. If no free chunks, it goes at the end.
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/mem_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class MemPool {
void reset_peak() {
if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
peak_allocated_bytes_ = total_allocated_bytes_;
}
}
Expand Down
18 changes: 10 additions & 8 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile) {
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
}

DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr);
_label = fmt::format(
"{} | {}", label,
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label());
_bind_group_num =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num();
DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr);
MemTrackerLimiter* parent =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw();
_label = fmt::format("[Observer] {} | {}", label,
parent->label() == "Orphan" ? "Process" : parent->label());
_bind_group_num = parent->label() == "Orphan"
? ExecEnv::GetInstance()->process_mem_tracker()->group_num()
: parent->group_num();
{
std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
_tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
Expand Down Expand Up @@ -119,8 +121,8 @@ std::shared_ptr<MemTracker> MemTracker::get_global_mem_tracker(const std::string
if (global_mem_trackers.find(label) != global_mem_trackers.end()) {
return global_mem_trackers[label];
} else {
global_mem_trackers.emplace(label,
std::make_shared<MemTracker>(fmt::format("[Global]{}", label)));
global_mem_trackers.emplace(
label, std::make_shared<MemTracker>(fmt::format("[Global] {}", label)));
return global_mem_trackers[label];
}
}
Expand Down
21 changes: 19 additions & 2 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
_label = label;
_limit = byte_limit;
_group_num = GetCurrentTimeMicros() % 1000;
_parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
if (parent || label == "Process") {
_parent = parent;
} else if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label() ==
"Orphan") {
_parent = ExecEnv::GetInstance()->process_mem_tracker();
} else {
_parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
}
DCHECK(_parent || label == "Process");

// Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors
Expand Down Expand Up @@ -70,6 +77,16 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_label == "Process") doris::thread_context_ptr._init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
consume(_untracked_mem.exchange(0));
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition.
// the first layer: process;
// the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.);
// the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.).
if (_parent->parent()->label() == "Process") {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
}

if (_parent) {
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
Expand Down Expand Up @@ -260,7 +277,7 @@ std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
// The limit of the current tracker and parents is less than 0, the consume will not fail,
// and the current process memory has no excess limit.
detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg);
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw();
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker().get();
}
auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail);
if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class MemTrackerLimiter final : public MemTracker {
"alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(),
MemInfo::mem_limit_str(), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg);
ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage(err_msg);
return err_msg;
}

Expand Down
19 changes: 10 additions & 9 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
_limiter_tracker = mem_tracker;
_task_id_stack.push_back(task_id);
_fragment_instance_id_stack.push_back(fragment_instance_id);
_limiter_tracker_stack.push_back(mem_tracker);
_limiter_tracker_raw = mem_tracker.get();
}

void ThreadMemTrackerMgr::detach_limiter_tracker() {
DCHECK(!_limiter_tracker_stack.empty());
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
_limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
_limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
_task_id_stack.pop_back();
_fragment_instance_id_stack.pop_back();
_limiter_tracker_stack.pop_back();
_limiter_tracker_raw = _limiter_tracker_stack.back().get();
}

void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
if (_fragment_instance_id != TUniqueId()) {
if (_fragment_instance_id_stack.back() != TUniqueId()) {
ExecEnv::GetInstance()->fragment_mgr()->cancel(
_fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
_fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_details);
}
}
Expand Down
35 changes: 22 additions & 13 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ThreadMemTrackerMgr {

// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size);
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
}

// After thread initialization, calling `init` again must call `clear_untracked_mems` first
Expand Down Expand Up @@ -81,9 +81,11 @@ class ThreadMemTrackerMgr {
template <bool CheckLimit>
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }

std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return _limiter_tracker; }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
return _limiter_tracker_stack.back();
}
MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; }

void set_check_limit(bool check_limit) { _check_limit = check_limit; }
Expand All @@ -98,8 +100,8 @@ class ThreadMemTrackerMgr {
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
std::to_string(_untracked_mem), _task_id, _limiter_tracker->log_usage(1),
fmt::to_string(consumer_tracker_buf));
std::to_string(_untracked_mem), _task_id_stack.back(),
_limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf));
}

private:
Expand All @@ -115,7 +117,8 @@ class ThreadMemTrackerMgr {
int64_t old_untracked_mem = 0;
std::string failed_msg = std::string();

std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
// _limiter_tracker_stack[0] = orphan_mem_tracker
std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
MemTrackerLimiter* _limiter_tracker_raw;
std::vector<MemTracker*> _consumer_tracker_stack;

Expand All @@ -124,16 +127,22 @@ class ThreadMemTrackerMgr {
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
std::vector<std::string> _task_id_stack;
std::vector<TUniqueId> _fragment_instance_id_stack;
ExceedCallBack _cb_func = nullptr;
};

inline void ThreadMemTrackerMgr::init() {
DCHECK(_consumer_tracker_stack.empty());
_task_id = "";
_limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
_limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw();
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
if (_limiter_tracker_stack.size() == 0) {
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
}
_check_limit = true;
}

Expand Down Expand Up @@ -173,7 +182,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
old_untracked_mem = _untracked_mem;
DCHECK(_limiter_tracker);
DCHECK(_limiter_tracker_raw);
if (CheckLimit) {
#ifndef BE_TEST
// When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker.
Expand All @@ -183,7 +192,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
// so disable it and try to open it when memory tracking is not on time.
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
// _limiter_tracker->label() != "Process");
// _limiter_tracker_raw->label() != "Process");
#endif
if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
Expand Down
9 changes: 4 additions & 5 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,13 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
bool has_query_mem_tracker = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1;
if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) {
if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(
ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(),
TUnit::BYTES)
<< PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(),
TUnit::BYTES)
<< ". Using process memory limit instead";
bytes_limit = ExecEnv::GetInstance()->process_mem_tracker_raw()->limit();
bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit();
}
auto mem_tracker_counter = ADD_COUNTER(&_profile, "MemoryLimit", TUnit::BYTES);
mem_tracker_counter->set(bytes_limit);
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
// The process tracker print log usage interval is 1s to avoid a large number of tasks being
// canceled when the process exceeds the mem limit, resulting in too many duplicate logs.
doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage();
doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage();
sleep(1);
}

Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/common/pod_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class PODArrayBase : private boost::noncopyable,
inline void reset_peak() {
if (UNLIKELY(c_end - c_end_peak > 65536)) {
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
c_end_peak = c_end;
}
}
Expand All @@ -127,7 +127,7 @@ class PODArrayBase : private boost::noncopyable,
template <typename... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams&&... allocator_params) {
THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
c_start = c_end = c_end_peak =
reinterpret_cast<char*>(TAllocator::alloc(
bytes, std::forward<TAllocatorParams>(allocator_params)...)) +
Expand All @@ -144,7 +144,7 @@ class PODArrayBase : private boost::noncopyable,

TAllocator::free(c_start - pad_left, allocated_bytes());
THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak,
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}

template <typename... TAllocatorParams>
Expand All @@ -157,7 +157,7 @@ class PODArrayBase : private boost::noncopyable,
unprotect();

THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(),
ExecEnv::GetInstance()->process_mem_tracker_raw());
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

ptrdiff_t end_diff = c_end - c_start;

Expand Down
Loading

0 comments on commit 55e7767

Please sign in to comment.