Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,21 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
_write_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitBlockSinkDependency", true);

_create_plan_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"CreateGroupCommitPlanDependency", true);
_put_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitPutBlockDependency", true);
WARN_IF_ERROR(_initialize_load_queue(), "");
return Status::OK();
}

Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
TUniqueId load_id;
load_id.__set_hi(p._load_id.hi);
load_id.__set_lo(p._load_id.lo);
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, load_id, _load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(), _write_dependency));
p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
_put_block_dependency));
return Status::OK();
} else {
return Status::InternalError("be is stopping");
Expand Down Expand Up @@ -138,7 +137,8 @@ Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
RETURN_IF_ERROR(_add_blocks(state, false));
}
RETURN_IF_ERROR(_load_block_queue->add_block(
state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE,
_parent->cast<GroupCommitBlockSinkOperatorX>()._load_id));
}
return Status::OK();
}
Expand Down Expand Up @@ -181,9 +181,6 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
bool is_blocks_contain_all_load_data) {
DCHECK(_is_block_appended == false);
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
TUniqueId load_id;
load_id.__set_hi(p._load_id.hi);
load_id.__set_lo(p._load_id.lo);
if (_state->exec_env()->wal_mgr()->is_running()) {
if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
size_t estimated_wal_bytes =
Expand Down Expand Up @@ -212,7 +209,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
RETURN_IF_ERROR(_load_block_queue->add_block(
state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE, p._load_id));
}
_is_block_appended = true;
_blocks.clear();
Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi

Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }
std::vector<Dependency*> dependencies() const override { return {_write_dependency.get()}; }
std::vector<Dependency*> dependencies() const override {
return {_create_plan_dependency.get(), _put_block_dependency.get()};
}
std::string debug_string(int indentation_level) const override;

private:
Expand Down Expand Up @@ -75,7 +77,8 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
Bitmap _filter_bitmap;
int64_t _table_id;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _write_dependency = nullptr;
std::shared_ptr<Dependency> _create_plan_dependency = nullptr;
std::shared_ptr<Dependency> _put_block_dependency = nullptr;
};

class GroupCommitBlockSinkOperatorX final
Expand Down
102 changes: 55 additions & 47 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,14 @@
namespace doris {

Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block, bool write_wal) {
std::shared_ptr<vectorized::Block> block, bool write_wal,
UniqueId& load_id) {
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
auto start = std::chrono::steady_clock::now();
DBUG_EXECUTE_IF("LoadBlockQueue.add_block.back_pressure_time_out", {
start = std::chrono::steady_clock::now() - std::chrono::milliseconds(120000);
});
while (!runtime_state->is_cancelled() && status.ok() &&
_all_block_queues_bytes->load(std::memory_order_relaxed) >=
config::group_commit_queue_mem_limit) {
_put_cond.wait_for(l,
std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT) {
return Status::TimedOut<false>(
"Wal memory back pressure wait too much time! Load block queue txn id: {}, "
"label: {}, instance id: {}, consumed memory: {}",
txn_id, label, load_instance_id.to_string(),
_all_block_queues_bytes->load(std::memory_order_relaxed));
}
}
if (UNLIKELY(runtime_state->is_cancelled())) {
return runtime_state->cancel_reason();
}
Expand All @@ -69,8 +55,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids) {
ss << id.to_string() << ", ";
for (const auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
Expand All @@ -92,6 +78,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
return st;
}
}
if (!runtime_state->is_cancelled() && status.ok() &&
_all_block_queues_bytes->load(std::memory_order_relaxed) >=
config::group_commit_queue_mem_limit) {
DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end());
_load_ids_to_write_dep[load_id]->block();
}
}
if (!_need_commit) {
if (_data_bytes >= _group_commit_data_bytes) {
Expand Down Expand Up @@ -125,7 +117,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
}
}
while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
(!_need_commit || (_need_commit && !_load_ids.empty()))) {
(!_need_commit || (_need_commit && !_load_ids_to_write_dep.empty()))) {
auto left_milliseconds = _group_commit_interval_ms;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
Expand All @@ -140,8 +132,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
if (duration >= 10 * _group_commit_interval_ms) {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids) {
ss << id.to_string() << ", ";
for (auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
Expand All @@ -167,8 +159,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids) {
ss << id.to_string() << ", ";
for (const auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
Expand All @@ -183,30 +175,37 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
<< ", the block is " << block->dump_data() << ", the block column size is "
<< block->columns_bytes();
}
if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) {
*eos = true;
} else {
*eos = false;
}
_put_cond.notify_all();
if (_all_block_queues_bytes->load(std::memory_order_relaxed) <
config::group_commit_queue_mem_limit) {
for (auto& id : _load_ids_to_write_dep) {
id.second->set_ready();
}
}
return Status::OK();
}

void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(mutex);
if (_load_ids.find(load_id) != _load_ids.end()) {
_load_ids.erase(load_id);
if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) {
_load_ids_to_write_dep[load_id]->set_always_ready();
_load_ids_to_write_dep.erase(load_id);
_get_cond.notify_all();
}
}

Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::unique_lock l(mutex);
if (_need_commit) {
return Status::InternalError<false>("block queue is set need commit, id=" +
load_instance_id.to_string());
}
_load_ids.emplace(load_id);
_load_ids_to_write_dep[load_id] = put_block_dep;
group_commit_load_count.fetch_add(1);
return Status::OK();
}
Expand All @@ -228,8 +227,8 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids) {
ss << id.to_string() << ", ";
for (const auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
Expand All @@ -245,20 +244,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
<< block_data.block->columns_bytes();
_block_queue.pop_front();
}
for (auto& id : _load_ids_to_write_dep) {
id.second->set_always_ready();
}
}

Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) {
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id).ok()) {
if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
load_block_queue = inner_block_queue;

return Status::OK();
}
} else {
Expand All @@ -278,18 +283,19 @@ Status GroupCommitTable::get_first_block_load_queue(
}
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
dep->block();
RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = dep] {
Defer defer {[&, dep = dep]() {
dep->set_ready();
std::unique_lock l(_lock);
_is_creating_plan_fragment = false;
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
}
}));
create_plan_dep->block();
RETURN_IF_ERROR(
_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] {
Defer defer {[&, dep = dep]() {
dep->set_ready();
std::unique_lock l(_lock);
_is_creating_plan_fragment = false;
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
}
}));
}
return try_to_get_matched_queue();
}
Expand Down Expand Up @@ -568,7 +574,9 @@ void GroupCommitMgr::stop() {
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<pipeline::Dependency> dep) {
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -581,7 +589,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
dep));
create_plan_dep, put_block_dep));
return Status::OK();
}

Expand Down
14 changes: 8 additions & 6 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ class LoadBlockQueue {
_all_block_queues_bytes(all_block_queues_bytes) {};

Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
bool write_wal);
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos);
Status add_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency> put_block_dep);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
bool need_commit() { return _need_commit; }
Expand Down Expand Up @@ -118,7 +119,7 @@ class LoadBlockQueue {
void _cancel_without_lock(const Status& st);

// the set of load ids of all blocks in this queue
std::set<UniqueId> _load_ids;
std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> _load_ids_to_write_dep;
std::list<BlockData> _block_queue;

// wal
Expand All @@ -136,7 +137,6 @@ class LoadBlockQueue {

// memory back pressure, memory consumption of all tables' load block queues
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
std::condition_variable _put_cond;
std::condition_variable _get_cond;
static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000; // 1s
static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
Expand All @@ -156,7 +156,8 @@ class GroupCommitTable {
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> dep);
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);

Expand Down Expand Up @@ -200,7 +201,8 @@ class GroupCommitMgr {
std::shared_ptr<LoadBlockQueue>& load_block_queue,
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> dep);
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();

Expand Down