Skip to content

Commit

Permalink
[fix](streamload&sink) release and allocate memory in the same tracker (
Browse files Browse the repository at this point in the history
#12820)

1. HttpServer threads allocate ByteBuffers and put them into the stream load pipe, but scanner threads release them under the query tracker.
2. We can assume that brpc allocates memory in the Doris thread.

The above problems lead to incorrect memtracker results.
  • Loading branch information
dataroaring committed Sep 23, 2022
1 parent bd12a49 commit a7d42b5
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 18 deletions.
17 changes: 13 additions & 4 deletions be/src/exec/tablet_sink.cpp
Expand Up @@ -196,6 +196,7 @@ Status NodeChannel::open_wait() {
// add batch closure
_add_batch_closure = ReusableClosure<PTabletWriterAddBatchResult>::create();
_add_batch_closure->addFailedHandler([this](bool is_last_rpc) {
SCOPED_ATTACH_TASK(_state);
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call `mark_as_failed`,
Expand All @@ -217,6 +218,7 @@ Status NodeChannel::open_wait() {

_add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result,
bool is_last_rpc) {
SCOPED_ATTACH_TASK(_state);
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call the following logic,
Expand Down Expand Up @@ -578,12 +580,19 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
brpc_url + "/PInternalServiceImpl/tablet_writer_add_batch_by_http";
_add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
_add_batch_closure->cntl.http_request().set_content_type("application/json");
_brpc_http_stub->tablet_writer_add_batch_by_http(
&_add_batch_closure->cntl, NULL, &_add_batch_closure->result, _add_batch_closure);
{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl, NULL,
&_add_batch_closure->result,
_add_batch_closure);
}
} else {
_add_batch_closure->cntl.http_request().Clear();
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result, _add_batch_closure);
{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result, _add_batch_closure);
}
}
_next_packet_seq++;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/tablet_sink.h
Expand Up @@ -96,6 +96,8 @@ class ReusableClosure final : public google::protobuf::Closure {
// Destructor: waits for any in-flight Run() to complete, then releases the
// brpc controller's buffers under the orphan mem tracker.
~ReusableClosure() override {
// shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
join();
// Attach the orphan tracker so the memory freed by cntl.Reset() is released
// against the same tracker it was charged to — presumably brpc-side
// allocations are not accounted to the query tracker; confirm with reset().
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
}

static ReusableClosure<T>* create() { return new ReusableClosure<T>(); }
Expand All @@ -122,6 +124,7 @@ class ReusableClosure final : public google::protobuf::Closure {

// Please follow this call order: reset() -> set_in_flight() -> send brpc batch.
// Prepares the closure for reuse before the next RPC is issued.
void reset() {
// Reset the controller under the orphan tracker so buffer release/allocation
// stays in the same tracker as brpc's own allocations (keeps memtracker
// accounting consistent; see matching attach in the destructor).
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
cntl.Reset();
// Cache the new call id for the upcoming RPC.
cid = cntl.call_id();
}
Expand Down
10 changes: 8 additions & 2 deletions be/src/runtime/stream_load/stream_load_pipe.h
Expand Up @@ -24,6 +24,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "io/file_reader.h"
#include "runtime/message_body_sink.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/byte_buffer.h"

Expand All @@ -32,6 +33,7 @@ namespace doris {
const size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
// StreamLoadPipe use to transfer data from producer to consumer
// Data in pip is stored in chunks.

class StreamLoadPipe : public MessageBodySink, public FileReader {
public:
StreamLoadPipe(size_t max_buffered_bytes = kMaxPipeBufferedBytes,
Expand All @@ -43,7 +45,11 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
_min_chunk_size(min_chunk_size),
_total_length(total_length),
_use_proto(use_proto) {}
virtual ~StreamLoadPipe() {}

// Destructor: drains any buffers still queued in the pipe. The buffers were
// allocated by producer (HttpServer) threads, so they are freed under the
// orphan mem tracker rather than whatever tracker the destroying thread
// happens to carry — otherwise release and allocation would hit different
// trackers and skew accounting.
virtual ~StreamLoadPipe() {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
while (!_buf_queue.empty()) _buf_queue.pop_front();
}

Status open() override { return Status::OK(); }

Expand Down Expand Up @@ -113,6 +119,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
}

Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* eof) override {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
*bytes_read = 0;
while (*bytes_read < data_size) {
std::unique_lock<std::mutex> l(_lock);
Expand Down Expand Up @@ -209,7 +216,6 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
*length = buf->remaining();
data->reset(new uint8_t[*length]);
buf->get_bytes((char*)(data->get()), *length);

_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
if (_use_proto) {
Expand Down
6 changes: 0 additions & 6 deletions be/src/runtime/thread_context.h
Expand Up @@ -134,12 +134,6 @@ class ThreadContext {
void attach_task(const TaskType& type, const std::string& task_id,
const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "")
<< ",new tracker label: " << mem_tracker->label() << ",old tracker label: "
<< _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
#endif
DCHECK(type != TaskType::UNKNOWN);
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
Expand Down
16 changes: 14 additions & 2 deletions be/src/vec/common/pod_array.h
Expand Up @@ -160,13 +160,15 @@ class PODArrayBase : private boost::noncopyable,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

ptrdiff_t end_diff = c_end - c_start;
ptrdiff_t peak_diff = c_end_peak - c_start;

c_start = reinterpret_cast<char*>(TAllocator::realloc(
c_start - pad_left, allocated_bytes(), bytes,
std::forward<TAllocatorParams>(allocator_params)...)) +
pad_left;

c_end = c_end_peak = c_start + end_diff;
c_end = c_start + end_diff;
c_end_peak = c_start + peak_diff;
c_end_of_storage = c_start + bytes - pad_right - pad_left;
}

Expand Down Expand Up @@ -517,9 +519,11 @@ class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_r
auto swap_stack_heap = [this](PODArray& arr1, PODArray& arr2) {
size_t stack_size = arr1.size();
size_t stack_allocated = arr1.allocated_bytes();
size_t stack_peak_used = arr1.c_end_peak - arr1.c_start;

size_t heap_size = arr2.size();
size_t heap_allocated = arr2.allocated_bytes();
size_t heap_peak_used = arr2.c_end_peak - arr2.c_start;

/// Keep track of the stack content we have to copy.
char* stack_c_start = arr1.c_start;
Expand All @@ -528,13 +532,17 @@ class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_r
arr1.c_start = arr2.c_start;
arr1.c_end_of_storage = arr1.c_start + heap_allocated - arr1.pad_right;
arr1.c_end = arr1.c_start + this->byte_size(heap_size);
arr1.c_end_peak = arr2.c_end_peak;
arr1.c_end_peak = arr1.c_end;
THREAD_MEM_TRACKER_TRANSFER_FROM(arr1.c_end_peak - arr1.c_start - heap_peak_used,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

/// Allocate stack space for arr2.
arr2.alloc(stack_allocated);
/// Copy the stack content.
memcpy(arr2.c_start, stack_c_start, this->byte_size(stack_size));
arr2.c_end = arr2.c_end_peak = arr2.c_start + this->byte_size(stack_size);
THREAD_MEM_TRACKER_TRANSFER_FROM(arr2.c_end_peak - arr2.c_start - stack_peak_used,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
};

auto do_move = [this](PODArray& src, PODArray& dest) {
Expand All @@ -543,7 +551,11 @@ class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_r
dest.alloc(src.allocated_bytes());
memcpy(dest.c_start, src.c_start, this->byte_size(src.size()));
dest.c_end = dest.c_end_peak = dest.c_start + (src.c_end - src.c_start);
THREAD_MEM_TRACKER_TRANSFER_FROM(dest.c_end_peak - dest.c_start,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

THREAD_MEM_TRACKER_TRANSFER_FROM(src.c_end_of_storage - src.c_end_peak,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
src.c_start = Base::null;
src.c_end = Base::null;
src.c_end_of_storage = Base::null;
Expand Down
18 changes: 14 additions & 4 deletions be/src/vec/sink/vtablet_sink.cpp
Expand Up @@ -80,6 +80,7 @@ Status VNodeChannel::open_wait() {
// add block closure
_add_block_closure = ReusableClosure<PTabletWriterAddBlockResult>::create();
_add_block_closure->addFailedHandler([this](bool is_last_rpc) {
SCOPED_ATTACH_TASK(_state);
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call `mark_as_failed`,
Expand All @@ -101,6 +102,7 @@ Status VNodeChannel::open_wait() {

_add_block_closure->addSuccessHandler([this](const PTabletWriterAddBlockResult& result,
bool is_last_rpc) {
SCOPED_ATTACH_TASK(_state);
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call the following logic,
Expand Down Expand Up @@ -349,12 +351,20 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
_add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
_add_block_closure->cntl.http_request().set_content_type("application/json");
_brpc_http_stub->tablet_writer_add_block_by_http(
&_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure);

{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, NULL,
&_add_block_closure->result,
_add_block_closure);
}
} else {
_add_block_closure->cntl.http_request().Clear();
_stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
&_add_block_closure->result, _add_block_closure);
{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
&_add_block_closure->result, _add_block_closure);
}
}

_next_packet_seq++;
Expand Down

0 comments on commit a7d42b5

Please sign in to comment.