From b0e78341008e7b5e61f2e84ccf6b3953bd0c108e Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Mon, 14 Sep 2020 21:10:09 -0700 Subject: [PATCH] Integrate blob file writing with the flush logic (#7345) Summary: The patch adds support for writing blob files during flush by integrating `BlobFileBuilder` with the flush logic, most importantly, `BuildTable` and `CompactionIterator`. If `enable_blob_files` is set, large values are extracted to blob files and replaced with references. The resulting blob files are then logged to the MANIFEST as part of the flush job's `VersionEdit` and added to the `Version`, similarly to table files. Errors related to writing blob files fail the flush, and any blob files written by such jobs are immediately deleted (again, similarly to how SST files are handled). In addition, the patch extends the logging and statistics around flushes to account for the presence of blob files (e.g. `InternalStats::CompactionStats::bytes_written`, which is used for calculating write amplification, now considers the blob files as well). Pull Request resolved: https://github.com/facebook/rocksdb/pull/7345 Test Plan: Tested using `make check` and `db_bench`. Reviewed By: riversand963 Differential Revision: D23506369 Pulled By: ltamasi fbshipit-source-id: 646885f22dfbe063f650d38a1fedc132f499a159 --- db/blob/blob_file_builder.cc | 18 ++- db/blob/blob_file_builder.h | 3 + db/blob/blob_file_builder_test.cc | 149 ++++++++++++++------ db/builder.cc | 43 +++++- db/builder.h | 7 +- db/compaction/compaction_iterator.cc | 55 +++++--- db/compaction/compaction_iterator.h | 6 + db/compaction/compaction_iterator_test.cc | 3 +- db/compaction/compaction_job.cc | 6 +- db/db_flush_test.cc | 164 ++++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 47 ++++++- db/db_impl/db_impl_open.cc | 10 +- db/flush_job.cc | 39 ++++- db/internal_stats.h | 2 + db/memtable_list.cc | 97 ++++++++++--- db/repair.cc | 21 +-- db/version_edit.cc | 4 +- db/version_edit.h | 16 +++ 18 files changed, 574 insertions(+), 116 deletions(-) diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 4f84dd36011..57f05438c47 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -31,12 +31,13 @@ BlobFileBuilder::BlobFileBuilder( int job_id, uint32_t column_family_id, const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions) : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env, fs, immutable_cf_options, mutable_cf_options, file_options, job_id, column_family_id, column_family_name, io_priority, write_hint, - blob_file_additions) {} + blob_file_paths, blob_file_additions) {} BlobFileBuilder::BlobFileBuilder( std::function file_number_generator, Env* env, FileSystem* fs, @@ -45,6 +46,7 @@ BlobFileBuilder::BlobFileBuilder( int job_id, uint32_t column_family_id, const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions) : file_number_generator_(std::move(file_number_generator)), env_(env), @@ -59,6 +61,7 @@ BlobFileBuilder::BlobFileBuilder( column_family_name_(column_family_name), io_priority_(io_priority), write_hint_(write_hint), + blob_file_paths_(blob_file_paths), blob_file_additions_(blob_file_additions), blob_count_(0), blob_bytes_(0) { @@ -67,7 +70,10 @@ BlobFileBuilder::BlobFileBuilder( assert(fs_); assert(immutable_cf_options_); assert(file_options_); + assert(blob_file_paths_); + assert(blob_file_paths_->empty()); assert(blob_file_additions_); + assert(blob_file_additions_->empty()); } BlobFileBuilder::~BlobFileBuilder() = default; @@ -145,7 +151,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { assert(immutable_cf_options_); assert(!immutable_cf_options_->cf_paths.empty()); - const std::string blob_file_path = BlobFileName( + std::string blob_file_path = BlobFileName( immutable_cf_options_->cf_paths.front().path, blob_file_number); std::unique_ptr file; @@ -161,6 +167,12 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { } } + // Note: files get added to blob_file_paths_ right after the open, so they + // can be cleaned up upon failure. Contrast this with blob_file_additions_, + // which only contains successfully written files. + assert(blob_file_paths_); + blob_file_paths_->emplace_back(std::move(blob_file_path)); + assert(file); file->SetIOPriority(io_priority_); file->SetWriteLifeTimeHint(write_hint_); @@ -168,7 +180,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { Statistics* const statistics = immutable_cf_options_->statistics; std::unique_ptr file_writer(new WritableFileWriter( - std::move(file), blob_file_path, *file_options_, env_, + std::move(file), blob_file_paths_->back(), *file_options_, env_, nullptr /*IOTracer*/, statistics, immutable_cf_options_->listeners, immutable_cf_options_->file_checksum_gen_factory)); diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h index 86018ec01cb..755ab435094 100644 --- a/db/blob/blob_file_builder.h +++ b/db/blob/blob_file_builder.h @@ -36,6 +36,7 @@ class BlobFileBuilder { const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions); BlobFileBuilder(std::function file_number_generator, Env* env, @@ -47,6 +48,7 @@ class BlobFileBuilder { const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, + std::vector* blob_file_paths, std::vector* blob_file_additions); BlobFileBuilder(const BlobFileBuilder&) = delete; @@ -79,6 +81,7 @@ class BlobFileBuilder { std::string column_family_name_; Env::IOPriority io_priority_; Env::WriteLifeTimeHint write_hint_; + std::vector* blob_file_paths_; std::vector* blob_file_additions_; std::unique_ptr writer_; uint64_t blob_count_; diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index e09cc2340e6..dbcdcb9b094 100644 --- a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -42,17 +42,15 @@ class BlobFileBuilderTest : public testing::Test { protected: BlobFileBuilderTest() : mock_env_(Env::Default()), fs_(&mock_env_) {} - void VerifyBlobFile(const ImmutableCFOptions& immutable_cf_options, - uint64_t blob_file_number, uint32_t column_family_id, + void VerifyBlobFile(uint64_t blob_file_number, + const std::string& blob_file_path, + uint32_t column_family_id, CompressionType blob_compression_type, const std::vector>& expected_key_value_pairs, const std::vector& blob_indexes) { assert(expected_key_value_pairs.size() == blob_indexes.size()); - const std::string blob_file_path = BlobFileName( - immutable_cf_options.cf_paths.front().path, blob_file_number); - std::unique_ptr file; constexpr IODebugContext* dbg = nullptr; ASSERT_OK( @@ -137,12 +135,14 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -168,12 +168,20 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { ASSERT_OK(builder.Finish()); // Check the metadata generated + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + + const std::string& blob_file_path = blob_file_paths[0]; + + ASSERT_EQ(blob_file_path, + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + ASSERT_EQ(blob_file_additions.size(), 1); const auto& blob_file_addition = blob_file_additions[0]; - constexpr uint64_t blob_file_number = 2; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), number_of_blobs); ASSERT_EQ( @@ -181,7 +189,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { number_of_blobs * (BlobLogRecord::kHeaderSize + key_size + value_size)); // Verify the contents of the new blob file as well as the blob references - VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + VerifyBlobFile(blob_file_number, blob_file_path, column_family_id, kNoCompression, expected_key_value_pairs, blob_indexes); } @@ -210,12 +218,14 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -241,12 +251,19 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { ASSERT_OK(builder.Finish()); // Check the metadata generated + ASSERT_EQ(blob_file_paths.size(), number_of_blobs); ASSERT_EQ(blob_file_additions.size(), number_of_blobs); for (size_t i = 0; i < number_of_blobs; ++i) { + const uint64_t blob_file_number = i + 2; + + ASSERT_EQ(blob_file_paths[i], + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + const auto& blob_file_addition = blob_file_additions[i]; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), i + 2); + ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), BlobLogRecord::kHeaderSize + key_size + value_size); @@ -258,8 +275,8 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { expected_key_value_pairs[i]}; std::vector blob_index{blob_indexes[i]}; - VerifyBlobFile(immutable_cf_options, i + 2, column_family_id, - kNoCompression, expected_key_value_pair, blob_index); + VerifyBlobFile(i + 2, blob_file_paths[i], column_family_id, kNoCompression, + expected_key_value_pair, blob_index); } } @@ -286,12 +303,14 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); for (size_t i = 0; i < number_of_blobs; ++i) { const std::string key = std::to_string(i); @@ -308,6 +327,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { ASSERT_OK(builder.Finish()); // Check the metadata generated + ASSERT_TRUE(blob_file_paths.empty()); ASSERT_TRUE(blob_file_additions.empty()); } @@ -335,12 +355,14 @@ TEST_F(BlobFileBuilderTest, Compression) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string uncompressed_value(value_size, 'x'); @@ -353,12 +375,20 @@ TEST_F(BlobFileBuilderTest, Compression) { ASSERT_OK(builder.Finish()); // Check the metadata generated + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + + const std::string& blob_file_path = blob_file_paths[0]; + + ASSERT_EQ(blob_file_path, + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + ASSERT_EQ(blob_file_additions.size(), 1); const auto& blob_file_addition = blob_file_additions[0]; - constexpr uint64_t blob_file_number = 2; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); @@ -381,7 +411,7 @@ TEST_F(BlobFileBuilderTest, Compression) { {key, compressed_value}}; std::vector blob_indexes{blob_index}; - VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + VerifyBlobFile(blob_file_number, blob_file_path, column_family_id, kSnappyCompression, expected_key_value_pairs, blob_indexes); } @@ -407,12 +437,14 @@ TEST_F(BlobFileBuilderTest, CompressionError) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue", [](void* arg) { @@ -430,6 +462,15 @@ TEST_F(BlobFileBuilderTest, CompressionError) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); + + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + ASSERT_EQ(blob_file_paths[0], + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + + ASSERT_TRUE(blob_file_additions.empty()); } TEST_F(BlobFileBuilderTest, Checksum) { @@ -473,12 +514,14 @@ TEST_F(BlobFileBuilderTest, Checksum) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; - BlobFileBuilder builder( - TestFileNumberGenerator(), &mock_env_, &fs_, &immutable_cf_options, - &mutable_cf_options, &file_options_, job_id, column_family_id, - column_family_name, io_priority, write_hint, &blob_file_additions); + BlobFileBuilder builder(TestFileNumberGenerator(), &mock_env_, &fs_, + &immutable_cf_options, &mutable_cf_options, + &file_options_, job_id, column_family_id, + column_family_name, io_priority, write_hint, + &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string value("deadbeef"); @@ -491,12 +534,20 @@ TEST_F(BlobFileBuilderTest, Checksum) { ASSERT_OK(builder.Finish()); // Check the metadata generated + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + + const std::string& blob_file_path = blob_file_paths[0]; + + ASSERT_EQ(blob_file_path, + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + ASSERT_EQ(blob_file_additions.size(), 1); const auto& blob_file_addition = blob_file_additions[0]; - constexpr uint64_t blob_file_number = 2; - ASSERT_EQ(blob_file_addition.GetBlobFileNumber(), blob_file_number); ASSERT_EQ(blob_file_addition.GetTotalBlobCount(), 1); ASSERT_EQ(blob_file_addition.GetTotalBlobBytes(), @@ -509,7 +560,7 @@ TEST_F(BlobFileBuilderTest, Checksum) { {key, value}}; std::vector blob_indexes{blob_index}; - VerifyBlobFile(immutable_cf_options, blob_file_number, column_family_id, + VerifyBlobFile(blob_file_number, blob_file_path, column_family_id, kNoCompression, expected_key_value_pairs, blob_indexes); } @@ -561,13 +612,14 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { constexpr Env::IOPriority io_priority = Env::IO_HIGH; constexpr Env::WriteLifeTimeHint write_hint = Env::WLTH_MEDIUM; + std::vector blob_file_paths; std::vector blob_file_additions; BlobFileBuilder builder(TestFileNumberGenerator(), &fault_injection_env_, &fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, write_hint, - &blob_file_additions); + &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) { fault_injection_env_.SetFilesystemActive(false, @@ -584,6 +636,19 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); + + if (sync_point_ == "BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile") { + ASSERT_TRUE(blob_file_paths.empty()); + } else { + constexpr uint64_t blob_file_number = 2; + + ASSERT_EQ(blob_file_paths.size(), 1); + ASSERT_EQ(blob_file_paths[0], + BlobFileName(immutable_cf_options.cf_paths.front().path, + blob_file_number)); + } + + ASSERT_TRUE(blob_file_additions.empty()); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/builder.cc b/db/builder.cc index 9dfdf421778..a8bfff26471 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -13,6 +13,7 @@ #include #include +#include "db/blob/blob_file_builder.h" #include "db/compaction/compaction_iterator.h" #include "db/dbformat.h" #include "db/event_helpers.h" @@ -67,13 +68,14 @@ TableBuilder* NewTableBuilder( } Status BuildTable( - const std::string& dbname, Env* env, FileSystem* fs, + const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, const FileOptions& file_options, TableCache* table_cache, InternalIterator* iter, std::vector> range_del_iters, - FileMetaData* meta, const InternalKeyComparator& internal_comparator, + FileMetaData* meta, std::vector* blob_file_additions, + const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, @@ -107,6 +109,7 @@ Status BuildTable( std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); + std::vector blob_file_paths; std::string file_checksum = kUnknownFileChecksum; std::string file_checksum_func_name = kUnknownFileChecksumFuncName; #ifndef ROCKSDB_LITE @@ -163,11 +166,22 @@ Status BuildTable( snapshots.empty() ? 0 : snapshots.back(), snapshot_checker); + std::unique_ptr blob_file_builder( + (mutable_cf_options.enable_blob_files && blob_file_additions) + ? new BlobFileBuilder(versions, env, fs, &ioptions, + &mutable_cf_options, &file_options, job_id, + column_family_id, column_family_name, + io_priority, write_hint, &blob_file_paths, + blob_file_additions) + : nullptr); + CompactionIterator c_iter( iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.statistics), - true /* internal key corruption is not ok */, range_del_agg.get()); + true /* internal key corruption is not ok */, range_del_agg.get(), + blob_file_builder.get()); + c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { const Slice& key = c_iter.key(); @@ -200,9 +214,16 @@ Status BuildTable( } // Finish and check for builder errors - bool empty = builder->IsEmpty(); s = c_iter.status(); + + if (blob_file_builder) { + if (s.ok()) { + s = blob_file_builder->Finish(); + } + } + TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); + const bool empty = builder->IsEmpty(); if (!s.ok() || empty) { builder->Abandon(); } else { @@ -290,7 +311,19 @@ Status BuildTable( } if (!s.ok() || meta->fd.GetFileSize() == 0) { - fs->DeleteFile(fname, IOOptions(), nullptr); + constexpr IODebugContext* dbg = nullptr; + + fs->DeleteFile(fname, IOOptions(), dbg); + + assert(blob_file_additions || blob_file_paths.empty()); + + if (blob_file_additions) { + for (const std::string& blob_file_path : blob_file_paths) { + fs->DeleteFile(blob_file_path, IOOptions(), dbg); + } + + blob_file_additions->clear(); + } } if (meta->fd.GetFileSize() == 0) { diff --git a/db/builder.h b/db/builder.h index bcabb79357f..8c80c637955 100644 --- a/db/builder.h +++ b/db/builder.h @@ -27,8 +27,10 @@ namespace ROCKSDB_NAMESPACE { struct Options; struct FileMetaData; +class VersionSet; class Env; struct EnvOptions; +class BlobFileAddition; class Iterator; class SnapshotChecker; class TableCache; @@ -63,13 +65,14 @@ TableBuilder* NewTableBuilder( // @param column_family_name Name of the column family that is also identified // by column_family_id, or empty string if unknown. extern Status BuildTable( - const std::string& dbname, Env* env, FileSystem* fs, + const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs, const ImmutableCFOptions& options, const MutableCFOptions& mutable_cf_options, const FileOptions& file_options, TableCache* table_cache, InternalIterator* iter, std::vector> range_del_iters, - FileMetaData* meta, const InternalKeyComparator& internal_comparator, + FileMetaData* meta, std::vector* blob_file_additions, + const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 9076a256afd..7f249632ff7 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -3,9 +3,11 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). +#include "db/compaction/compaction_iterator.h" + #include -#include "db/compaction/compaction_iterator.h" +#include "db/blob/blob_file_builder.h" #include "db/snapshot_checker.h" #include "port/likely.h" #include "rocksdb/listener.h" @@ -36,7 +38,8 @@ CompactionIterator::CompactionIterator( SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, const Compaction* compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, @@ -46,6 +49,7 @@ CompactionIterator::CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, report_detailed_time, expect_valid_internal_key, range_del_agg, + blob_file_builder, std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, @@ -58,6 +62,7 @@ CompactionIterator::CompactionIterator( const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, std::unique_ptr compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, @@ -74,6 +79,7 @@ CompactionIterator::CompactionIterator( report_detailed_time_(report_detailed_time), expect_valid_internal_key_(expect_valid_internal_key), range_del_agg_(range_del_agg), + blob_file_builder_(blob_file_builder), compaction_(std::move(compaction)), compaction_filter_(compaction_filter), shutting_down_(shutting_down), @@ -661,20 +667,37 @@ void CompactionIterator::NextFromInput() { void CompactionIterator::PrepareOutput() { if (valid_) { - if (compaction_filter_ && ikey_.type == kTypeBlobIndex) { - const auto blob_decision = compaction_filter_->PrepareBlobOutput( - user_key(), value_, &compaction_filter_value_); - - if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { - status_ = Status::Corruption( - "Corrupted blob reference encountered during GC"); - valid_ = false; - } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) { - status_ = Status::IOError("Could not relocate blob during GC"); - valid_ = false; - } else if (blob_decision == - CompactionFilter::BlobDecision::kChangeValue) { - value_ = compaction_filter_value_; + if (ikey_.type == kTypeValue) { + if (blob_file_builder_) { + blob_index_.clear(); + const Status s = + blob_file_builder_->Add(user_key(), value_, &blob_index_); + + if (!s.ok()) { + status_ = s; + valid_ = false; + } else if (!blob_index_.empty()) { + value_ = blob_index_; + ikey_.type = kTypeBlobIndex; + current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + } + } + } else if (ikey_.type == kTypeBlobIndex) { + if (compaction_filter_) { + const auto blob_decision = compaction_filter_->PrepareBlobOutput( + user_key(), value_, &compaction_filter_value_); + + if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { + status_ = Status::Corruption( + "Corrupted blob reference encountered during GC"); + valid_ = false; + } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) { + status_ = Status::IOError("Could not relocate blob during GC"); + valid_ = false; + } else if (blob_decision == + CompactionFilter::BlobDecision::kChangeValue) { + value_ = compaction_filter_value_; + } } } diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index dc79841800f..e5b1cc8b1b6 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -21,6 +21,8 @@ namespace ROCKSDB_NAMESPACE { +class BlobFileBuilder; + class CompactionIterator { public: // A wrapper around Compaction. Has a much smaller interface, only what @@ -66,6 +68,7 @@ class CompactionIterator { const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, const Compaction* compaction = nullptr, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -81,6 +84,7 @@ class CompactionIterator { const SnapshotChecker* snapshot_checker, Env* env, bool report_detailed_time, bool expect_valid_internal_key, CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, std::unique_ptr compaction, const CompactionFilter* compaction_filter = nullptr, const std::atomic* shutting_down = nullptr, @@ -163,6 +167,7 @@ class CompactionIterator { bool report_detailed_time_; bool expect_valid_internal_key_; CompactionRangeDelAggregator* range_del_agg_; + BlobFileBuilder* blob_file_builder_; std::unique_ptr compaction_; const CompactionFilter* compaction_filter_; const std::atomic* shutting_down_; @@ -211,6 +216,7 @@ class CompactionIterator { // PinnedIteratorsManager used to pin input_ Iterator blocks while reading // merge operands and then releasing them after consuming them. PinnedIteratorsManager pinned_iters_mgr_; + std::string blob_index_; std::string compaction_filter_value_; InternalKey compaction_filter_skip_until_; // "level_ptrs" holds indices that remember which file of an associated diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 76a207e0001..c134013a6c7 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -258,7 +258,8 @@ class CompactionIteratorTest : public testing::TestWithParam { iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_, earliest_write_conflict_snapshot, snapshot_checker_.get(), Env::Default(), false /* report_detailed_time */, false, - range_del_agg_.get(), std::move(compaction), filter, &shutting_down_)); + range_del_agg_.get(), nullptr /* blob_file_builder */, + std::move(compaction), filter, &shutting_down_)); } void AddSnapshot(SequenceNumber snapshot, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 603a0c98197..672d40b96ba 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -914,9 +914,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { &existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), /*expect_valid_internal_key=*/true, &range_del_agg, - sub_compact->compaction, compaction_filter, shutting_down_, - preserve_deletes_seqnum_, manual_compaction_paused_, - db_options_.info_log)); + /* blob_file_builder */ nullptr, sub_compact->compaction, + compaction_filter, shutting_down_, preserve_deletes_seqnum_, + manual_compaction_paused_, db_options_.info_log)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 81c0a478df9..529acbc51dc 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -11,6 +11,7 @@ #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" +#include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" #include "test_util/sync_point.h" @@ -443,6 +444,169 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { } #endif // !ROCKSDB_LITE +TEST_F(DBFlushTest, FlushWithBlob) { + constexpr uint64_t min_blob_size = 10; + + Options options; + options.enable_blob_files = true; + options.min_blob_size = min_blob_size; + options.disable_auto_compactions = true; + + Reopen(options); + + constexpr char short_value[] = "short"; + static_assert(sizeof(short_value) - 1 < min_blob_size, + "short_value too long"); + + constexpr char long_value[] = "long_value"; + static_assert(sizeof(long_value) - 1 >= min_blob_size, + "long_value too short"); + + ASSERT_OK(Put("key1", short_value)); + ASSERT_OK(Put("key2", long_value)); + + ASSERT_OK(Flush()); + + ASSERT_EQ(Get("key1"), short_value); + + // TODO: enable once Get support is implemented for blobs + // ASSERT_EQ(Get("key2"), long_value); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_EQ(l0_files.size(), 1); + + const FileMetaData* const table_file = l0_files[0]; + assert(table_file); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_EQ(blob_files.size(), 1); + + const auto& blob_file = blob_files.begin()->second; + assert(blob_file); + + ASSERT_EQ(table_file->smallest.user_key(), "key1"); + ASSERT_EQ(table_file->largest.user_key(), "key2"); + ASSERT_EQ(table_file->fd.smallest_seqno, 1); + ASSERT_EQ(table_file->fd.largest_seqno, 2); + ASSERT_EQ(table_file->oldest_blob_file_number, + blob_file->GetBlobFileNumber()); + + ASSERT_EQ(blob_file->GetTotalBlobCount(), 1); + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const uint64_t expected_bytes = + table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes(); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes); + ASSERT_EQ(compaction_stats[0].num_output_files, 2); + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes); +#endif // ROCKSDB_LITE +} + +class DBFlushTestBlobError : public DBFlushTest, + public testing::WithParamInterface { + public: + DBFlushTestBlobError() : fault_injection_env_(env_) {} + ~DBFlushTestBlobError() { Close(); } + + FaultInjectionTestEnv fault_injection_env_; +}; + +INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError, + ::testing::ValuesIn(std::vector{ + "BlobFileBuilder::WriteBlobToFile:AddRecord", + "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); + +TEST_P(DBFlushTestBlobError, FlushError) { + Options options; + options.enable_blob_files = true; + options.disable_auto_compactions = true; + options.env = &fault_injection_env_; + + Reopen(options); + + ASSERT_OK(Put("key", "blob")); + + SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(false, Status::IOError()); + }); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) { + fault_injection_env_.SetFilesystemActive(true); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_NOK(Flush()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& l0_files = storage_info->LevelFiles(0); + ASSERT_TRUE(l0_files.empty()); + + const auto& blob_files = storage_info->GetBlobFiles(); + ASSERT_TRUE(blob_files.empty()); + + // Make sure the files generated by the failed job have been deleted + std::vector files; + ASSERT_OK(env_->GetChildren(dbname_, &files)); + for (const auto& file : files) { + uint64_t number = 0; + FileType type = kTableFile; + + if (!ParseFileName(file, &number, &type)) { + continue; + } + + ASSERT_NE(type, kTableFile); + ASSERT_NE(type, kBlobFile); + } + +#ifndef ROCKSDB_LITE + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_FALSE(compaction_stats.empty()); + ASSERT_EQ(compaction_stats[0].bytes_written, 0); + ASSERT_EQ(compaction_stats[0].num_output_files, 0); + + const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); + ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], 0); +#endif // ROCKSDB_LITE +} + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index d5dea4323a1..fe034a49364 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -142,6 +142,7 @@ Status DBImpl::FlushMemTableToOutputFile( SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, Env::Priority thread_pri) { mutex_.AssertHeld(); + assert(cfd); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); @@ -203,10 +204,28 @@ Status DBImpl::FlushMemTableToOutputFile( if (made_progress) { *made_progress = true; } + + const std::string& column_family_name = cfd->GetName(); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", - cfd->GetName().c_str(), - cfd->current()->storage_info()->LevelSummary(&tmp)); + column_family_name.c_str(), + storage_info->LevelSummary(&tmp)); + + const auto& blob_files = storage_info->GetBlobFiles(); + if (!blob_files.empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 + "\n", + column_family_name.c_str(), blob_files.begin()->first, + blob_files.rbegin()->first); + } } if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { @@ -544,16 +563,36 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( assert(num_cfs == static_cast(job_context->superversion_contexts.size())); for (int i = 0; i != num_cfs; ++i) { + assert(cfds[i]); + if (cfds[i]->IsDropped()) { continue; } InstallSuperVersionAndScheduleWork(cfds[i], &job_context->superversion_contexts[i], all_mutable_cf_options[i]); + + const std::string& column_family_name = cfds[i]->GetName(); + + Version* const current = cfds[i]->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n", - cfds[i]->GetName().c_str(), - cfds[i]->current()->storage_info()->LevelSummary(&tmp)); + column_family_name.c_str(), + storage_info->LevelSummary(&tmp)); + + const auto& blob_files = storage_info->GetBlobFiles(); + if (!blob_files.empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Blob file summary: head=%" PRIu64 + ", tail=%" PRIu64 "\n", + column_family_name.c_str(), blob_files.begin()->first, + blob_files.rbegin()->first); + } } if (made_progress) { *made_progress = true; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 72b6a8ef9fb..ea446037d3c 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1272,7 +1272,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); + FileMetaData meta; + std::unique_ptr::iterator> pending_outputs_inserted_elem( new std::list::iterator( CaptureCurrentFileNumberInPendingOutputs())); @@ -1319,11 +1321,13 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (range_del_iter != nullptr) { range_del_iters.emplace_back(range_del_iter); } + IOStatus io_s; s = BuildTable( - dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options, - file_options_for_compaction_, cfd->table_cache(), iter.get(), - std::move(range_del_iters), &meta, cfd->internal_comparator(), + dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(), + mutable_cf_options, file_options_for_compaction_, cfd->table_cache(), + iter.get(), std::move(range_del_iters), &meta, + nullptr /* blob_file_additions */, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), diff --git a/db/flush_job.cc b/db/flush_job.cc index 088c06723e4..6b0b3497eec 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -266,6 +266,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, stream << vstorage->NumLevelFiles(level); } stream.EndArray(); + + const auto& blob_files = vstorage->GetBlobFiles(); + if (!blob_files.empty()) { + stream << "blob_file_head" << blob_files.begin()->first; + stream << "blob_file_tail" << blob_files.rbegin()->first; + } + stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed(); if (measure_io_stats_) { @@ -300,6 +307,9 @@ Status FlushJob::WriteLevel0Table() { const uint64_t start_micros = db_options_.env->NowMicros(); const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000; Status s; + + std::vector blob_file_additions; + { auto write_hint = cfd_->CalculateSSTWriteHint(0); db_mutex_->Unlock(); @@ -388,9 +398,10 @@ Status FlushJob::WriteLevel0Table() { IOStatus io_s; s = BuildTable( - dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(), - mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(), - std::move(range_del_iters), &meta_, cfd_->internal_comparator(), + dbname_, versions_, db_options_.env, db_options_.fs.get(), + *cfd_->ioptions(), mutable_cf_options_, file_options_, + cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, + &blob_file_additions, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, @@ -425,7 +436,10 @@ Status FlushJob::WriteLevel0Table() { // Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. - if (s.ok() && meta_.fd.GetFileSize() > 0) { + const bool has_output = meta_.fd.GetFileSize() > 0; + assert(has_output || blob_file_additions.empty()); + + if (s.ok() && has_output) { // if we have more than 1 background thread, then we cannot // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for @@ -437,6 +451,8 @@ Status FlushJob::WriteLevel0Table() { meta_.marked_for_compaction, meta_.oldest_blob_file_number, meta_.oldest_ancester_time, meta_.file_creation_time, meta_.file_checksum, meta_.file_checksum_func_name); + + edit_->SetBlobFileAdditions(std::move(blob_file_additions)); } #ifndef ROCKSDB_LITE // Piggyback FlushJobInfo on the first first flushed memtable. @@ -447,11 +463,22 @@ Status FlushJob::WriteLevel0Table() { InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); stats.micros = db_options_.env->NowMicros() - start_micros; stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros; - stats.bytes_written = meta_.fd.GetFileSize(); + + if (has_output) { + stats.bytes_written = meta_.fd.GetFileSize(); + + const auto& blobs = edit_->GetBlobFileAdditions(); + for (const auto& blob : blobs) { + stats.bytes_written += blob.GetTotalBlobBytes(); + } + + stats.num_output_files = static_cast(blobs.size()) + 1; + } + RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, - meta_.fd.GetFileSize()); + stats.bytes_written); RecordFlushIOStats(); return s; } diff --git a/db/internal_stats.h b/db/internal_stats.h index ce83be2447a..1e2de502a94 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -392,6 +392,8 @@ class InternalStats { bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info, Version* version, uint64_t* value); + const uint64_t* TEST_GetCFStatsValue() const { return cf_stats_value_; } + const std::vector& TEST_GetCompactionStats() const { return comp_stats_; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index e2db297e192..dced9f7db7a 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -445,9 +445,18 @@ Status MemTableList::TryInstallMemtableFlushResults( } if (it == memlist.rbegin() || batch_file_number != m->file_number_) { batch_file_number = m->file_number_; - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 " started", - cfd->GetName().c_str(), m->file_number_); + if (m->edit_.GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files) started", + cfd->GetName().c_str(), m->file_number_, + m->edit_.GetBlobFileAdditions().size()); + } + edit_list.push_back(&m->edit_); memtables_to_flush.push_back(m); #ifndef ROCKSDB_LITE @@ -502,9 +511,20 @@ Status MemTableList::TryInstallMemtableFlushResults( if (s.ok() && !cfd->IsDropped()) { // commit new state while (batch_count-- > 0) { MemTable* m = current_->memlist_.back(); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - cfd->GetName().c_str(), m->file_number_, mem_id); + if (m->edit_.GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, + m->edit_.GetBlobFileAdditions().size(), mem_id); + } + assert(m->file_number_ > 0); current_->Remove(m, to_delete); UpdateCachedValuesFromMemTableListVersion(); @@ -515,9 +535,20 @@ Status MemTableList::TryInstallMemtableFlushResults( for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; ++it) { MemTable* m = *it; // commit failed. setup state so that we can flush again. - ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - m->file_number_, mem_id); + if (m->edit_.GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + m->file_number_, mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " failed", + m->file_number_, + m->edit_.GetBlobFileAdditions().size(), mem_id); + } + m->flush_completed_ = false; m->flush_in_progress_ = false; m->edit_.Clear(); @@ -713,11 +744,25 @@ Status InstallMemtableAtomicFlushResults( for (auto m : *mems_list[i]) { assert(m->GetFileNumber() > 0); uint64_t mem_id = m->GetID(); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - mem_id); + + const VersionEdit* const edit = m->GetEdits(); + assert(edit); + + if (edit->GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + edit->GetBlobFileAdditions().size(), mem_id); + } + imm->current_->Remove(m, to_delete); imm->UpdateCachedValuesFromMemTableListVersion(); imm->ResetTrimHistoryNeeded(); @@ -728,11 +773,25 @@ Status InstallMemtableAtomicFlushResults( auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); for (auto m : *mems_list[i]) { uint64_t mem_id = m->GetID(); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - mem_id); + + const VersionEdit* const edit = m->GetEdits(); + assert(edit); + + if (edit->GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + edit->GetBlobFileAdditions().size(), mem_id); + } + m->SetFlushCompleted(false); m->SetFlushInProgress(false); m->GetEdits()->Clear(); diff --git a/db/repair.cc b/db/repair.cc index 25989269807..a96af2c3e95 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -429,18 +429,19 @@ class Repairer { LegacyFileSystemWrapper fs(env_); IOStatus io_s; status = BuildTable( - dbname_, env_, &fs, *cfd->ioptions(), + dbname_, /* versions */ nullptr, env_, &fs, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_, table_cache_, iter.get(), std::move(range_del_iters), &meta, - cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), - cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, - snapshot_checker, kNoCompression, 0 /* sample_for_compression */, - CompressionOptions(), false, nullptr /* internal_stats */, - TableFileCreationReason::kRecovery, &io_s, nullptr /*IOTracer*/, - nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, - nullptr /* table_properties */, -1 /* level */, current_time, - 0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */, - "DB Repairer" /* db_id */, db_session_id_); + nullptr /* blob_file_additions */, cfd->internal_comparator(), + cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), + {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, + 0 /* sample_for_compression */, CompressionOptions(), false, + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + &io_s, nullptr /*IOTracer*/, nullptr /* event_logger */, + 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */, + -1 /* level */, current_time, 0 /* oldest_key_time */, write_hint, + 0 /* file_creation_time */, "DB Repairer" /* db_id */, + db_session_id_); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/version_edit.cc b/db/version_edit.cc index f27b3eaab0e..8879f0e1bca 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -543,7 +543,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { return s; } - blob_file_additions_.emplace_back(blob_file_addition); + AddBlobFile(std::move(blob_file_addition)); break; } @@ -554,7 +554,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { return s; } - blob_file_garbages_.emplace_back(blob_file_garbage); + AddBlobFileGarbage(std::move(blob_file_garbage)); break; } diff --git a/db/version_edit.h b/db/version_edit.h index a84e3de09b5..d2d92c27247 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -414,12 +414,20 @@ class VersionEdit { std::move(checksum_method), std::move(checksum_value)); } + void AddBlobFile(BlobFileAddition blob_file_addition) { + blob_file_additions_.emplace_back(std::move(blob_file_addition)); + } + // Retrieve all the blob files added. using BlobFileAdditions = std::vector; const BlobFileAdditions& GetBlobFileAdditions() const { return blob_file_additions_; } + void SetBlobFileAdditions(BlobFileAdditions blob_file_additions) { + blob_file_additions_ = std::move(blob_file_additions); + } + // Add garbage for an existing blob file. Note: intentionally broken English // follows. void AddBlobFileGarbage(uint64_t blob_file_number, @@ -429,12 +437,20 @@ class VersionEdit { garbage_blob_bytes); } + void AddBlobFileGarbage(BlobFileGarbage blob_file_garbage) { + blob_file_garbages_.emplace_back(std::move(blob_file_garbage)); + } + // Retrieve all the blob file garbage added. using BlobFileGarbages = std::vector; const BlobFileGarbages& GetBlobFileGarbages() const { return blob_file_garbages_; } + void SetBlobFileGarbages(BlobFileGarbages blob_file_garbages) { + blob_file_garbages_ = std::move(blob_file_garbages); + } + // Add a WAL (either just created or closed). void AddWal(WalNumber number, WalMetadata metadata = WalMetadata()) { wal_additions_.emplace_back(number, std::move(metadata));