Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,8 +1558,8 @@ void PipelineFragmentContext::_close_fragment_instance() {
}

if (_query_ctx->enable_profile()) {
_query_ctx->add_fragment_profile_x(_fragment_id, collect_realtime_profile_x(),
collect_realtime_load_channel_profile_x());
_query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile_x(),
collect_realtime_load_channel_profile_x());
}

// all submitted tasks done
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
}

if (query_context->enable_pipeline_x_exec()) {
*exec_status = query_context->get_realtime_exec_status_x();
*exec_status = query_context->get_realtime_exec_status();
}

return Status::OK();
Expand Down
69 changes: 13 additions & 56 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
return Status::OK();
}

void QueryContext::add_fragment_profile_x(
void QueryContext::add_fragment_profile(
int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
if (pipeline_profiles.empty()) {
Expand All @@ -375,78 +375,35 @@ void QueryContext::add_fragment_profile_x(
LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ",
print_id(this->_query_id), fragment_id, pipeline_profiles.size());

_profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
_profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));

if (load_channel_profile != nullptr) {
_load_channel_profile_map_x.insert(std::make_pair(fragment_id, load_channel_profile));
}
}

void QueryContext::add_instance_profile(const TUniqueId& instance_id,
std::shared_ptr<TRuntimeProfileTree> profile,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
DCHECK(profile != nullptr) << print_id(instance_id);

std::lock_guard<std::mutex> lg(_profile_mutex);
_profile_map.insert(std::make_pair(instance_id, profile));
if (load_channel_profile != nullptr) {
_load_channel_profile_map.insert(std::make_pair(instance_id, load_channel_profile));
_load_channel_profile_map.insert(std::make_pair(fragment_id, load_channel_profile));
}
}

void QueryContext::_report_query_profile() {
_report_query_profile_x();
_report_query_profile_non_pipeline();
}

void QueryContext::_report_query_profile_non_pipeline() {
if (enable_pipeline_x_exec()) {
return;
}

std::lock_guard<std::mutex> lg(_profile_mutex);
LOG_INFO("Query {}, register query profile, instance profile count {}", print_id(_query_id),
_profile_map.size());

for (auto& [instance_id, instance_profile] : _profile_map) {
std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
if (_load_channel_profile_map.contains(instance_id)) {
load_channel_profile = _load_channel_profile_map[instance_id];
}

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
_query_id, this->coord_addr, instance_id, instance_profile, load_channel_profile);
}

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
}

void QueryContext::_report_query_profile_x() {
if (!enable_pipeline_x_exec()) {
return;
}

std::lock_guard<std::mutex> lg(_profile_mutex);
LOG_INFO(
"Pipeline x query context, register query profile, query {}, fragment profile count {}",
print_id(_query_id), _profile_map_x.size());
print_id(_query_id), _profile_map.size());

for (auto& [fragment_id, fragment_profile] : _profile_map_x) {
for (auto& [fragment_id, fragment_profile] : _profile_map) {
std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;

if (_load_channel_profile_map_x.contains(fragment_id)) {
load_channel_profile = _load_channel_profile_map_x[fragment_id];
if (_load_channel_profile_map.contains(fragment_id)) {
load_channel_profile = _load_channel_profile_map[fragment_id];
}

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile_x(
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
_query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
}

ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
}

std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
QueryContext::_collect_realtime_query_profile_x() const {
QueryContext::_collect_realtime_query_profile() const {
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;

if (!enable_pipeline_x_exec()) {
Expand Down Expand Up @@ -482,20 +439,20 @@ QueryContext::_collect_realtime_query_profile_x() const {
return res;
}

TReportExecStatusParams QueryContext::get_realtime_exec_status_x() const {
TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
TReportExecStatusParams exec_status;

if (enable_pipeline_x_exec()) {
auto realtime_query_profile = _collect_realtime_query_profile_x();
auto realtime_query_profile = _collect_realtime_query_profile();
std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;

for (auto load_channel_profile : _load_channel_profile_map_x) {
for (auto load_channel_profile : _load_channel_profile_map) {
if (load_channel_profile.second != nullptr) {
load_channel_profiles.push_back(load_channel_profile.second);
}
}

exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params_x(
exec_status = RuntimeQueryStatiticsMgr::create_report_exec_status_params(
this->_query_id, std::move(realtime_query_profile),
std::move(load_channel_profiles), /*is_done=*/false);
} else {
Expand Down
26 changes: 7 additions & 19 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ class QueryContext {

std::mutex _profile_mutex;

// when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
// flatten profile of one fragment:
// Pipeline 0
// PipelineTask 0
Expand All @@ -339,34 +339,22 @@ class QueryContext {
// PipelineTask 3
// Operator 3
// fragment_id -> list<profile>
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map_x;
std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map_x;

// instance_id -> profile
std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _profile_map;
std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map;
std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;

void _report_query_profile();
void _report_query_profile_non_pipeline();
void _report_query_profile_x();

std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_collect_realtime_query_profile_x() const;

std::unordered_map<TUniqueId, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_collect_realtime_query_profile_non_pipeline() const;
_collect_realtime_query_profile() const;

public:
// when fragment of pipeline x is closed, it will register its profile to this map by using add_fragment_profile_x
void add_fragment_profile_x(
// when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
void add_fragment_profile(
int fragment_id,
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile);

void add_instance_profile(const TUniqueId& iid, std::shared_ptr<TRuntimeProfileTree> profile,
std::shared_ptr<TRuntimeProfileTree> load_channel_profile);

TReportExecStatusParams get_realtime_exec_status_x() const;
TReportExecStatusParams get_realtime_exec_status() const;

bool enable_profile() const {
return _query_options.__isset.enable_profile && _query_options.enable_profile;
Expand Down
Loading