Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notifying OnFlushCompleted and OnCompactionCompleted in sequence #6342

1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Fix a bug that prevents opening a DB after two consecutive crash with TransactionDB, where the first crash recovers from a corrupted WAL with kPointInTimeRecovery but the second cannot.
* Fixed issue #6316 that can cause a corruption of the MANIFEST file in the middle when writing to it fails due to no disk space.
* Add DBOptions::skip_checking_sst_file_sizes_on_db_open. It disables potentially expensive checking of all sst file sizes in DB::Open().
* Fixed an issue where listeners could receive out of order OnFlushCompleted/OnCompactionCompleted notifications. This could cause a crash in BlobDB when garbage collection is enabled (see #6338).

### Public API Change
* The BlobDB garbage collector now emits the statistics `BLOB_DB_GC_NUM_FILES` (number of blob files obsoleted during GC), `BLOB_DB_GC_NUM_NEW_FILES` (number of new blob files generated during GC), `BLOB_DB_GC_FAILURES` (number of failed GC passes), `BLOB_DB_GC_NUM_KEYS_RELOCATED` (number of blobs relocated during GC), and `BLOB_DB_GC_BYTES_RELOCATED` (total size of blobs relocated during GC). On the other hand, the following statistics, which are not relevant for the new GC implementation, are now deprecated: `BLOB_DB_GC_NUM_KEYS_OVERWRITTEN`, `BLOB_DB_GC_NUM_KEYS_EXPIRED`, `BLOB_DB_GC_BYTES_OVERWRITTEN`, `BLOB_DB_GC_BYTES_EXPIRED`, and `BLOB_DB_GC_MICROS`.
Expand Down
24 changes: 16 additions & 8 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,8 @@ Status DBImpl::SetOptions(
// Trigger possible flush/compactions. This has to be before we persist
// options to file, otherwise there will be a deadlock with writer
// thread.
InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);
InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options,
nullptr);

persist_options_status = WriteOptionsFile(
false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
Expand Down Expand Up @@ -2310,7 +2311,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
InstallSuperVersionAndScheduleWork(cfd, &sv_context,
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);

if (!cfd->mem()->IsSnapshotSupported()) {
is_snapshot_supported_ = false;
Expand Down Expand Up @@ -3179,7 +3181,8 @@ Status DBImpl::DeleteFile(std::string name) {
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
}
FindObsoleteFiles(&job_context, false);
} // lock released here
Expand Down Expand Up @@ -3264,7 +3267,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
}
for (auto* deleted_file : deleted_files) {
deleted_file->being_compacted = false;
Expand Down Expand Up @@ -4168,7 +4172,8 @@ Status DBImpl::IngestExternalFiles(
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
#ifndef NDEBUG
if (0 == i && num_cfs > 1) {
TEST_SYNC_POINT(
Expand Down Expand Up @@ -4270,7 +4275,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options,
nullptr);
}
}
}
Expand Down Expand Up @@ -4306,7 +4312,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options,
nullptr);
}
}

Expand Down Expand Up @@ -4507,7 +4514,8 @@ Status DBImpl::ReserveFileNumbersBeforeIngestion(
s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options,
nullptr);
}
dummy_sv_ctx.Clean();
return s;
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,8 @@ class DBImpl : public DB {
// state needs flush or compaction.
void InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options,
std::function<void()> completion_callback);

bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,
Expand Down Expand Up @@ -2067,6 +2068,9 @@ class DBImpl : public DB {
InstrumentedCondVar atomic_flush_install_cv_;

bool wal_in_db_path_;

// The mutex is for ordering flush and compaction completion notifications.
InstrumentedMutex notification_mutex_;
};

extern Options SanitizeOptions(const std::string& db, const Options& src);
Expand Down
95 changes: 63 additions & 32 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,13 @@ Status DBImpl::FlushMemTableToOutputFile(
}

if (s.ok()) {
std::function<void()> callback;
#ifndef ROCKSDB_LITE
callback = std::bind(&DBImpl::NotifyOnFlushCompleted, this, cfd,
mutable_cf_options, flush_job.GetCommittedFlushJobsInfo());
#endif // ROCKSDB_LITE
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
mutable_cf_options);
mutable_cf_options, callback);
if (made_progress) {
*made_progress = true;
}
Expand All @@ -211,9 +216,6 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, mutable_cf_options,
flush_job.GetCommittedFlushJobsInfo());
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
Expand Down Expand Up @@ -489,9 +491,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
std::function<void()> callback;
#ifndef ROCKSDB_LITE
callback = std::bind(&DBImpl::NotifyOnFlushCompleted, this, cfds[i],
all_mutable_cf_options[i], jobs[i]->GetCommittedFlushJobsInfo());
#endif // ROCKSDB_LITE
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
all_mutable_cf_options[i]);
all_mutable_cf_options[i],
callback);
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
Expand All @@ -508,8 +516,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
jobs[i]->GetCommittedFlushJobsInfo());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
Expand Down Expand Up @@ -619,17 +625,21 @@ void DBImpl::NotifyOnFlushCompleted(
bool triggered_writes_stop =
(cfd->current()->storage_info()->NumLevelFiles(0) >=
mutable_cf_options.level0_stop_writes_trigger);
// release lock while notifying events
mutex_.Unlock();
{
for (auto& info : *flush_jobs_info) {
info->triggered_writes_slowdown = triggered_writes_slowdown;
info->triggered_writes_stop = triggered_writes_stop;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, *info);
// triggers completion notifications in turn
InstrumentedMutexLock l(&notification_mutex_);
// release lock while notifying events
mutex_.Unlock();
burtonli marked this conversation as resolved.
Show resolved Hide resolved
{
for (auto& info : *flush_jobs_info) {
info->triggered_writes_slowdown = triggered_writes_slowdown;
info->triggered_writes_stop = triggered_writes_stop;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, *info);
}
}
flush_jobs_info->clear();
}
flush_jobs_info->clear();
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
Expand Down Expand Up @@ -998,7 +1008,7 @@ Status DBImpl::CompactFilesImpl(
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), nullptr);
}
c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -1177,15 +1187,19 @@ void DBImpl::NotifyOnCompactionCompleted(
}
Version* current = cfd->current();
current->Ref();
// release lock while notifying events
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
{
CompactionJobInfo info{};
BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
&info);
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionCompleted(this, info);
// triggers completion notifications in turn
InstrumentedMutexLock l(&notification_mutex_);
// release lock while notifying events
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
{
CompactionJobInfo info{};
BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
&info);
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionCompleted(this, info);
}
}
}
mutex_.Lock();
Expand Down Expand Up @@ -1265,7 +1279,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {

status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options,
nullptr);

ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
cfd->GetName().c_str(), status.ToString().data());
Expand Down Expand Up @@ -2628,9 +2643,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
auto callback = std::bind(&DBImpl::NotifyOnCompactionCompleted, this,
c->column_family_data(), c.get(), status, compaction_job_stats,
job_context->job_id);
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a side note that has nothing to do with the PR per se but we might have a small bug here: InstallSuperVersionAndScheduleWork is called here (and on the trivial move branch) regardless of status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have noticed that as well. Also there is a different behavior between NotifyOnCompactionCompleted and NotifyOnFlushCompleted. NotifyOnFlushCompleted only triggers when status.ok(), but NotifyOnCompactionCompleted triggers regardless of status. I kept the existing logic as it is, and we can have separate PR to address them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is definitely out of scope here. Also, IIRC FlushJobInfo does not even have a status field.

ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
Expand Down Expand Up @@ -2683,10 +2701,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
auto callback = std::bind(&DBImpl::NotifyOnCompactionCompleted, this,
c->column_family_data(), c.get(), status, compaction_job_stats,
job_context->job_id);
// Use latest MutableCFOptions
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), callback);

VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
Expand Down Expand Up @@ -2769,10 +2790,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
mutex_.Lock();

status = compaction_job.Install(*c->mutable_cf_options());
auto callback = std::bind(&DBImpl::NotifyOnCompactionCompleted, this,
c->column_family_data(), c.get(), status, compaction_job_stats,
job_context->job_id);
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), callback);
} else {
callback();
}
*made_progress = true;
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
Expand All @@ -2790,9 +2816,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
sfm->OnCompactionCompletion(c.get());
}
#endif // ROCKSDB_LITE

NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
}

if (status.ok() || status.IsCompactionTooLarge() ||
Expand Down Expand Up @@ -3019,7 +3042,8 @@ void DBImpl::BuildCompactionJobInfo(

void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options,
std::function<void()> completion_callback) {
mutex_.AssertHeld();

// Update max_total_in_memory_state_
Expand All @@ -3036,6 +3060,13 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
}
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

// Install super version compaction callback for notifying OnFlushCompleted
// or OnCompactionCompleted. Notification triggers before scheduling any new
// compaction.
if (completion_callback) {
completion_callback();
}

// There may be a small data race here. The snapshot tricking bottommost
// compaction may already be released here. But assuming there will always be
// newer snapshot created and released frequently, the compaction will be
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
}
} // lock released here
LogFlush(immutable_db_options_.info_log);
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
SuperVersionContext sv_context(/* create_superversion */ true);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
impl->InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
cfd, &sv_context, *cfd->GetLatestMutableCFOptions(), nullptr);
}
sv_context.Clean();
if (impl->two_write_queues_) {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);
mutable_cf_options, nullptr);
#ifndef ROCKSDB_LITE
mutex_.Unlock();
// Notify client that memtable is sealed, now that we have successfully
Expand Down