Skip to content

Commit

Permalink
Disable manual compaction during ReFitLevel() (#7250)
Browse files Browse the repository at this point in the history
Summary:
Manual compaction with `CompactRangeOptions::change_levels` set could
refit to a level targeted by another manual compaction. If
force_consistency_checks were disabled, it could be possible for
overlapping files to be written at that target level.

This PR prevents the possibility by calling `DisableManualCompaction()`
prior to `ReFitLevel()`. It also improves the manual compaction disabling
mechanism to wait for pending manual compactions to complete before
returning, and support disabling from multiple threads.

Fixes #6432.

Pull Request resolved: #7250

Test Plan:
crash test command that repro'd the bug reliably:

```
$ TEST_TMPDIR=/dev/shm python tools/db_crashtest.py blackbox --simple -target_file_size_base=524288 -write_buffer_size=1048576 -clear_column_family_one_in=0 -reopen=0 -max_key=10000000 -column_families=1 -max_background_compactions=8 -compact_range_one_in=100000 -compression_type=none -compaction_style=1 -num_levels=5 -universal_min_merge_width=4 -universal_max_merge_width=8 -level0_file_num_compaction_trigger=12 -rate_limiter_bytes_per_sec=1048576000 -universal_max_size_amplification_percent=100 --duration=3600 --interval=60 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --enable_compaction_filter=0
```

Reviewed By: ltamasi

Differential Revision: D23090800

Pulled By: ajkr

fbshipit-source-id: afcbcd51b42ce76789fdb907d8b9ada790709c13
  • Loading branch information
ajkr authored and facebook-github-bot committed Aug 14, 2020
1 parent e503f5e commit a1aa3f8
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 53 deletions.
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
## Unreleased
### Bug fixes
* Fix a performance regression introduced in 6.4 that makes a upper bound check for every Next() even if keys are within a data block that is within the upper bound.
* Fix a possible corruption to the LSM state (overlapping files within a level) when a `CompactRange()` for refitting levels (`CompactRangeOptions::change_level == true`) and another manual compaction are executed in parallel.

### New Features
* A new option `std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory` is added to `BackupableDBOptions`. The default value for this option is `nullptr`. If this option is null, the default backup engine checksum function (crc32c) will be used for creating, verifying, or restoring backups. If it is not null and is set to the DB custom checksum factory, the custom checksum function used in DB will also be used for creating, verifying, or restoring backups, in addition to the default checksum function (crc32c). If it is not null and is set to a custom checksum factory different than the DB custom checksum factory (which may be null), BackupEngine will return `Status::InvalidArgument()`.
* A new field `std::string requested_checksum_func_name` is added to `FileChecksumGenContext`, which enables the checksum factory to create generators for a suite of different functions.


## 6.12 (2020-07-28)
### Public API Change
* Encryption file classes now exposed for inheritance in env_encryption.h
Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
const std::atomic<bool>* manual_compaction_paused,
const std::atomic<int>* manual_compaction_paused,
const std::shared_ptr<Logger> info_log)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
Expand All @@ -62,7 +62,7 @@ CompactionIterator::CompactionIterator(
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
const std::atomic<bool>* manual_compaction_paused,
const std::atomic<int>* manual_compaction_paused,
const std::shared_ptr<Logger> info_log)
: input_(input),
cmp_(cmp),
Expand Down
56 changes: 28 additions & 28 deletions db/compaction/compaction_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,34 +59,34 @@ class CompactionIterator {
const Compaction* compaction_;
};

CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);

// Constructor with custom CompactionProxy, used for tests.
CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter = nullptr,
const std::atomic<bool>* shutting_down = nullptr,
const SequenceNumber preserve_deletes_seqnum = 0,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::shared_ptr<Logger> info_log = nullptr);

~CompactionIterator();

Expand Down Expand Up @@ -166,7 +166,7 @@ class CompactionIterator {
std::unique_ptr<CompactionProxy> compaction_;
const CompactionFilter* compaction_filter_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
const std::atomic<int>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
bool bottommost_level_;
bool valid_ = false;
Expand Down Expand Up @@ -235,7 +235,7 @@ class CompactionIterator {
bool IsPausingManualCompaction() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed);
manual_compaction_paused_->load(std::memory_order_relaxed) > 0;
}
};
} // namespace ROCKSDB_NAMESPACE
8 changes: 4 additions & 4 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ CompactionJob::CompactionJob(
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_paused, const std::string& db_id,
const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
Expand Down Expand Up @@ -929,7 +929,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
const_cast<std::atomic<int>*>(manual_compaction_paused_)));

Slice* start = sub_compact->start;
Slice* end = sub_compact->end;
Expand Down Expand Up @@ -1023,7 +1023,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
const_cast<std::atomic<int>*>(manual_compaction_paused_)));
if (partitioner.get()) {
last_key_for_partitioner.assign(c_iter->user_key().data_,
c_iter->user_key().size_);
Expand Down Expand Up @@ -1090,7 +1090,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
(manual_compaction_paused_ &&
manual_compaction_paused_->load(std::memory_order_relaxed))) {
manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CompactionJob {
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>* manual_compaction_paused = nullptr,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "");

~CompactionJob();
Expand Down Expand Up @@ -163,7 +163,7 @@ class CompactionJob {
FileOptions file_options_for_read_;
VersionSet* versions_;
const std::atomic<bool>* shutting_down_;
const std::atomic<bool>* manual_compaction_paused_;
const std::atomic<int>* manual_compaction_paused_;
const SequenceNumber preserve_deletes_seqnum_;
LogBuffer* log_buffer_;
FSDirectory* db_directory_;
Expand Down
97 changes: 96 additions & 1 deletion db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5442,7 +5442,102 @@ TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
ASSERT_TRUE(has_compaction);
}

#endif // !defined(ROCKSDB_LITE)
TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
// A `CompactRange()` with `change_level == true` needs to execute its final
// step, `ReFitLevel()`, in isolation. Previously there was a bug where
// refitting could target the same level as an ongoing manual compaction,
// leading to overlapping files in that level.
//
// This test ensures that case is not possible by verifying any manual
// compaction issued during the `ReFitLevel()` phase fails with
// `Status::Incomplete`.
Options options = CurrentOptions();
options.memtable_factory.reset(
new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 3;
Reopen(options);

// Setup an LSM with three levels populated.
Random rnd(301);
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
{
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 2;
ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
}
ASSERT_EQ("0,0,2", FilesPerLevel(0));

GenerateNewFile(&rnd, &key_idx);
GenerateNewFile(&rnd, &key_idx);
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,1,2", FilesPerLevel(0));

// The background thread will refit L2->L1 while the
// foreground thread will try to simultaneously compact L0->L1.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
// The first two dependencies ensure the foreground creates an L0 file
// between the background compaction's L0->L1 and its L1->L2.
{
"DBImpl::RunManualCompaction()::1",
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"PutFG",
},
{
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"FlushedFG",
"DBImpl::RunManualCompaction()::2",
},
// The next two dependencies ensure the foreground invokes
// `CompactRange()` while the background is refitting. The
// foreground's `CompactRange()` is guaranteed to attempt an L0->L1
// as we set it up with an empty memtable and a new L0 file.
{
"DBImpl::CompactRange:PreRefitLevel",
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"CompactFG",
},
{
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"CompactedFG",
"DBImpl::CompactRange:PostRefitLevel",
},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 1;
ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
});

TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
// Make sure we have something new to compact in the foreground.
// Note key 1 is carefully chosen as it ensures the file we create here
// overlaps with one of the files being refitted L2->L1 in the background.
// If we chose key 0, the file created here would not overlap.
ASSERT_OK(Put(Key(1), "val"));
ASSERT_OK(Flush());
TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");

TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
ASSERT_TRUE(dbfull()
->CompactRange(CompactRangeOptions(), nullptr, nullptr)
.IsIncomplete());
TEST_SYNC_POINT(
"DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
"CompactedFG");
refit_level_thread.join();
}

#endif // !defined(ROCKSDB_LITE)

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
9 changes: 8 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,14 @@ class DBImpl : public DB {
InstrumentedMutex log_write_mutex_;

std::atomic<bool> shutting_down_;
std::atomic<bool> manual_compaction_paused_;

// If zero, manual compactions are allowed to proceed. If non-zero, manual
// compactions may still be running, but will quickly fail with
// `Status::Incomplete`. The value indicates how many threads have paused
// manual compactions. It is accessed in read mode outside the DB mutex in
// compaction code paths.
std::atomic<int> manual_compaction_paused_;

// This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0
// * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
Expand Down
28 changes: 21 additions & 7 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -848,11 +848,15 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
if (options.change_level) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[RefitLevel] waiting for background threads to stop");
DisableManualCompaction();
s = PauseBackgroundWork();
if (s.ok()) {
TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
s = ReFitLevel(cfd, final_output_level, options.target_level);
TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
ContinueBackgroundWork();
}
ContinueBackgroundWork();
EnableManualCompaction();
}
LogFlush(immutable_db_options_.info_log);

Expand Down Expand Up @@ -959,7 +963,7 @@ Status DBImpl::CompactFilesImpl(
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
if (manual_compaction_paused_.load(std::memory_order_acquire)) {
if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}

Expand Down Expand Up @@ -1180,7 +1184,7 @@ void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
return;
}
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return;
}
Version* current = cfd->current();
Expand Down Expand Up @@ -1254,7 +1258,7 @@ void DBImpl::NotifyOnCompactionCompleted(
return;
}
if (c->is_manual_compaction() &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
return;
}
Version* current = cfd->current();
Expand Down Expand Up @@ -1965,11 +1969,21 @@ Status DBImpl::EnableAutoCompaction(
}

void DBImpl::DisableManualCompaction() {
manual_compaction_paused_.store(true, std::memory_order_release);
InstrumentedMutexLock l(&mutex_);
manual_compaction_paused_.fetch_add(1, std::memory_order_release);
// Wait for any pending manual compactions to finish (typically through
// failing with `Status::Incomplete`) prior to returning. This way we are
// guaranteed no pending manual compaction will commit while manual
// compactions are "disabled".
while (HasPendingManualCompaction()) {
bg_cv_.Wait();
}
}

void DBImpl::EnableManualCompaction() {
manual_compaction_paused_.store(false, std::memory_order_release);
InstrumentedMutexLock l(&mutex_);
assert(manual_compaction_paused_ > 0);
manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
}

void DBImpl::MaybeScheduleFlushOrCompaction() {
Expand Down Expand Up @@ -2528,7 +2542,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
} else if (is_manual &&
manual_compaction_paused_.load(std::memory_order_acquire)) {
manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
} else {
Expand Down
13 changes: 6 additions & 7 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2760,9 +2760,9 @@ TEST_F(DBTest2, PausingManualCompaction1) {
int manual_compactions_paused = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:1", [&](void* arg) {
auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
ASSERT_FALSE(paused->load(std::memory_order_acquire));
paused->store(true, std::memory_order_release);
auto paused = static_cast<std::atomic<int>*>(arg);
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
manual_compactions_paused += 1;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Expand Down Expand Up @@ -2921,14 +2921,13 @@ TEST_F(DBTest2, PausingManualCompaction4) {
int run_manual_compactions = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():PausingManualCompaction:2", [&](void* arg) {
auto paused = reinterpret_cast<std::atomic<bool>*>(arg);
ASSERT_FALSE(paused->load(std::memory_order_acquire));
paused->store(true, std::memory_order_release);
auto paused = static_cast<std::atomic<int>*>(arg);
ASSERT_EQ(0, paused->load(std::memory_order_acquire));
paused->fetch_add(1, std::memory_order_release);
run_manual_compactions++;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

dbfull()->EnableManualCompaction();
dbfull()->CompactRange(compact_options, nullptr, nullptr);
dbfull()->TEST_WaitForCompact(true);
ASSERT_EQ(run_manual_compactions, 1);
Expand Down

0 comments on commit a1aa3f8

Please sign in to comment.