Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 18 additions & 67 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,15 +600,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
}
query_ctx = search->second;
} else {
{
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
return Status::OK();
}
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
query_ctx = search->second;
return Status::OK();
}

// This may be a first fragment request of the query.
Expand Down Expand Up @@ -636,57 +634,21 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
}

query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
query_ctx->timeout_second = params.query_options.execution_timeout;
_set_scan_concurrency(params, query_ctx.get());

bool has_query_mem_tracker =
params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1;
if (bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< ". Using process memory limit instead";
bytes_limit = MemInfo::mem_limit();
}
if (params.query_options.query_type == TQueryType::SELECT) {
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY,
fmt::format("Query#Id={}", print_id(query_ctx->query_id())), bytes_limit);
} else if (params.query_options.query_type == TQueryType::LOAD) {
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("Load#Id={}", print_id(query_ctx->query_id())), bytes_limit);
} else { // EXTERNAL
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("External#Id={}", print_id(query_ctx->query_id())), bytes_limit);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
query_ctx->query_mem_tracker->enable_print_log_usage();
}

query_ctx->register_memory_statistics();
query_ctx->register_cpu_statistics();

bool is_pipeline = false;
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
is_pipeline = true;
}

if (params.__isset.workload_groups && !params.workload_groups.empty()) {
uint64_t tg_id = params.workload_groups[0].id;
auto* tg_mgr = _exec_env->task_group_manager();
taskgroup::TaskGroupPtr task_group_ptr = nullptr;
Status ret = tg_mgr->add_query_to_group(tg_id, query_ctx->query_id(), &task_group_ptr);
if (ret.ok()) {
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
// set task group to queryctx for memory tracker can be removed, see QueryContext's destructor
query_ctx->set_task_group(task_group_ptr);
taskgroup::TaskGroupPtr task_group_ptr =
_exec_env->task_group_manager()->get_task_group_by_id(tg_id);
if (task_group_ptr != nullptr) {
RETURN_IF_ERROR(query_ctx->set_task_group(task_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
tg_id);
query_ctx->set_query_scheduler(tg_id);

LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< ", use task group: " << task_group_ptr->debug_string()
Expand All @@ -695,26 +657,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
<< ((int)config::enable_cgroup_cpu_soft_limit);
} else {
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< " carried group info but can not find group in be, reason: "
<< ret.to_string();
}
}

{
// Find _query_ctx_map again, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
} else {
// Already has a query fragments context, use it
query_ctx = search->second;
<< " carried group info but can not find group in be";
}
}
// There is some logic in query ctx's dctor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from task group's queryset.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
}
return Status::OK();
}
Expand Down
51 changes: 44 additions & 7 deletions be/src/runtime/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/task_group/task_group_manager.h"
#include "util/mem_info.h"

namespace doris {

Expand All @@ -46,6 +47,37 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this);
_runtime_filter_mgr.reset(
new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));

timeout_second = query_options.execution_timeout;

bool has_query_mem_tracker = query_options.__isset.mem_limit && (query_options.mem_limit > 0);
int64_t _bytes_limit = has_query_mem_tracker ? query_options.mem_limit : -1;
if (_bytes_limit > MemInfo::mem_limit()) {
VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
<< " exceeds process memory limit of "
<< PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
<< ". Using process memory limit instead";
_bytes_limit = MemInfo::mem_limit();
}
if (query_options.query_type == TQueryType::SELECT) {
query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
_bytes_limit);
} else if (query_options.query_type == TQueryType::LOAD) {
query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
_bytes_limit);
} else { // EXTERNAL
query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)),
_bytes_limit);
}
if (query_options.__isset.is_report_success && query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
}

register_memory_statistics();
register_cpu_statistics();
}

QueryContext::~QueryContext() {
Expand All @@ -64,7 +96,7 @@ QueryContext::~QueryContext() {
}
if (_task_group) {
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
_exec_env->task_group_manager()->remove_query_from_group(_task_group->id(), _query_id);
_task_group->remove_query(_query_id);
}

_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
Expand Down Expand Up @@ -154,12 +186,6 @@ void QueryContext::register_cpu_statistics() {
}
}

void QueryContext::set_query_scheduler(uint64_t tg_id) {
auto* tg_mgr = _exec_env->task_group_manager();
tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler,
&_non_pipe_thread_pool);
}

doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
if (_task_group) {
if (_task_scheduler) {
Expand All @@ -177,4 +203,15 @@ ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
}
}

Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) {
_task_group = tg;
// Should add query first, then the task group will not be deleted.
// see task_group_manager::delete_task_group_by_ids
RETURN_IF_ERROR(_task_group->add_query(_query_id));
_task_group->add_mem_tracker_limiter(query_mem_tracker);
_exec_env->task_group_manager()->get_query_scheduler(
_task_group->id(), &_task_scheduler, &_scan_task_scheduler, &_non_pipe_thread_pool);
return Status::OK();
}

} // namespace doris
17 changes: 4 additions & 13 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class QueryContext {

vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }

void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
Status set_task_group(taskgroup::TaskGroupPtr& tg);

int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
Expand Down Expand Up @@ -191,16 +191,6 @@ class QueryContext {

TUniqueId query_id() const { return _query_id; }

void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
_task_scheduler = task_scheduler;
}

pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }

void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) {
_scan_task_scheduler = scan_task_scheduler;
}

vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }

pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
Expand All @@ -215,12 +205,12 @@ class QueryContext {

std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; }

void set_query_scheduler(uint64_t wg_id);

doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();

ThreadPool* get_non_pipe_exec_thread_pool();

int64_t mem_limit() { return _bytes_limit; }

public:
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
Expand Down Expand Up @@ -254,6 +244,7 @@ class QueryContext {
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
VecDateTimeValue _start_time;
int64_t _bytes_limit = 0;

// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from exec env.
Expand Down
32 changes: 25 additions & 7 deletions be/src/runtime/task_group/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
#include <unordered_set>

#include "common/status.h"
#include "service/backend_options.h"
#include "util/hash_util.hpp"

namespace doris {

class TPipelineWorkloadGroup;
class MemTrackerLimiter;

namespace pipeline {
Expand Down Expand Up @@ -96,15 +96,33 @@ class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
return _memory_limit > 0;
}

void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); }

void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); }
Status add_query(TUniqueId query_id) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
// If the task group is set shutdown, then should not run any more,
// because the scheduler pool and other pointer may be released.
return Status::InternalError(
"Failed add query to workload group, the workload group is shutdown. host: {}",
BackendOptions::get_localhost());
}
_query_id_set.insert(query_id);
return Status::OK();
}

void shutdown() { _is_shutdown = true; }
void remove_query(TUniqueId query_id) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_query_id_set.erase(query_id);
}

int query_num() { return _query_id_set.size(); }
void shutdown() {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_is_shutdown = true;
}

bool is_shutdown() { return _is_shutdown; }
int query_num() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _query_id_set.size();
}

private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
Expand Down
28 changes: 0 additions & 28 deletions be/src/runtime/task_group/task_group_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,34 +276,6 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
<< "ms, deleted group size:" << deleted_tg_ids.size();
}

Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id,
TaskGroupPtr* tg_ptr) {
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
auto tg_iter = _task_groups.find(tg_id);
if (tg_iter != _task_groups.end()) {
if (tg_iter->second->is_shutdown()) {
return Status::InternalError<false>("workload group {} is shutdown.", tg_id);
}
tg_iter->second->add_query(query_id);
*tg_ptr = tg_iter->second;
return Status::OK();
} else {
return Status::InternalError<false>("can not find workload group {}.", tg_id);
}
}

void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId query_id) {
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
auto tg_iter = _task_groups.find(tg_id);
if (tg_iter != _task_groups.end()) {
tg_iter->second->remove_query(query_id);
} else {
//NOTE: This should never happen
LOG(INFO) << "can not find task group when remove query, tg:" << tg_id
<< ", query_id:" << print_id(query_id);
}
}

void TaskGroupManager::stop() {
for (auto& task_sche : _tg_sche_map) {
task_sche.second->stop();
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/task_group/task_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ class TaskGroupManager {
vectorized::SimplifiedScanScheduler** scan_sched,
ThreadPool** non_pipe_thread_pool);

Status add_query_to_group(uint64_t tg_id, TUniqueId query_id, TaskGroupPtr* tg_ptr);

void remove_query_from_group(uint64_t tg_id, TUniqueId query_id);

private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
Expand Down