Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 130 additions & 24 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ bool ExchangeSinkBuffer::is_pending_finish() {
bool need_cancel = _context->is_canceled();

for (auto& pair : _instance_to_package_queue_mutex) {
std::unique_lock<std::mutex> lock(*(pair.second));
std::unique_lock lock(*(pair.second));
auto& id = pair.first;
if (!_instance_to_sending_by_pipeline.at(id)) {
// when pending finish, we need check whether current query is cancelled
Expand All @@ -104,7 +104,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_instance_to_package_queue_mutex.count(low_id)) {
return;
}
_instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
_instance_to_package_queue_mutex[low_id] = std::make_unique<bthread::Mutex>();
_instance_to_seq[low_id] = 0;
_instance_to_package_queue[low_id] = std::queue<TransmitInfo, std::list<TransmitInfo>>();
_instance_to_broadcast_package_queue[low_id] =
Expand All @@ -114,8 +114,14 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
finst_id.set_lo(fragment_instance_id.lo);
_instance_to_sending_by_pipeline[low_id] = true;
_instance_to_rpc_ctx[low_id] = {};
_instance_watcher[low_id] = {};
_instance_watcher[low_id].start();
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
_instance_to_rpc_exec_delay_time[low_id] = 0;
_instance_to_rpc_exec_time[low_id] = 0;
_instance_to_rpc_callback_time[low_id] = 0;
_instance_to_rpc_callback_exec_time[low_id] = 0;
_construct_request(low_id, finst_id);
}

Expand All @@ -126,7 +132,7 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
TUniqueId ins_id = request.channel->_fragment_instance_id;
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
if (_instance_to_sending_by_pipeline[ins_id.lo]) {
send_now = true;
Expand All @@ -152,7 +158,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
bool send_now = false;
request.block_holder->ref();
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
if (_instance_to_sending_by_pipeline[ins_id.lo]) {
send_now = true;
Expand All @@ -168,7 +174,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
}

Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
std::unique_lock lock(*_instance_to_package_queue_mutex[id]);

DCHECK(_instance_to_sending_by_pipeline[id] == false);

Expand Down Expand Up @@ -205,7 +211,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
set_rpc_time(id, start_rpc_time, result.receive_time());
auto callback_start_time = GetCurrentTimeNanos();
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
Expand All @@ -217,6 +223,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else {
_send_rpc(id);
}
auto callback_end_time = GetCurrentTimeNanos();
set_rpc_time(id, start_rpc_time, result.receive_time(), result.exec_start_time(),
result.exec_end_time(), callback_start_time, callback_end_time);
});
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
Expand Down Expand Up @@ -258,7 +267,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
set_rpc_time(id, start_rpc_time, result.receive_time());
auto callback_start_time = GetCurrentTimeNanos();
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
Expand All @@ -270,6 +279,9 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
} else {
_send_rpc(id);
}
auto callback_end_time = GetCurrentTimeNanos();
set_rpc_time(id, start_rpc_time, result.receive_time(), result.exec_start_time(),
result.exec_end_time(), callback_start_time, callback_end_time);
});
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
Expand Down Expand Up @@ -303,8 +315,9 @@ void ExchangeSinkBuffer::_construct_request(InstanceLoId id, PUniqueId finst_id)
}

void ExchangeSinkBuffer::_ended(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
std::unique_lock lock(*_instance_to_package_queue_mutex[id]);
_instance_to_sending_by_pipeline[id] = true;
_instance_watcher[id].stop();
}

void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
Expand All @@ -314,27 +327,71 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
}

void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
std::unique_lock lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
_instance_to_sending_by_pipeline[id] = true;
}

bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
std::unique_lock lock(*_instance_to_package_queue_mutex[id]);
return _instance_to_receiver_eof[id];
}

void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time,
int64_t* max_exec_delay_time,
int64_t* min_exec_delay_time, int64_t* max_exec_time,
int64_t* min_exec_time, int64_t* max_callback_time,
int64_t* min_callback_time,
int64_t* max_callback_exec_time,
int64_t* min_callback_exec_time) {
int64_t local_max_time = 0;
int64_t local_max_exec_delay_time = 0;
int64_t local_max_exec_time = 0;
int64_t local_max_callback_time = 0;
int64_t local_max_callback_exec_time = 0;
int64_t local_min_time = INT64_MAX;
int64_t local_min_exec_delay_time = INT64_MAX;
int64_t local_min_exec_time = INT64_MAX;
int64_t local_min_callback_time = INT64_MAX;
int64_t local_min_callback_exec_time = INT64_MAX;
for (auto& [id, time] : _instance_to_rpc_time) {
if (time != 0) {
local_max_time = std::max(local_max_time, time);
local_min_time = std::min(local_min_time, time);
}
auto& exec_delay_time = _instance_to_rpc_exec_delay_time[id];
if (exec_delay_time != 0) {
local_max_exec_delay_time = std::max(local_max_exec_delay_time, exec_delay_time);
local_min_exec_delay_time = std::min(local_min_exec_delay_time, exec_delay_time);
}
auto& exec_time = _instance_to_rpc_exec_time[id];
if (exec_time != 0) {
local_max_exec_time = std::max(local_max_exec_time, exec_time);
local_min_exec_time = std::min(local_min_exec_time, exec_time);
}
auto& callback_time = _instance_to_rpc_callback_time[id];
if (callback_time != 0) {
local_max_callback_time = std::max(local_max_callback_time, callback_time);
local_min_callback_time = std::min(local_min_callback_time, callback_time);
}
auto& callback_exec_time = _instance_to_rpc_callback_exec_time[id];
if (callback_exec_time != 0) {
local_max_callback_exec_time =
std::max(local_max_callback_exec_time, callback_exec_time);
local_min_callback_exec_time =
std::min(local_min_callback_exec_time, callback_exec_time);
}
}
*max_time = local_max_time;
*max_exec_delay_time = local_max_exec_delay_time;
*max_exec_time = local_max_exec_time;
*max_callback_time = local_max_callback_time;
*max_callback_exec_time = local_max_callback_exec_time;
*min_time = local_min_time;
*min_exec_delay_time = local_min_exec_delay_time;
*min_exec_time = local_min_exec_time;
*min_callback_time = local_min_callback_time;
*min_callback_exec_time = local_min_callback_exec_time;
}

int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
Expand All @@ -345,31 +402,80 @@ int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
return sum_time;
}

void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
int64_t receive_rpc_time) {
void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time,
int64_t exec_start, int64_t exec_end,
int64_t callback_start_time, int64_t callback_end_time) {
_rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
if (rpc_spend_time > 0) {
_instance_to_rpc_time[id] += rpc_spend_time;
int64_t rpc_forward_time = receive_time - start_rpc_time;
int64_t rpc_exec_delay_time = exec_start - receive_time;
int64_t rpc_exec_time = exec_end - exec_start;
int64_t rpc_callback_time = callback_start_time - exec_end;
int64_t callback_exec_time = callback_end_time - callback_start_time;
if (rpc_forward_time > 0) {
_instance_to_rpc_time[id] += rpc_forward_time;
}
if (rpc_exec_delay_time > 0) {
_instance_to_rpc_exec_delay_time[id] += rpc_exec_delay_time;
}
if (rpc_exec_time > 0) {
_instance_to_rpc_exec_time[id] += rpc_exec_time;
}
if (rpc_callback_time > 0) {
_instance_to_rpc_callback_time[id] += rpc_callback_time;
}
if (callback_exec_time > 0) {
_instance_to_rpc_callback_exec_time[id] += callback_exec_time;
}
}

void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");

int64_t max_rpc_time = 0, min_rpc_time = 0;
get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
auto* _count_rpc = ADD_COUNTER(profile, "Rpc0Count", TUnit::UNIT);
auto* _max_rpc_timer = ADD_TIMER(profile, "Rpc1MaxTime");
auto* _min_rpc_timer = ADD_TIMER(profile, "Rpc1MinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "Rpc1SumTime");
auto* _avg_rpc_timer = ADD_TIMER(profile, "Rpc1AvgTime");

auto* _max_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MaxExecDelayTime");
auto* _min_rpc_exec_delay_timer = ADD_TIMER(profile, "Rpc2MinExecDelayTime");

auto* _max_rpc_exec_timer = ADD_TIMER(profile, "Rpc3MaxExecTime");
auto* _min_rpc_exec_timer = ADD_TIMER(profile, "Rpc3MinExecTime");

auto* _max_rpc_callback_timer = ADD_TIMER(profile, "Rpc4MaxCallbackTime");
auto* _min_rpc_callback_timer = ADD_TIMER(profile, "Rpc4MinCallbackTime");

auto* _max_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc5MaxCallbackExecTime");
auto* _min_rpc_callback_exec_timer = ADD_TIMER(profile, "Rpc5MinCallbackExecTime");

int64_t max_rpc_time = 0, min_rpc_time = 0, max_exec_delay_t = 0, min_exec_delay_t = 0,
max_exec_t, min_exec_t = 0, max_callback_t = 0, min_callback_t = 0,
max_callback_exec_t = 0, min_callback_exec_t = 0;
get_max_min_rpc_time(&max_rpc_time, &min_rpc_time, &max_exec_delay_t, &min_exec_delay_t,
&max_exec_t, &min_exec_t, &max_callback_t, &min_callback_t,
&max_callback_exec_t, &min_callback_exec_t);
_max_rpc_timer->set(max_rpc_time);
_min_rpc_timer->set(min_rpc_time);
_max_rpc_exec_delay_timer->set(max_exec_delay_t);
_min_rpc_exec_delay_timer->set(min_exec_delay_t);
_max_rpc_exec_timer->set(max_exec_t);
_min_rpc_exec_timer->set(min_exec_t);

_max_rpc_callback_timer->set(max_callback_t);
_min_rpc_callback_timer->set(min_callback_t);
_max_rpc_callback_exec_timer->set(max_callback_exec_t);
_min_rpc_callback_exec_timer->set(min_callback_exec_t);

_count_rpc->set(_rpc_count);
int64_t sum_time = get_sum_rpc_time();
_sum_rpc_timer->set(sum_time);
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));

uint64_t max_end_time = 0;
for (auto& [id, timer] : _instance_watcher) {
max_end_time = std::max(timer.elapsed_time(), max_end_time);
}
auto* _max_end_timer = ADD_TIMER(profile, "MaxRpcEndTime");
_max_end_timer->set(static_cast<int64_t>(max_end_time));
}
} // namespace doris::pipeline
18 changes: 15 additions & 3 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <brpc/controller.h>
#include <bthread/mutex.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
Expand Down Expand Up @@ -170,11 +171,13 @@ class ExchangeSinkBuffer {
bool can_write() const;
bool is_pending_finish();
void close();
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_time,
int64_t exec_start, int64_t exec_end, int64_t callback_start_time,
int64_t callback_end_time);
void update_profile(RuntimeProfile* profile);

private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<bthread::Mutex>>
_instance_to_package_queue_mutex;
// store data in non-broadcast shuffle
phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, std::list<TransmitInfo>>>
Expand All @@ -189,8 +192,13 @@ class ExchangeSinkBuffer {
phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<PTransmitDataParams>> _instance_to_request;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
phmap::flat_hash_map<InstanceLoId, MonotonicStopWatch> _instance_watcher;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_exec_delay_time;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_exec_time;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_callback_time;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_callback_exec_time;
phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx;

std::atomic<bool> _is_finishing;
Expand All @@ -209,7 +217,11 @@ class ExchangeSinkBuffer {
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time, int64_t* max_exec_delay_time,
int64_t* min_exec_delay_time, int64_t* max_exec_time,
int64_t* min_exec_time, int64_t* max_callback_time,
int64_t* min_callback_time, int64_t* max_callback_exec_time,
int64_t* min_callback_exec_time);
int64_t get_sum_rpc_time();
};

Expand Down
Loading