Skip to content

Commit

Permalink
Fix conflict between AddFile() and CompactRange()
Browse files Browse the repository at this point in the history
Summary:
Fix the conflict bug between AddFile() and CompactRange() by
- Make sure that no AddFile calls are running when asking CompactionPicker to pick compaction for manual compaction
- If AddFile() run after we pick the compaction for the manual compaction it will be aware of it since we will add the manual compaction to running_compactions_ after picking it

This will solve these 2 scenarios
- If AddFile() is running, we will wait for it to finish before we pick a compaction for the manual compaction
- If we already picked a manual compaction and then AddFile() started ... we ensure that it never ingest a file in a level that will overlap with the manual compaction

Test Plan: unit tests

Reviewers: sdong

Reviewed By: sdong

Subscribers: andrewkr, yoshinorim, jkedgar, dhruba

Differential Revision: https://reviews.facebook.net/D64449
  • Loading branch information
IslamAbdelRahman committed Sep 28, 2016
1 parent eb44ed6 commit 87dfc1d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 21 deletions.
13 changes: 11 additions & 2 deletions db/db_impl.cc
Expand Up @@ -346,7 +346,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
has_unpersisted_data_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
num_running_addfile_(0),
addfile_cv_(&mutex_),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_),
#endif // ROCKSDB_LITE
Expand Down Expand Up @@ -2033,7 +2032,6 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
int max_level_with_files = 0;
{
InstrumentedMutexLock l(&mutex_);
WaitForAddFile();
Version* base = cfd->current();
for (int level = 1; level < base->storage_info()->num_non_empty_levels();
level++) {
Expand Down Expand Up @@ -2746,6 +2744,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.end = &end_storage;
}

TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
InstrumentedMutexLock l(&mutex_);

// When a manual compaction arrives, temporarily disable scheduling of
Expand Down Expand Up @@ -2813,6 +2813,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
ca->m = &manual;
manual.incomplete = false;
bg_compaction_scheduled_++;
// manual.compaction will be added to running_compactions_ and erased
// inside BackgroundCompaction() but we need to put it now since we
// will unlock the mutex.
running_compactions_.insert(manual.compaction);
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
&DBImpl::UnscheduleCallback);
scheduled = true;
Expand Down Expand Up @@ -3653,6 +3657,11 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
}

bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
if (num_running_addfile_ > 0) {
// We need to wait for other AddFile() calls to finish
// before running a manual compaction.
return true;
}
if (m->exclusive) {
return (bg_compaction_scheduled_ > 0);
}
Expand Down
16 changes: 7 additions & 9 deletions db/db_impl.h
Expand Up @@ -655,11 +655,12 @@ class DBImpl : public DB {
// REQUIRES: mutex_ held
void WaitForAddFile();

Status CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id, JobContext* job_context,
LogBuffer* log_buffer);
Status CompactFilesImpl(const CompactionOptions& compact_options,
ColumnFamilyData* cfd, Version* version,
const std::vector<std::string>& input_file_names,
const int output_level, int output_path_id,
JobContext* job_context, LogBuffer* log_buffer);

Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
const std::string& file_path,
ExternalSstFileInfo* file_info);
Expand Down Expand Up @@ -737,6 +738,7 @@ class DBImpl : public DB {
// * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
// (i.e. whenever a flush is done, even if it didn't make any progress)
// * whenever there is an error in background purge, flush or compaction
// * whenever num_running_addfile_ goes to 0.
InstrumentedCondVar bg_cv_;
uint64_t logfile_number_;
std::deque<uint64_t>
Expand Down Expand Up @@ -986,10 +988,6 @@ class DBImpl : public DB {
// REQUIRES: mutex held
int num_running_addfile_;

// A condition variable that will be signaled whenever
// num_running_addfile_ goes to 0.
InstrumentedCondVar addfile_cv_;

#ifndef ROCKSDB_LITE
WalManager wal_manager_;
#endif // ROCKSDB_LITE
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl_add_file.cc
Expand Up @@ -340,7 +340,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,

num_running_addfile_--;
if (num_running_addfile_ == 0) {
addfile_cv_.SignalAll();
bg_cv_.SignalAll();
}
TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
} // mutex_ is unlocked here;
Expand Down Expand Up @@ -426,7 +426,7 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd,
void DBImpl::WaitForAddFile() {
mutex_.AssertHeld();
while (num_running_addfile_ > 0) {
addfile_cv_.Wait();
bg_cv_.Wait();
}
}
#endif // ROCKSDB_LITE
Expand Down
67 changes: 59 additions & 8 deletions db/external_sst_file_test.cc
Expand Up @@ -1011,10 +1011,13 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
// We have 2 overlapping files in L0
EXPECT_EQ(FilesPerLevel(), "2");

rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
});
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"},
{"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"},
{"ExternalSSTFileTest::PickedLevelBug:2",
"DBImpl::RunManualCompaction:0"},
{"ExternalSSTFileTest::PickedLevelBug:3",
"DBImpl::RunManualCompaction:1"}});

std::atomic<bool> bg_compact_started(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
Expand All @@ -1023,6 +1026,12 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {

rocksdb::SyncPoint::GetInstance()->EnableProcessing();

// While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2");

// Start a thread that will ingest a new file
std::thread bg_addfile([&]() {
file_keys = {1, 2, 3};
Expand All @@ -1032,10 +1041,7 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
// Wait for AddFile to start picking levels and writing MANIFEST
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0");

// While writing the MANIFEST start a thread that will ask for compaction
std::thread bg_compact([&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
});
TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3");

// We need to verify that no compactions can run while AddFile is
// ingesting the files into the levels it find suitable. So we will
Expand Down Expand Up @@ -1065,6 +1071,51 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 2;
options.env = env_;
DestroyAndReopen(options);

std::function<void()> bg_compact = [&]() {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
};

int range_id = 0;
std::vector<int> file_keys;
std::function<void()> bg_addfile = [&]() {
ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id));
};

std::vector<std::thread> threads;
while (range_id < 5000) {
int range_start = (range_id * 20);
int range_end = range_start + 10;

file_keys.clear();
for (int k = range_start + 1; k < range_end; k++) {
file_keys.push_back(k);
}
ASSERT_OK(Put(Key(range_start), Key(range_start)));
ASSERT_OK(Put(Key(range_end), Key(range_end)));
ASSERT_OK(Flush());

if (range_id % 10 == 0) {
threads.emplace_back(bg_compact);
}
threads.emplace_back(bg_addfile);

for (auto& t : threads) {
t.join();
}
threads.clear();

range_id++;
}
}

TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
Options options = CurrentOptions();
options.disable_auto_compactions = false;
Expand Down

0 comments on commit 87dfc1d

Please sign in to comment.