Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notifying OnFlushCompleted and OnCompactionCompleted in sequence #6342

24 changes: 16 additions & 8 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,8 @@ Status DBImpl::SetOptions(
// Trigger possible flush/compactions. This has to be before we persist
// options to file, otherwise there will be a deadlock with writer
// thread.
InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);
InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options,
nullptr);

persist_options_status = WriteOptionsFile(
false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
Expand Down Expand Up @@ -2309,7 +2310,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
InstallSuperVersionAndScheduleWork(cfd, &sv_context,
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);

if (!cfd->mem()->IsSnapshotSupported()) {
is_snapshot_supported_ = false;
Expand Down Expand Up @@ -3178,7 +3180,8 @@ Status DBImpl::DeleteFile(std::string name) {
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
}
FindObsoleteFiles(&job_context, false);
} // lock released here
Expand Down Expand Up @@ -3263,7 +3266,8 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
}
for (auto* deleted_file : deleted_files) {
deleted_file->being_compacted = false;
Expand Down Expand Up @@ -4146,7 +4150,8 @@ Status DBImpl::IngestExternalFiles(
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
if (!cfd->IsDropped()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
#ifndef NDEBUG
if (0 == i && num_cfs > 1) {
TEST_SYNC_POINT(
Expand Down Expand Up @@ -4248,7 +4253,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options,
nullptr);
}
}
}
Expand Down Expand Up @@ -4284,7 +4290,8 @@ Status DBImpl::CreateColumnFamilyWithImport(
status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
&mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options,
nullptr);
}
}

Expand Down Expand Up @@ -4485,7 +4492,8 @@ Status DBImpl::ReserveFileNumbersBeforeIngestion(
s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
directories_.GetDbDir());
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options,
nullptr);
}
dummy_sv_ctx.Clean();
return s;
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1608,7 +1608,8 @@ class DBImpl : public DB {
// state needs flush or compaction.
void InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options,
std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);
burtonli marked this conversation as resolved.
Show resolved Hide resolved

bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,
Expand Down
51 changes: 38 additions & 13 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,15 @@ Status DBImpl::FlushMemTableToOutputFile(
}

if (s.ok()) {
#ifndef ROCKSDB_LITE
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
mutable_cf_options,
flush_job.GetCommittedFlushJobsInfo());
#else
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
mutable_cf_options);
mutable_cf_options,
nullptr);
#endif // ROCKSDB_LITE
if (made_progress) {
*made_progress = true;
}
Expand All @@ -211,9 +218,6 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, mutable_cf_options,
flush_job.GetCommittedFlushJobsInfo());
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
Expand Down Expand Up @@ -489,9 +493,17 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
#ifndef ROCKSDB_LITE
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
all_mutable_cf_options[i],
jobs[i]->GetCommittedFlushJobsInfo());
#else
InstallSuperVersionAndScheduleWork(cfds[i],
&job_context->superversion_contexts[i],
all_mutable_cf_options[i]);
all_mutable_cf_options[i],
nullptr);
#endif // ROCKSDB_LITE
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
cfds[i]->GetName().c_str(),
Expand All @@ -508,8 +520,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
jobs[i]->GetCommittedFlushJobsInfo());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
Expand Down Expand Up @@ -998,7 +1008,7 @@ Status DBImpl::CompactFilesImpl(
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), nullptr);
}
c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
Expand Down Expand Up @@ -1265,7 +1275,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {

status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options,
nullptr);

ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
cfd->GetName().c_str(), status.ToString().data());
Expand Down Expand Up @@ -2630,7 +2641,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&mutex_, directories_.GetDbDir());
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), nullptr);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
Expand Down Expand Up @@ -2686,7 +2697,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// Use latest MutableCFOptions
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), nullptr);

VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
Expand Down Expand Up @@ -2772,7 +2783,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
*c->mutable_cf_options(), nullptr);
}
*made_progress = true;
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
Expand Down Expand Up @@ -3019,7 +3030,8 @@ void DBImpl::BuildCompactionJobInfo(

void DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersionContext* sv_context,
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options,
std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
mutex_.AssertHeld();

// Update max_total_in_memory_state_
Expand All @@ -3036,6 +3048,19 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
}
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

#ifndef ROCKSDB_LITE
// Notify flush completed before triggering compactions.
// Make sure OnFlushCompleted for a sst file is called before involving in
// any compaction, which guarantees OnCompactionBegin/Completed is called
// subsequently.
if (flush_jobs_info) {
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, mutable_cf_options, flush_jobs_info);
}
#else
(void)flush_jobs_info;
#endif // ROCKSDB_LITE

// There may be a small data race here. The snapshot tricking bottommost
// compaction may already be released here. But assuming there will always be
// newer snapshot created and released frequently, the compaction will be
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
if (status.ok()) {
InstallSuperVersionAndScheduleWork(cfd,
&job_context.superversion_contexts[0],
*cfd->GetLatestMutableCFOptions());
*cfd->GetLatestMutableCFOptions(),
nullptr);
}
} // lock released here
LogFlush(immutable_db_options_.info_log);
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
SuperVersionContext sv_context(/* create_superversion */ true);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
impl->InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
cfd, &sv_context, *cfd->GetLatestMutableCFOptions(), nullptr);
}
sv_context.Clean();
if (impl->two_write_queues_) {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1734,7 +1734,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
new_mem->Ref();
cfd->SetMemtable(new_mem);
InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
mutable_cf_options);
mutable_cf_options, nullptr);
#ifndef ROCKSDB_LITE
mutex_.Unlock();
// Notify client that memtable is sealed, now that we have successfully
Expand Down