Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
this, std::move(mem_table), _rowset_writer->allocate_segment_id(), submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
_stats.flush_running_count++;
}
return ret;
Expand All @@ -103,16 +104,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
// NOTE: FlushToken's submit/cancel/wait run in one thread,
// so we don't need to make them mutually exclusive, std::atomic is enough.
void FlushToken::_wait_running_task_finish() {
while (true) {
int64_t flush_running_count = _stats.flush_running_count.load();
if (flush_running_count < 0) {
LOG(ERROR) << "flush_running_count < 0, this is not expected!";
}
if (flush_running_count == 0) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::unique_lock<std::mutex> lock(_mutex);
_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; });
}

void FlushToken::cancel() {
Expand Down Expand Up @@ -155,7 +148,13 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in

void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
Defer defer {[&]() { _stats.flush_running_count--; }};
Defer defer {[&]() {
std::lock_guard<std::mutex> lock(_mutex);
_stats.flush_running_count--;
if (_stats.flush_running_count == 0) {
_cond.notify_one();
}
}};
if (_is_shutdown()) {
return;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iosfwd>
#include <memory>
Expand Down Expand Up @@ -101,6 +102,9 @@ class FlushToken {

std::atomic<bool> _shutdown = false;
ThreadPool* _thread_pool = nullptr;

std::mutex _mutex;
std::condition_variable _cond;
};

// MemTableFlushExecutor is responsible for flushing memtables to disk.
Expand Down