
Limit the memory usage of Loading process #1954

Merged: 10 commits merged into apache:master on Oct 15, 2019

Conversation

morningman (Contributor):

The current load framework uses a memtable to receive incoming load data and flushes it to disk when a size limit is reached (default 100MB).
Each tablet has its own memtable, so if there are many tablets on a Backend and the load data is distributed evenly across them, the total memory consumption can be very large, because no memtable is flushed until it reaches the 100MB limit.
For example, with 100 tablets on a Backend, peak memory consumption can reach 10GB (100 * 100MB), which may get the process killed by the system OOM killer.

This CL tries to resolve that problem.

ISSUE #1951

be/src/runtime/load_channel_mgr.h (review thread, resolved)
// return Status::OK if mem is reduced.
Status reduce_mem_usage();

int64_t mem_consumption() { return _mem_tracker->consumption(); }

Reviewer:

Suggested change
int64_t mem_consumption() { return _mem_tracker->consumption(); }
int64_t mem_consumption() const { return _mem_tracker->consumption(); }

Author:

Done

if (handle != nullptr && request.has_eos() && request.eos()) {
_lastest_success_channel->release(handle);
return Status::OK();
}

Reviewer:

Should release the handle whenever it is not nullptr.

Author:

Done

Status LoadChannelMgr::start_bg_worker() {
_load_channels_clean_thread = std::thread(
[this] {
#ifdef GOOGLE_PROFILER

Reviewer:

indent

Author:

What's wrong? It looks good~

Reviewer:

Macro definitions do not require spaces

#endif

uint32_t interval = 60;
while (true) {

Reviewer:

Better to join this thread in the destructor to avoid invalid access to destroyed members.

Author:

OK, I will not detach this thread; I will join it when destructing LoadChannelMgr.

Reviewer:

With while (true), how does this thread exit?

// index id -> tablets channel
std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> _tablets_channels;

Cache* _lastest_success_channel = nullptr;

Reviewer:

std::unordered_set<int64_t> is simple and sufficient, and renaming it to _finished_channels makes the intent easier to understand.

Author:

OK

if (it == _tablets_channels.end()) {
auto handle = _lastest_success_channel->lookup(std::to_string(index_id));
// success only when eos be true
if (handle != nullptr && request.has_eos() && request.eos()) {

Reviewer:

If you use _finished_channels, there is no need to check whether the request is eos.

Author:

OK


RETURN_IF_ERROR(channel->open(params));

if (!_opened) {

Reviewer:

no need to check

Author:

OK

return st;
}

bool LoadChannel::_find_largest_max_consumption_tablets_channel(std::shared_ptr<TabletsChannel>* channel) {

Reviewer:

Please add a comment that the lock should be held here.
Also, 1 + 1 + 80 < 50 + 50: why not choose 80?

Author:

It may happen, but with low probability. And even if we choose 50, it is not a big deal.
I will leave it unchanged. If it becomes a real problem, we can change it later.

CONF_Int32(write_buffer_size, "104857600");
CONF_Int64(write_buffer_size, "104857600");

// followin 2 configs limit the memory consumption of load process on a Backend.

Reviewer:

Suggested change
// followin 2 configs limit the memory consumption of load process on a Backend.
// following 2 configs limit the memory consumption of load process on a Backend.

Author:

OK

@@ -347,6 +347,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
if (!http_req->header(HTTP_TIMEZONE).empty()) {
request.__set_timezone(http_req->header(HTTP_TIMEZONE));
}
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
try {
request.__set_execMemLimit(std::stoi(http_req->header(HTTP_EXEC_MEM_LIMIT)));

Reviewer:

std::stoi returns int; it should be std::stoll.

Author:

OK

@@ -283,6 +283,9 @@ class OlapTableSink : public DataSink {

// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter> _node_add_batch_counter_map;

// load mem limit is for remote load channel
int64_t _load_mem_limit;

Reviewer:

Suggested change
int64_t _load_mem_limit;
int64_t _load_mem_limit = 0;

better to give a default value

Author:

I will set the default to -1, which means unlimited.

@@ -51,9 +52,16 @@ void FlushHandler::on_flush_finished(const FlushResult& res) {
_stats.flush_count.fetch_add(1);
_counter_cond.dec();
}

#if 0

Reviewer:

Why is this commented out?

Author:

No need, I will remove it. The MemTracker will be released when the MemTable is destructed.

@@ -54,6 +55,7 @@ struct MemTableFlushContext {
struct FlushResult {
OLAPStatus flush_status;
int64_t flush_time_ns;
int64_t flush_size_bytes;

Reviewer:

better to give a default value.

Author:

OK

}

LoadChannel::~LoadChannel() {
LOG(INFO) << "load channel mem peak usage: " << _mem_tracker->peak_consumption()

Reviewer:

Do we need this log?

Author:

I just want to observe it for now. It won't print too many logs; it may be removed later.

@@ -57,7 +57,7 @@ class MemTable {
};

RowCursorComparator _row_comparator;
std::unique_ptr<MemTracker> _tracker;
std::unique_ptr<MemTracker> _mem_tracker;

Reviewer:

That is quite a few levels of memory trackers.
It will hurt performance when concurrency is high.

Author:

We use a 5-level hierarchy of mem trackers:
LoadChannelMgr -> LoadChannel -> TabletsChannel -> DeltaWriter -> MemTable.

I think it is OK because:

  1. The load process may not have such high concurrency that updating the mem trackers impacts it.
  2. The MemTracker is designed for the query process, which ought to support at least 5 levels: process -> pool -> query -> fragment -> sub fragment. (This is what MemTracker's comment says.)

Reviewer:

OK

@imay (Reviewer):

LGTM

@imay imay closed this Oct 14, 2019
@imay imay reopened this Oct 14, 2019
@imay imay merged commit 62acf5d into apache:master Oct 15, 2019
wuyunfeng pushed a commit to wuyunfeng/incubator-doris that referenced this pull request Oct 22, 2019
SWJTU-ZhangLei pushed a commit to SWJTU-ZhangLei/incubator-doris that referenced this pull request Jul 25, 2023