Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,15 @@ Status BaseDeltaWriter::init() {
if (_is_init) {
return Status::OK();
}
auto* t_ctx = doris::thread_context(true);
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (t_ctx) {
wg_sptr = t_ctx->workload_group().lock();
}
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_memtable_writer->init(
_rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
_rowset_builder->get_partial_update_info(), nullptr,
_rowset_builder->get_partial_update_info(), wg_sptr,
_rowset_builder->tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,12 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
ThreadPool* wg_thread_pool_ptr = nullptr;
std::shared_ptr<WorkloadGroup> wg_sptr = nullptr;
if (_state->get_query_ctx()) {
wg_thread_pool_ptr = _state->get_query_ctx()->get_memtable_flush_pool();
wg_sptr = _state->get_query_ctx()->workload_group();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
wg_thread_pool_ptr,
_streams[0]->enable_unique_mow(_req.index_id)));
wg_sptr, _streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
_streams.clear();
Expand Down
28 changes: 13 additions & 15 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,16 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table),
_rowset_writer->allocate_segment_id(),
submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
// NOTE: we must guarantee the WorkloadGroup is not destructed while submitting the memtable flush task,
// because currently a WorkloadGroup can only be destroyed once all queries in the group are finished,
// which does not take into account whether the load channel has finished.
std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
ThreadPool* wg_thread_pool = nullptr;
if (wg_sptr) {
wg_thread_pool = wg_sptr->get_memtable_flush_pool_ptr();
}
Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
: _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
_stats.flush_running_count++;
Expand Down Expand Up @@ -233,15 +242,16 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority) {
bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr) {
switch (rowset_writer->type()) {
case ALPHA_ROWSET:
// alpha rowset does not support CONCURRENT flush, and alpha rowset is no longer supported.
return Status::InternalError<false>("not support alpha rowset load now.");
case BETA_ROWSET: {
// beta rowset can be flushed in CONCURRENT mode, because each memtable uses a new segment writer.
ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get();
flush_token = FlushToken::create_shared(pool);
flush_token = FlushToken::create_shared(pool, wg_sptr);
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}
Expand All @@ -250,18 +260,6 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
}
}

Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr) {
if (rowset_writer->type() == BETA_ROWSET) {
flush_token = FlushToken::create_shared(wg_flush_pool_ptr);
} else {
return Status::InternalError<false>("not support alpha rowset load now.");
}
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}

void MemTableFlushExecutor::_register_metrics() {
REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
[this]() { return _flush_pool->get_queue_size(); });
Expand Down
13 changes: 7 additions & 6 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace doris {
class DataDir;
class MemTable;
class RowsetWriter;
class WorkloadGroup;

// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
Expand All @@ -59,7 +60,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
ENABLE_FACTORY_CREATOR(FlushToken);

public:
FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {}
FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr)
: _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {}

Status submit(std::unique_ptr<MemTable> mem_table);

Expand Down Expand Up @@ -108,6 +110,8 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {

std::mutex _mutex;
std::condition_variable _cond;

std::weak_ptr<WorkloadGroup> _wg_wptr;
};

// MemTableFlushExecutor is responsible for flushing memtables to disk.
Expand All @@ -133,11 +137,8 @@ class MemTableFlushExecutor {
void init(const std::vector<DataDir*>& data_dirs);

Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority);

Status create_flush_token(std::shared_ptr<FlushToken>& flush_token,
std::shared_ptr<RowsetWriter> rowset_writer,
ThreadPool* wg_flush_pool_ptr);
std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr);

private:
void _register_metrics();
Expand Down
11 changes: 3 additions & 8 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() {
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) {
std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
Expand All @@ -77,13 +77,8 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
// create flush handler
// by assigning segment_id to memtable before submitting to flush executor,
// we can make sure same keys sort in the same order in all replicas.
if (wg_flush_pool_ptr) {
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer, wg_flush_pool_ptr));
} else {
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer, _req.is_high_priority));
}
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer, _req.is_high_priority, wg_sptr));

_is_init = true;
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SlotDescriptor;
class OlapTableSchemaParam;
class RowsetWriter;
struct FlushStatistic;
class WorkloadGroup;

namespace vectorized {
class Block;
Expand All @@ -69,7 +70,7 @@ class MemTableWriter {

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);
std::shared_ptr<WorkloadGroup> wg_sptr, bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ class QueryContext {
// And will be shared by all instances of this query.
// So that we can control the max thread that a query can be used to execute.
// If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
std::unique_ptr<ThreadPoolToken> _thread_token;
std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};

std::mutex _start_lock;
std::condition_variable _start_cond;
Expand Down
12 changes: 7 additions & 5 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,21 +331,21 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g

// 4 cpu_share
uint64_t cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value();
if (tworkload_group_info.__isset.cpu_share) {
if (tworkload_group_info.__isset.cpu_share && tworkload_group_info.cpu_share > 0) {
cpu_share = tworkload_group_info.cpu_share;
}
workload_group_info->cpu_share = cpu_share;

// 5 cpu hard limit
int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.cpu_hard_limit) {
if (tworkload_group_info.__isset.cpu_hard_limit && tworkload_group_info.cpu_hard_limit > 0) {
cpu_hard_limit = tworkload_group_info.cpu_hard_limit;
}
workload_group_info->cpu_hard_limit = cpu_hard_limit;

// 6 mem_limit
std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.mem_limit) {
if (tworkload_group_info.__isset.mem_limit && tworkload_group_info.mem_limit != "-1") {
mem_limit_str = tworkload_group_info.mem_limit;
}
bool is_percent = true;
Expand Down Expand Up @@ -407,14 +407,16 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g

// 14 scan io
int read_bytes_per_second = -1;
if (tworkload_group_info.__isset.read_bytes_per_second) {
if (tworkload_group_info.__isset.read_bytes_per_second &&
tworkload_group_info.read_bytes_per_second > 0) {
read_bytes_per_second = tworkload_group_info.read_bytes_per_second;
}
workload_group_info->read_bytes_per_second = read_bytes_per_second;

// 15 remote scan io
int remote_read_bytes_per_second = -1;
if (tworkload_group_info.__isset.remote_read_bytes_per_second) {
if (tworkload_group_info.__isset.remote_read_bytes_per_second &&
tworkload_group_info.remote_read_bytes_per_second > 0) {
remote_read_bytes_per_second = tworkload_group_info.remote_read_bytes_per_second;
}
workload_group_info->remote_read_bytes_per_second = remote_read_bytes_per_second;
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/workload_group/workload_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {

std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();

// Returns a raw, non-owning pointer to this workload group's memtable
// flush thread pool; may be nullptr when the group has no dedicated pool
// (callers fall back to the global flush pool in that case).
ThreadPool* get_memtable_flush_pool_ptr() {
    // Intentionally lock-free: this is called on the memtable-flush hot
    // path, and taking a lock here would contend with workload-group
    // property updates.
    // NOTE(review): assumes _memtable_flush_pool is never reset while
    // flushes are in flight — confirm the pool's lifetime covers all
    // submitted tasks.
    return _memtable_flush_pool.get();
}

private:
void create_cgroup_cpu_ctl_no_lock();
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
Expand Down
Loading