diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index a704226d4e4..daf5b67a6b9 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -69,12 +69,138 @@ void VerifyInitializationOfCompactionJobStats( #endif // !defined(IOS_CROSS_COMPILE) } +// Mock FSWritableFile for testing io priority. +// Only override the essential functions for testing compaction io priority. +class MockTestWritableFile : public FSWritableFileOwnerWrapper { + public: + MockTestWritableFile(std::unique_ptr&& file, + Env::IOPriority io_priority) + : FSWritableFileOwnerWrapper(std::move(file)), + write_io_priority_(io_priority) {} + IOStatus Append(const Slice& data, const IOOptions& options, + IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Append(data, options, dbg); + } + IOStatus Append(const Slice& data, const IOOptions& options, + const DataVerificationInfo& verification_info, + IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Append(data, options, verification_info, dbg); + } + IOStatus Close(const IOOptions& options, IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Close(options, dbg); + } + IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Flush(options, dbg); + } + IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Sync(options, dbg); + } + IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Fsync(options, dbg); + } + uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->GetFileSize(options, dbg); + } + IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options, + IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->RangeSync(offset, nbytes, options, dbg); + } + + void PrepareWrite(size_t offset, size_t len, const IOOptions& options, + IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + target()->PrepareWrite(offset, len, options, dbg); + } + + IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options, + IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, write_io_priority_); + return target()->Allocate(offset, len, options, dbg); + } + + private: + Env::IOPriority write_io_priority_; +}; + +// Mock FSRandomAccessFile for testing io priority. +// Only override the essential functions for testing compaction io priority. +class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper { + public: + MockTestRandomAccessFile(std::unique_ptr&& file, + Env::IOPriority io_priority) + : FSRandomAccessFileOwnerWrapper(std::move(file)), + read_io_priority_(io_priority) {} + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override { + EXPECT_EQ(options.rate_limiter_priority, read_io_priority_); + return target()->Read(offset, n, options, result, scratch, dbg); + } + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) override { + EXPECT_EQ(options.rate_limiter_priority, read_io_priority_); + return target()->Prefetch(offset, n, options, dbg); + } + + private: + Env::IOPriority read_io_priority_; +}; + +// Mock FileSystem for testing io priority. +class MockTestFileSystem : public FileSystemWrapper { + public: + explicit MockTestFileSystem(const std::shared_ptr& base, + Env::IOPriority read_io_priority, + Env::IOPriority write_io_priority) + : FileSystemWrapper(base), + read_io_priority_(read_io_priority), + write_io_priority_(write_io_priority) {} + + static const char* kClassName() { return "MockTestFileSystem"; } + const char* Name() const override { return kClassName(); } + + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg); + EXPECT_OK(s); + result->reset( + new MockTestRandomAccessFile(std::move(*result), read_io_priority_)); + return s; + } + IOStatus NewWritableFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override { + IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg); + EXPECT_OK(s); + result->reset( + new MockTestWritableFile(std::move(*result), write_io_priority_)); + return s; + } + + private: + Env::IOPriority read_io_priority_; + Env::IOPriority write_io_priority_; +}; + } // namespace class CompactionJobTestBase : public testing::Test { protected: CompactionJobTestBase(std::string dbname, const Comparator* ucmp, - std::function encode_u64_ts) + std::function encode_u64_ts, + bool test_io_priority) : dbname_(std::move(dbname)), ucmp_(ucmp), db_options_(), @@ -90,7 +216,8 @@ class CompactionJobTestBase : public testing::Test { shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()), error_handler_(nullptr, db_options_, &mutex_), - encode_u64_ts_(std::move(encode_u64_ts)) { + encode_u64_ts_(std::move(encode_u64_ts)), + test_io_priority_(test_io_priority) { Env* base_env = Env::Default(); EXPECT_OK( test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_)); @@ -105,7 +232,12 @@ class CompactionJobTestBase : public testing::Test { db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); cf_options_.comparator = ucmp_; - cf_options_.table_factory = mock_table_factory_; + if (test_io_priority_) { + BlockBasedTableOptions table_options; + cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); + } else { + cf_options_.table_factory = mock_table_factory_; + } } std::string GenerateFileName(uint64_t file_number) { @@ -145,6 +277,33 @@ class CompactionJobTestBase : public testing::Test { return blob_index; } + // Creates a table with the specificied key value pairs. + void CreateTable(const std::string& table_name, + const mock::KVVector& contents, uint64_t& file_size) { + std::unique_ptr file_writer; + Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(), + &file_writer, nullptr); + ASSERT_OK(s); + std::unique_ptr table_builder( + cf_options_.table_factory->NewTableBuilder( + TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_, + cfd_->internal_comparator(), + cfd_->int_tbl_prop_collector_factories(), + CompressionType::kNoCompression, + CompressionOptions(), 0 /* column_family_id */, + kDefaultColumnFamilyName, -1 /* level */), + file_writer.get())); + // Build table. + for (auto kv : contents) { + std::string key; + std::string value; + std::tie(key, value) = kv; + table_builder->Add(key, value); + } + ASSERT_OK(table_builder->Finish()); + file_size = table_builder->FileSize(); + } + void AddMockFile(const mock::KVVector& contents, int level = 0) { assert(contents.size() > 0); @@ -198,11 +357,18 @@ class CompactionJobTestBase : public testing::Test { } uint64_t file_number = versions_->NewFileNumber(); - EXPECT_OK(mock_table_factory_->CreateMockTable( - env_, GenerateFileName(file_number), std::move(contents))); + + uint64_t file_size; + if (test_io_priority_) { + CreateTable(GenerateFileName(file_number), contents, file_size); + } else { + file_size = 10; + EXPECT_OK(mock_table_factory_->CreateMockTable( + env_, GenerateFileName(file_number), std::move(contents))); + } VersionEdit edit; - edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key, + edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key, smallest_seqno, largest_seqno, false, Temperature::kUnknown, oldest_blob_file_number, kUnknownOldestAncesterTime, kUnknownFileCreationTime, kUnknownFileChecksum, @@ -323,7 +489,15 @@ class CompactionJobTestBase : public testing::Test { SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, int output_level = 1, bool verify = true, uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber, - bool check_get_priority = false) { + bool check_get_priority = false, + Env::IOPriority read_io_priority = Env::IO_TOTAL, + Env::IOPriority write_io_priority = Env::IO_TOTAL) { + // For compaction, set fs as MockTestFileSystem to check the io_priority. + if (test_io_priority_) { + db_options_.fs.reset( + new MockTestFileSystem(fs_, read_io_priority, write_io_priority)); + } + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); size_t num_input_files = 0; @@ -445,15 +619,16 @@ class CompactionJobTestBase : public testing::Test { ErrorHandler error_handler_; std::string full_history_ts_low_; const std::function encode_u64_ts_; + bool test_io_priority_; }; // TODO(icanadi) Make it simpler once we mock out VersionSet class CompactionJobTest : public CompactionJobTestBase { public: CompactionJobTest() - : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"), - BytewiseComparator(), - [](uint64_t /*ts*/) { return ""; }) {} + : CompactionJobTestBase( + test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(), + [](uint64_t /*ts*/) { return ""; }, false) {} }; TEST_F(CompactionJobTest, Simple) { @@ -1343,23 +1518,13 @@ TEST_F(CompactionJobTest, ResultSerialization) { } } -TEST_F(CompactionJobTest, GetRateLimiterPriority) { - NewDB(); - - auto expected_results = CreateTwoFiles(false); - auto cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto files = cfd->current()->storage_info()->LevelFiles(0); - ASSERT_EQ(2U, files.size()); - RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true, - kInvalidBlobFileNumber, true); -} class CompactionJobTimestampTest : public CompactionJobTestBase { public: CompactionJobTimestampTest() : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"), test::BytewiseComparatorWithU64TsWrapper(), - test::EncodeInt) {} + test::EncodeInt, false) {} }; TEST_F(CompactionJobTimestampTest, GCDisabled) { @@ -1475,6 +1640,69 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) { RunCompaction({files}, expected_results); } +// The io priority of the compaction reads and writes are different from +// other DB reads and writes. To prepare the compaction input files, use the +// default filesystem from Env. To test the io priority of the compaction +// reads and writes, db_options_.fs is set as MockTestFileSystem. +class CompactionJobIOPriorityTest : public CompactionJobTestBase { + public: + CompactionJobIOPriorityTest() + : CompactionJobTestBase( + test::PerThreadDBPath("compaction_job_io_priority_test"), + BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {} +}; + +TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) { + // When the state from WriteController is normal. + NewDB(); + mock::KVVector expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, + kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW); +} + +TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) { + // When the state from WriteController is Delayed. + NewDB(); + mock::KVVector expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + { + std::unique_ptr delay_token = + write_controller_.GetDelayToken(1000000); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, + kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER); + } +} + +TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) { + // When the state from WriteController is Stalled. + NewDB(); + mock::KVVector expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + { + std::unique_ptr stop_token = + write_controller_.GetStopToken(); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, + kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER); + } +} + +TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) { + NewDB(); + mock::KVVector expected_results = CreateTwoFiles(false); + auto cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto files = cfd->current()->storage_info()->LevelFiles(0); + ASSERT_EQ(2U, files.size()); + RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false, + kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {