Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent a case of WriteBufferManager flush thrashing #6364

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly.
* Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification. If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base.
* If an error is hit when writing to a file (append, sync, etc), RocksDB is more strict with not issuing more operations to it, except closing the file, with exceptions of some WAL file operations in error recovery path.
* A `WriteBufferManager` constructed with `allow_stall == false` will no longer trigger write stall implicitly by thrashing until memtable count limit is reached. Instead, a column family can continue accumulating writes while that CF is flushing, which means memory may increase. Users who prefer stalling writes must now explicitly set `allow_stall == true`.

### Performance Improvements
* Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables.
Expand Down
23 changes: 14 additions & 9 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1648,12 +1648,6 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
// thread is writing to another DB with the same write buffer, they may also
// be flushed. We may end up with flushing much more DBs than needed. It's
// suboptimal but still correct.
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing column family with oldest memtable entry. Write buffers are "
"using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
autovector<ColumnFamilyData*> cfds;
Expand All @@ -1667,9 +1661,11 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) {
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) {
// We only consider flush on CFs with bytes in the mutable memtable,
// and no immutable memtables for which flush has yet to finish. If
// we triggered flush on CFs already trying to flush, we would risk
// creating too many immutable memtables leading to write stalls.
uint64_t seq = cfd->mem()->GetCreationSeq();
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
cfd_picked = cfd;
Expand All @@ -1682,6 +1678,15 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) {
}
MaybeFlushStatsCF(&cfds);
}
if (!cfds.empty()) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Flushing triggered to alleviate write buffer memory usage. Write "
"buffer is using %" ROCKSDB_PRIszt
" bytes out of a total of %" ROCKSDB_PRIszt ".",
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
}

WriteThread::Writer nonmem_w;
if (two_write_queues_) {
Expand Down
69 changes: 69 additions & 0 deletions db/db_write_buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,75 @@ TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

#ifndef ROCKSDB_LITE

// Tests a `WriteBufferManager` constructed with `allow_stall == false` does not
// thrash memtable switching when full and a CF receives multiple writes.
// Instead, we expect to switch a CF's memtable for flush only when that CF does
// not have any pending or running flush.
//
// This test uses multiple DBs each with a single CF instead of a single DB
// with multiple CFs. That way we can control which CF is considered for switch
// by writing to that CF's DB.
//
// Not supported in LITE mode due to `GetProperty()` unavailable.
TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
Options options = CurrentOptions();
options.arena_block_size = 4 << 10; // 4KB
options.write_buffer_size = 1 << 20; // 1MB
std::shared_ptr<Cache> cache =
NewLRUCache(4 << 20 /* capacity (4MB) */, 2 /* num_shard_bits */);
ASSERT_LT(cache->GetUsage(), 256 << 10 /* 256KB */);
cost_cache_ = GetParam();
if (cost_cache_) {
options.write_buffer_manager.reset(new WriteBufferManager(
512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */));
} else {
options.write_buffer_manager.reset(
new WriteBufferManager(512 << 10 /* buffer_size (512KB) */,
nullptr /* cache */, false /* allow_stall */));
}

Reopen(options);
std::string dbname = test::PerThreadDBPath("db_shared_wbm_db");
DB* shared_wbm_db = nullptr;

ASSERT_OK(DestroyDB(dbname, options));
ASSERT_OK(DB::Open(options, dbname, &shared_wbm_db));

// The last write will make WBM need flush, but it won't flush yet.
ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions()));
ASSERT_FALSE(options.write_buffer_manager->ShouldFlush());
ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions()));
ASSERT_TRUE(options.write_buffer_manager->ShouldFlush());

// Flushes will be pending, not running because flush threads are blocked.
test::SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_high, Env::Priority::HIGH);

for (int i = 0; i < 3; ++i) {
ASSERT_OK(
shared_wbm_db->Put(WriteOptions(), Key(1), DummyString(1 /* len */)));
std::string prop;
ASSERT_TRUE(
shared_wbm_db->GetProperty("rocksdb.num-immutable-mem-table", &prop));
ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop);
ASSERT_TRUE(
shared_wbm_db->GetProperty("rocksdb.mem-table-flush-pending", &prop));
ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop);
}

// Clean up DBs.
sleeping_task_high.WakeUp();
sleeping_task_high.WaitUntilDone();
ASSERT_OK(shared_wbm_db->Close());
ASSERT_OK(DestroyDB(dbname, options));
delete shared_wbm_db;
}

#endif // ROCKSDB_LITE

INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

Expand Down
8 changes: 8 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ bool MemTableList::IsFlushPending() const {
return false;
}

bool MemTableList::IsFlushPendingOrRunning() const {
if (current_->memlist_.size() - num_flush_not_started_ > 0) {
// Flush is already running on at least one memtable
return true;
}
return IsFlushPending();
}

// Returns the memtables that need to be flushed.
void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
autovector<MemTable*>* ret,
Expand Down
4 changes: 4 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ class MemTableList {
// not yet started.
bool IsFlushPending() const;

// Returns true if there is at least one memtable that is pending flush or
// flushing.
bool IsFlushPendingOrRunning() const;

// Returns the earliest memtables that needs to be flushed. The returned
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(uint64_t max_memtable_id,
Expand Down
3 changes: 2 additions & 1 deletion include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@ struct DBOptions {
// can be passed into multiple DBs and it will track the sum of size of all
// the DBs. If the total size of all live memtables of all the DBs exceeds
// a limit, a flush will be triggered in the next DB to which the next write
// is issued.
// is issued, as long as there is one or more column family not already
// flushing.
//
// If the object is only passed to one DB, the behavior is the same as
// db_write_buffer_size. When write_buffer_manager is set, the value set will
Expand Down