-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Notifying OnFlushCompleted and OnCompactionCompleted in sequence #6342
Changes from 5 commits
8114ac4
eca7e8d
559349d
9118967
df6c0be
9494469
9745171
47e1978
145ab41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -194,8 +194,13 @@ Status DBImpl::FlushMemTableToOutputFile( | |
} | ||
|
||
if (s.ok()) { | ||
std::function<void()> callback; | ||
#ifndef ROCKSDB_LITE | ||
callback = std::bind(&DBImpl::NotifyOnFlushCompleted, this, cfd, | ||
mutable_cf_options, flush_job.GetCommittedFlushJobsInfo()); | ||
#endif // ROCKSDB_LITE | ||
InstallSuperVersionAndScheduleWork(cfd, superversion_context, | ||
mutable_cf_options); | ||
mutable_cf_options, callback); | ||
if (made_progress) { | ||
*made_progress = true; | ||
} | ||
|
@@ -211,9 +216,6 @@ Status DBImpl::FlushMemTableToOutputFile( | |
} | ||
if (s.ok()) { | ||
#ifndef ROCKSDB_LITE | ||
// may temporarily unlock and lock the mutex. | ||
NotifyOnFlushCompleted(cfd, mutable_cf_options, | ||
flush_job.GetCommittedFlushJobsInfo()); | ||
auto sfm = static_cast<SstFileManagerImpl*>( | ||
immutable_db_options_.sst_file_manager.get()); | ||
if (sfm) { | ||
|
@@ -489,9 +491,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( | |
if (cfds[i]->IsDropped()) { | ||
continue; | ||
} | ||
std::function<void()> callback; | ||
#ifndef ROCKSDB_LITE | ||
callback = std::bind(&DBImpl::NotifyOnFlushCompleted, this, cfds[i], | ||
all_mutable_cf_options[i], jobs[i]->GetCommittedFlushJobsInfo()); | ||
#endif // ROCKSDB_LITE | ||
InstallSuperVersionAndScheduleWork(cfds[i], | ||
&job_context->superversion_contexts[i], | ||
all_mutable_cf_options[i]); | ||
all_mutable_cf_options[i], | ||
callback); | ||
VersionStorageInfo::LevelSummaryStorage tmp; | ||
ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", | ||
cfds[i]->GetName().c_str(), | ||
|
@@ -508,8 +516,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( | |
if (cfds[i]->IsDropped()) { | ||
continue; | ||
} | ||
NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i], | ||
jobs[i]->GetCommittedFlushJobsInfo()); | ||
if (sfm) { | ||
std::string file_path = MakeTableFileName( | ||
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); | ||
|
@@ -619,17 +625,21 @@ void DBImpl::NotifyOnFlushCompleted( | |
bool triggered_writes_stop = | ||
(cfd->current()->storage_info()->NumLevelFiles(0) >= | ||
mutable_cf_options.level0_stop_writes_trigger); | ||
// release lock while notifying events | ||
mutex_.Unlock(); | ||
{ | ||
for (auto& info : *flush_jobs_info) { | ||
info->triggered_writes_slowdown = triggered_writes_slowdown; | ||
info->triggered_writes_stop = triggered_writes_stop; | ||
for (auto listener : immutable_db_options_.listeners) { | ||
listener->OnFlushCompleted(this, *info); | ||
// triggers completion notifications in turn | ||
std::unique_lock<std::mutex> nlock(notification_mutex_); | ||
burtonli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// release lock while notifying events | ||
mutex_.Unlock(); | ||
burtonli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
for (auto& info : *flush_jobs_info) { | ||
info->triggered_writes_slowdown = triggered_writes_slowdown; | ||
info->triggered_writes_stop = triggered_writes_stop; | ||
for (auto listener : immutable_db_options_.listeners) { | ||
listener->OnFlushCompleted(this, *info); | ||
} | ||
} | ||
flush_jobs_info->clear(); | ||
} | ||
flush_jobs_info->clear(); | ||
} | ||
mutex_.Lock(); | ||
// no need to signal bg_cv_ as it will be signaled at the end of the | ||
|
@@ -998,7 +1008,7 @@ Status DBImpl::CompactFilesImpl( | |
if (status.ok()) { | ||
InstallSuperVersionAndScheduleWork(c->column_family_data(), | ||
&job_context->superversion_contexts[0], | ||
*c->mutable_cf_options()); | ||
*c->mutable_cf_options(), nullptr); | ||
} | ||
c->ReleaseCompactionFiles(s); | ||
#ifndef ROCKSDB_LITE | ||
|
@@ -1177,15 +1187,19 @@ void DBImpl::NotifyOnCompactionCompleted( | |
} | ||
Version* current = cfd->current(); | ||
current->Ref(); | ||
// release lock while notifying events | ||
mutex_.Unlock(); | ||
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); | ||
{ | ||
CompactionJobInfo info{}; | ||
BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current, | ||
&info); | ||
for (auto listener : immutable_db_options_.listeners) { | ||
listener->OnCompactionCompleted(this, info); | ||
// triggers completion notifications in turn | ||
std::unique_lock<std::mutex> nlock(notification_mutex_); | ||
burtonli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// release lock while notifying events | ||
mutex_.Unlock(); | ||
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); | ||
{ | ||
CompactionJobInfo info{}; | ||
BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current, | ||
&info); | ||
for (auto listener : immutable_db_options_.listeners) { | ||
listener->OnCompactionCompleted(this, info); | ||
} | ||
} | ||
} | ||
mutex_.Lock(); | ||
|
@@ -1265,7 +1279,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { | |
|
||
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, | ||
directories_.GetDbDir()); | ||
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options); | ||
InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options, | ||
nullptr); | ||
|
||
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n", | ||
cfd->GetName().c_str(), status.ToString().data()); | ||
|
@@ -2628,9 +2643,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, | |
status = versions_->LogAndApply(c->column_family_data(), | ||
*c->mutable_cf_options(), c->edit(), | ||
&mutex_, directories_.GetDbDir()); | ||
auto callback = std::bind(&DBImpl::NotifyOnCompactionCompleted, this, | ||
c->column_family_data(), c.get(), status, compaction_job_stats, | ||
job_context->job_id); | ||
InstallSuperVersionAndScheduleWork(c->column_family_data(), | ||
&job_context->superversion_contexts[0], | ||
*c->mutable_cf_options()); | ||
*c->mutable_cf_options(), callback); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a side note that has nothing to do with the PR per se but we might have a small bug here: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have noticed that as well. Also there is a different behavior between NotifyOnCompactionCompleted and NotifyOnFlushCompleted. NotifyOnFlushCompleted only triggers when status.ok(), but NotifyOnCompactionCompleted triggers regardless of status. I kept the existing logic as it is, and we can have separate PR to address them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is definitely out of scope here. Also, IIRC |
||
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n", | ||
c->column_family_data()->GetName().c_str(), | ||
c->num_input_files(0)); | ||
|
@@ -2683,10 +2701,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, | |
status = versions_->LogAndApply(c->column_family_data(), | ||
*c->mutable_cf_options(), c->edit(), | ||
&mutex_, directories_.GetDbDir()); | ||
auto callback = std::bind(&DBImpl::NotifyOnCompactionCompleted, this, | ||
c->column_family_data(), c.get(), status, compaction_job_stats, | ||
job_context->job_id); | ||
// Use latest MutableCFOptions | ||
InstallSuperVersionAndScheduleWork(c->column_family_data(), | ||
&job_context->superversion_contexts[0], | ||
*c->mutable_cf_options()); | ||
*c->mutable_cf_options(), callback); | ||
|
||
VersionStorageInfo::LevelSummaryStorage tmp; | ||
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(), | ||
|
@@ -2769,10 +2790,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, | |
mutex_.Lock(); | ||
|
||
status = compaction_job.Install(*c->mutable_cf_options()); | ||
auto callback = std::bind(&DBImpl::NotifyOnCompactionCompleted, this, | ||
c->column_family_data(), c.get(), status, compaction_job_stats, | ||
job_context->job_id); | ||
if (status.ok()) { | ||
InstallSuperVersionAndScheduleWork(c->column_family_data(), | ||
&job_context->superversion_contexts[0], | ||
*c->mutable_cf_options()); | ||
*c->mutable_cf_options(), callback); | ||
} else { | ||
callback(); | ||
} | ||
*made_progress = true; | ||
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", | ||
|
@@ -2790,9 +2816,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, | |
sfm->OnCompactionCompletion(c.get()); | ||
} | ||
#endif // ROCKSDB_LITE | ||
|
||
NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status, | ||
compaction_job_stats, job_context->job_id); | ||
} | ||
|
||
if (status.ok() || status.IsCompactionTooLarge() || | ||
|
@@ -3019,7 +3042,8 @@ void DBImpl::BuildCompactionJobInfo( | |
|
||
void DBImpl::InstallSuperVersionAndScheduleWork( | ||
ColumnFamilyData* cfd, SuperVersionContext* sv_context, | ||
const MutableCFOptions& mutable_cf_options) { | ||
const MutableCFOptions& mutable_cf_options, | ||
std::function<void()> completion_callback) { | ||
mutex_.AssertHeld(); | ||
|
||
// Update max_total_in_memory_state_ | ||
|
@@ -3036,6 +3060,13 @@ void DBImpl::InstallSuperVersionAndScheduleWork( | |
} | ||
cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options); | ||
|
||
// Install super version compaction callback for notifying OnFlushCompleted | ||
// or OnCompactionCompleted. Notification triggers before scheduling any new | ||
// compaction. | ||
if (completion_callback) { | ||
completion_callback(); | ||
} | ||
|
||
// There may be a small data race here. The snapshot tricking bottommost | ||
// compaction may already be released here. But assuming there will always be | ||
// newer snapshot created and released frequently, the compaction will be | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we flip this around to clarify the fix/impact? Like "Fixed an issue where listeners could receive out of order
OnFlushCompleted
/OnCompactionCompleted
notifications. This could cause a crash in BlobDB when garbage collection is enabled (see #6338)."