Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call the OnFlushCompleted handler unconditionally #5207

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions db/compact_files_test.cc
Expand Up @@ -38,6 +38,9 @@ class FlushedFileCollector : public EventListener {
~FlushedFileCollector() override {}

void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
Expand Down
8 changes: 7 additions & 1 deletion db/db_compaction_test.cc
Expand Up @@ -58,6 +58,9 @@ class FlushedFileCollector : public EventListener {
~FlushedFileCollector() override {}

void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
Expand Down Expand Up @@ -103,7 +106,10 @@ class CompactionStatsCollector : public EventListener {
compaction_completed_[k]++;
}

void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
void OnFlushCompleted(DB* /* db */, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
int k = static_cast<int>(CompactionReason::kFlush);
compaction_completed_[k]++;
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl.h
Expand Up @@ -817,7 +817,8 @@ class DBImpl : public DB {

void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
const Status& st, int job_id,
TableProperties prop);

void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
const Status& st,
Expand Down
19 changes: 12 additions & 7 deletions db/db_impl_compaction_flush.cc
Expand Up @@ -205,11 +205,12 @@ Status DBImpl::FlushMemTableToOutputFile(
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties());
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, s,
job_context->job_id, flush_job.GetTableProperties());

if (s.ok()) {
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
Expand All @@ -226,8 +227,8 @@ Status DBImpl::FlushMemTableToOutputFile(
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
#endif // ROCKSDB_LITE
}
#endif // ROCKSDB_LITE
return s;
}

Expand Down Expand Up @@ -495,7 +496,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties());
s, job_context->job_id,
jobs[i].GetTableProperties());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
Expand Down Expand Up @@ -561,6 +563,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.status = Status::OK();
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
Expand Down Expand Up @@ -590,7 +593,8 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
const Status& st, int job_id,
TableProperties prop) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
Expand All @@ -613,6 +617,7 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.status = st;
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
Expand Down
3 changes: 3 additions & 0 deletions db/db_sst_test.cc
Expand Up @@ -28,6 +28,9 @@ class FlushedFileCollector : public EventListener {
~FlushedFileCollector() override {}

void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
Expand Down
6 changes: 6 additions & 0 deletions db/listener_test.cc
Expand Up @@ -197,6 +197,9 @@ class TestFlushListener : public EventListener {

void OnFlushCompleted(
DB* db, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
flushed_dbs_.push_back(db);
flushed_column_family_names_.push_back(info.cf_name);
if (info.triggered_writes_slowdown) {
Expand Down Expand Up @@ -743,6 +746,9 @@ class MemTableSealedListener : public EventListener {

void OnFlushCompleted(DB* /*db*/,
const FlushJobInfo& flush_job_info) override {
if (!flush_job_info.status.ok()) {
return;
}
ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
}
};
Expand Down
3 changes: 3 additions & 0 deletions examples/compact_files_example.cc
Expand Up @@ -72,6 +72,9 @@ class FullCompactor : public Compactor {
// compaction-task to true.
void OnFlushCompleted(
DB* db, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
CompactionTask* task = PickCompaction(db, info.cf_name);
if (task != nullptr) {
if (info.triggered_writes_stop) {
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/listener.h
Expand Up @@ -190,6 +190,8 @@ struct FlushJobInfo {
TableProperties table_properties;

FlushReason flush_reason;
// the status indicating whether the flush was successful or not.
Status status;
};

struct CompactionJobInfo {
Expand Down
3 changes: 3 additions & 0 deletions tools/db_stress.cc
Expand Up @@ -1261,6 +1261,9 @@ class DbStressListener : public EventListener {
}
#ifndef ROCKSDB_LITE
virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
assert(IsValidColumnFamilyName(info.cf_name));
VerifyFilePath(info.file_path);
// pretending doing some work here
Expand Down
5 changes: 4 additions & 1 deletion utilities/blob_db/blob_db_listener.h
Expand Up @@ -26,7 +26,10 @@ class BlobDBListener : public EventListener {
blob_db_impl_->SyncBlobFiles();
}

void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override {
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
if (!info.status.ok()) {
return;
}
assert(blob_db_impl_ != nullptr);
blob_db_impl_->UpdateLiveSSTSize();
}
Expand Down