diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index ce80375e0e1..459dde9ea2a 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -38,6 +38,9 @@ class FlushedFileCollector : public EventListener { ~FlushedFileCollector() override {} void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } std::lock_guard lock(mutex_); flushed_files_.push_back(info.file_path); } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index f1f6661bb6b..3586b814f04 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -58,6 +58,9 @@ class FlushedFileCollector : public EventListener { ~FlushedFileCollector() override {} void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } std::lock_guard lock(mutex_); flushed_files_.push_back(info.file_path); } @@ -103,7 +106,10 @@ class CompactionStatsCollector : public EventListener { compaction_completed_[k]++; } - void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override { + void OnFlushCompleted(DB* /* db */, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } int k = static_cast(CompactionReason::kFlush); compaction_completed_[k]++; } diff --git a/db/db_impl.h b/db/db_impl.h index 11750a02897..82bb398bf61 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -817,7 +817,8 @@ class DBImpl : public DB { void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + const Status& st, int job_id, + TableProperties prop); void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, const Status& st, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index f208b873dd8..a5519c5e1e5 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -205,11 +205,12 @@ Status DBImpl::FlushMemTableToOutputFile( Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } - if (s.ok()) { #ifndef ROCKSDB_LITE - // may temporarily unlock and lock the mutex. - NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, - job_context->job_id, flush_job.GetTableProperties()); + // may temporarily unlock and lock the mutex. + NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, s, + job_context->job_id, flush_job.GetTableProperties()); + + if (s.ok()) { auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); if (sfm) { @@ -226,8 +227,8 @@ Status DBImpl::FlushMemTableToOutputFile( error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } } -#endif // ROCKSDB_LITE } +#endif // ROCKSDB_LITE return s; } @@ -495,7 +496,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( continue; } NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], - job_context->job_id, jobs[i].GetTableProperties()); + s, job_context->job_id, + jobs[i].GetTableProperties()); if (sfm) { std::string file_path = MakeTableFileName( cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); @@ -561,6 +563,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.cf_name = cfd->GetName(); // TODO(yhchiang): make db_paths dynamic in case flush does not // go to L0 in the future. + info.status = Status::OK(); info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_meta->fd.GetNumber()); info.thread_id = env_->GetThreadID(); @@ -590,7 +593,8 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { + const Status& st, int job_id, + TableProperties prop) { #ifndef ROCKSDB_LITE if (immutable_db_options_.listeners.size() == 0U) { return; @@ -613,6 +617,7 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, info.cf_name = cfd->GetName(); // TODO(yhchiang): make db_paths dynamic in case flush does not // go to L0 in the future. + info.status = st; info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_meta->fd.GetNumber()); info.thread_id = env_->GetThreadID(); diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index dcd5847eb22..5fcdfb9c1fa 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -28,6 +28,9 @@ class FlushedFileCollector : public EventListener { ~FlushedFileCollector() override {} void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } std::lock_guard lock(mutex_); flushed_files_.push_back(info.file_path); } diff --git a/db/listener_test.cc b/db/listener_test.cc index 60d02ed0ae9..ab7d0f8f375 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -197,6 +197,9 @@ class TestFlushListener : public EventListener { void OnFlushCompleted( DB* db, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } flushed_dbs_.push_back(db); flushed_column_family_names_.push_back(info.cf_name); if (info.triggered_writes_slowdown) { @@ -743,6 +746,9 @@ class MemTableSealedListener : public EventListener { void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& flush_job_info) override { + if (!flush_job_info.status.ok()) { + return; + } ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_); } }; diff --git a/examples/compact_files_example.cc b/examples/compact_files_example.cc index c27df8ee79d..8f455feb3fc 100644 --- a/examples/compact_files_example.cc +++ b/examples/compact_files_example.cc @@ -72,6 +72,9 @@ class FullCompactor : public Compactor { // compaction-task to true. void OnFlushCompleted( DB* db, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } CompactionTask* task = PickCompaction(db, info.cf_name); if (task != nullptr) { if (info.triggered_writes_stop) { diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index d5ccf47e54f..2dcdde20196 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -190,6 +190,8 @@ struct FlushJobInfo { TableProperties table_properties; FlushReason flush_reason; + // the status indicating whether the flush was successful or not. + Status status; }; struct CompactionJobInfo { diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 7f8c4b53f7b..9e3612ae55f 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1261,6 +1261,9 @@ class DbStressListener : public EventListener { } #ifndef ROCKSDB_LITE virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } assert(IsValidColumnFamilyName(info.cf_name)); VerifyFilePath(info.file_path); // pretending doing some work here diff --git a/utilities/blob_db/blob_db_listener.h b/utilities/blob_db/blob_db_listener.h index f096d238ba3..8b93ac04f66 100644 --- a/utilities/blob_db/blob_db_listener.h +++ b/utilities/blob_db/blob_db_listener.h @@ -26,7 +26,10 @@ class BlobDBListener : public EventListener { blob_db_impl_->SyncBlobFiles(); } - void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override { + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { + if (!info.status.ok()) { + return; + } assert(blob_db_impl_ != nullptr); blob_db_impl_->UpdateLiveSSTSize(); }