Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_timeout = request.query_options.execution_timeout;
}

_runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
_fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
_build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
_init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
_plan_local_exchanger_timer = ADD_TIMER(_runtime_profile, "PlanLocalLocalExchangerTime");
_build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
_prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile, "PrepareAllPipelinesTime");
_build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
_init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
_plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalShuffleTime");
_build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
_prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
{
SCOPED_TIMER(_init_context_timer);
_num_instances = request.local_params.size();
Expand Down Expand Up @@ -1693,7 +1693,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
return;
}
Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// Printing profile content in the info log is a temporary solution for stream load and external_connector.
// Stream load does not have something like a coordinator on FE, so
Expand Down Expand Up @@ -1777,8 +1777,6 @@ Status PipelineFragmentContext::send_report(bool done) {

ReportStatusRequest req {exec_status,
runtime_states,
_runtime_profile.get(),
_runtime_state->load_channel_profile(),
done || !exec_status.ok(),
_query_ctx->coord_addr,
_query_id,
Expand Down Expand Up @@ -1820,6 +1818,11 @@ PipelineFragmentContext::collect_realtime_profile() const {
return res;
}

// Make sure first profile is fragment level profile
auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
_fragment_level_profile->to_thrift(fragment_profile.get());
res.push_back(fragment_profile);

// pipeline_id_to_profile is initialized in prepare stage
for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
// When submit fail, `_total_tasks` is equal to the number of tasks submitted.
std::atomic<int> _total_tasks = 0;

std::unique_ptr<RuntimeProfile> _runtime_profile;
std::unique_ptr<RuntimeProfile> _fragment_level_profile;
bool _is_report_success = false;

std::unique_ptr<RuntimeState> _runtime_state;
Expand Down
49 changes: 5 additions & 44 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,57 +392,18 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.__set_status(exec_status.to_thrift());
params.__set_done(req.done);
params.__set_query_type(req.runtime_state->query_type());
params.__isset.profile = false;

DCHECK(req.runtime_state != nullptr);

if (req.runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
}
params.__isset.detailed_report = true;
DCHECK(!req.runtime_states.empty());
const bool enable_profile = (*req.runtime_states.begin())->enable_profile();
if (enable_profile) {
params.__isset.profile = true;
params.__isset.loadChannelProfile = false;
for (auto* rs : req.runtime_states) {
DCHECK(req.load_channel_profile);
TDetailedReportParams detailed_param;
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
// merge all runtime_states.loadChannelProfile to req.load_channel_profile
req.load_channel_profile->update(detailed_param.loadChannelProfile);
}
req.load_channel_profile->to_thrift(&params.loadChannelProfile);
} else {
params.__isset.profile = false;
}

if (enable_profile) {
DCHECK(req.profile != nullptr);
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
detailed_param.__set_is_fragment_level(true);
req.profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(detailed_param);
for (auto pipeline_profile : req.runtime_state->pipeline_id_to_profile()) {
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
pipeline_profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(std::move(detailed_param));
}
}
if (!req.runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
for (auto& it : rs->output_files()) {
DCHECK(!req.runtime_states.empty());
if (!req.runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,6 @@ void QueryContext::add_fragment_profile(

void QueryContext::_report_query_profile() {
std::lock_guard<std::mutex> lg(_profile_mutex);
LOG_INFO(
"Pipeline x query context, register query profile, query {}, fragment profile count {}",
print_id(_query_id), _profile_map.size());

for (auto& [fragment_id, fragment_profile] : _profile_map) {
std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class PipelineFragmentContext;
struct ReportStatusRequest {
const Status status;
std::vector<RuntimeState*> runtime_states;
RuntimeProfile* profile = nullptr;
RuntimeProfile* load_channel_profile = nullptr;
bool done;
TNetworkAddress coord_addr;
TUniqueId query_id;
Expand Down
5 changes: 4 additions & 1 deletion be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_par
int32_t fragment_id = entry.first;
const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second;
std::vector<TDetailedReportParams> detailed_params;

bool is_first = true;
for (auto pipeline_profile : fragment_profile) {
if (pipeline_profile == nullptr) {
auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
Expand All @@ -129,6 +129,9 @@ TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_par

TDetailedReportParams tmp;
THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile);
// First profile is fragment level
tmp.__set_is_fragment_level(is_first);
is_first = false;
// tmp.fragment_instance_id is not needed for pipeline x
detailed_params.push_back(std::move(tmp));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@

package org.apache.doris.common.profile;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryProfile;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TRuntimeProfileTree;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -235,8 +232,6 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
List<TDetailedReportParams> fragmentProfile = entry.getValue();
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
// The naming rule must be the same as the one in updateProfile(TReportExecStatusParams params),
// because we rely on the name of the RuntimeProfile to eliminate duplicate profiles.
String suffix = " (host=" + backendHBAddress + ")";
for (TDetailedReportParams pipelineProfile : fragmentProfile) {
String name = "";
Expand All @@ -246,6 +241,7 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
name = "Pipeline :" + pipelineIdx + " " + suffix;
pipelineIdx++;
}

RuntimeProfile profileNode = new RuntimeProfile(name);
// The taskprofile is used to save the profile of the pipeline, without
// considering the FragmentLevel.
Expand Down Expand Up @@ -273,54 +269,6 @@ public Status updateProfile(TQueryProfile profile, TNetworkAddress backendHBAddr
return new Status(TStatusCode.OK, "Success");
}

/**
 * Merges the profiles carried by a single backend status report into this execution profile.
 *
 * <p>Each entry of {@code params.detailed_report} becomes a new {@link RuntimeProfile} that is
 * attached as a child of the profile stored in {@code fragmentProfiles} under
 * {@code params.fragment_id}. Entries that are not flagged as fragment-level are additionally
 * collected and handed to {@code setMultiBeProfile}. If the report carries a load channel
 * profile, it is merged into {@code loadChannelProfile}.
 *
 * <p>The report is dropped (with a warning log) when it has no backend id or when the backend
 * id cannot be resolved to a known backend.
 *
 * @param params the status report received from a backend
 */
public void updateProfile(TReportExecStatusParams params) {
// Resolve the reporting backend; its heartbeat address is used below to label the profiles.
Backend backend = null;
if (params.isSetBackendId()) {
backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId());
if (backend == null) {
LOG.warn("could not find backend with id {}", params.getBackendId());
return;
}
} else {
LOG.warn("backend id is not set in report profile request, bad message");
return;
}

// Running index used only to number the pipeline (non fragment-level) profiles.
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
// Each detailed report param is either a fragment-level profile or a pipeline profile.
for (TDetailedReportParams param : params.detailed_report) {
String name = "";
if (param.isSetIsFragmentLevel() && param.is_fragment_level) {
name = "Fragment Level Profile: " + suffix;
} else {
name = "Pipeline :" + pipelineIdx + " " + suffix;
pipelineIdx++;
}
RuntimeProfile profile = new RuntimeProfile(name);
// taskProfile collects only the pipeline profiles; the fragment-level profile is
// deliberately left out of it.
if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) {
taskProfile.add(profile);
}
if (param.isSetProfile()) {
profile.update(param.profile);
}
if (params.done) {
profile.setIsDone(true);
}
profile.sortChildren();
// NOTE(review): this dereferences the lookup result directly, so it presumably relies on
// fragmentProfiles already holding an entry for params.fragment_id — confirm with callers.
fragmentProfiles.get(params.fragment_id).addChild(profile);
}
// TODO ygl: is this right? There may be multiple backends; what does
// updating the load channel profile do in that case?
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}
setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(), taskProfile);
}

public synchronized void addFragmentBackend(PlanFragmentId fragmentId, Long backendId) {
fragmentIdBeNum.put(fragmentId.asInt(), fragmentIdBeNum.get(fragmentId.asInt()) + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,25 +244,6 @@ public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
}
}

if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
LOG.info("Reporting profile, query_id={}, fragment {} backend num: {}, ip: {}",
DebugUtil.printId(params.query_id), params.getFragmentId(), params.backend_num, beAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("params: {}", params);
}
ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(params.query_id);
if (executionProfile != null) {
// Updating the profile may take a long time, so use a separate pool to handle it.
writeProfileExecutor.submit(new Runnable() {
@Override
public void run() {
executionProfile.updateProfile(params);
}
});
} else {
LOG.info("Could not find execution profile with query id {}", DebugUtil.printId(params.query_id));
}
}
final TReportExecStatusResult result = new TReportExecStatusResult();

if (params.isSetReportWorkloadRuntimeStatus()) {
Expand Down