Skip to content

Commit

Permalink
Add unit test to verify that the dynamic priority can be passed from …
Browse files Browse the repository at this point in the history
…compaction to FS (#10088)

Summary:
**Summary:**
Add unit tests to verify that the dynamic priority can be passed from compaction to FS. Compaction reads&writes and other DB reads&writes share the same read&write paths to FSRandomAccessFile or FSWritableFile, so a MockTestFileSystem is added to replace the default filesystem from Env to intercept and verify the io_priority. To prepare the compaction input files, use the default filesystem from Env. To test the io priority of the compaction reads and writes, db_options_.fs is set as MockTestFileSystem.

Pull Request resolved: #10088

Test Plan: Add unit tests.

Reviewed By: anand1976

Differential Revision: D36882528

Pulled By: gitbw95

fbshipit-source-id: 120adc15801966f2b8c9fc45285f590a3fff96d1
  • Loading branch information
gitbw95 authored and facebook-github-bot committed Jun 7, 2022
1 parent b6de139 commit 5cbee1f
Showing 1 changed file with 249 additions and 21 deletions.
270 changes: 249 additions & 21 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,138 @@ void VerifyInitializationOfCompactionJobStats(
#endif // !defined(IOS_CROSS_COMPILE)
}

// Mock FSWritableFile for testing io priority.
// Only override the essential functions for testing compaction io priority.
class MockTestWritableFile : public FSWritableFileOwnerWrapper {
public:
MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
Env::IOPriority io_priority)
: FSWritableFileOwnerWrapper(std::move(file)),
write_io_priority_(io_priority) {}
IOStatus Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Append(data, options, dbg);
}
IOStatus Append(const Slice& data, const IOOptions& options,
const DataVerificationInfo& verification_info,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Append(data, options, verification_info, dbg);
}
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Close(options, dbg);
}
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Flush(options, dbg);
}
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Sync(options, dbg);
}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Fsync(options, dbg);
}
uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->GetFileSize(options, dbg);
}
IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->RangeSync(offset, nbytes, options, dbg);
}

void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
target()->PrepareWrite(offset, len, options, dbg);
}

IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
return target()->Allocate(offset, len, options, dbg);
}

private:
Env::IOPriority write_io_priority_;
};

// Mock FSRandomAccessFile for testing io priority.
// Only override the essential functions for testing compaction io priority.
class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
Env::IOPriority io_priority)
: FSRandomAccessFileOwnerWrapper(std::move(file)),
read_io_priority_(io_priority) {}

IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
return target()->Read(offset, n, options, result, scratch, dbg);
}
IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
IODebugContext* dbg) override {
EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
return target()->Prefetch(offset, n, options, dbg);
}

private:
Env::IOPriority read_io_priority_;
};

// Mock FileSystem for testing io priority.
class MockTestFileSystem : public FileSystemWrapper {
public:
explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
Env::IOPriority read_io_priority,
Env::IOPriority write_io_priority)
: FileSystemWrapper(base),
read_io_priority_(read_io_priority),
write_io_priority_(write_io_priority) {}

static const char* kClassName() { return "MockTestFileSystem"; }
const char* Name() const override { return kClassName(); }

IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
EXPECT_OK(s);
result->reset(
new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
return s;
}
IOStatus NewWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
EXPECT_OK(s);
result->reset(
new MockTestWritableFile(std::move(*result), write_io_priority_));
return s;
}

private:
Env::IOPriority read_io_priority_;
Env::IOPriority write_io_priority_;
};

} // namespace

class CompactionJobTestBase : public testing::Test {
protected:
CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
std::function<std::string(uint64_t)> encode_u64_ts)
std::function<std::string(uint64_t)> encode_u64_ts,
bool test_io_priority)
: dbname_(std::move(dbname)),
ucmp_(ucmp),
db_options_(),
Expand All @@ -90,7 +216,8 @@ class CompactionJobTestBase : public testing::Test {
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_),
encode_u64_ts_(std::move(encode_u64_ts)) {
encode_u64_ts_(std::move(encode_u64_ts)),
test_io_priority_(test_io_priority) {
Env* base_env = Env::Default();
EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
Expand All @@ -105,7 +232,12 @@ class CompactionJobTestBase : public testing::Test {
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
cf_options_.comparator = ucmp_;
cf_options_.table_factory = mock_table_factory_;
if (test_io_priority_) {
BlockBasedTableOptions table_options;
cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
} else {
cf_options_.table_factory = mock_table_factory_;
}
}

std::string GenerateFileName(uint64_t file_number) {
Expand Down Expand Up @@ -145,6 +277,33 @@ class CompactionJobTestBase : public testing::Test {
return blob_index;
}

// Creates a table with the specificied key value pairs.
void CreateTable(const std::string& table_name,
const mock::KVVector& contents, uint64_t& file_size) {
std::unique_ptr<WritableFileWriter> file_writer;
Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
&file_writer, nullptr);
ASSERT_OK(s);
std::unique_ptr<TableBuilder> table_builder(
cf_options_.table_factory->NewTableBuilder(
TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_,
cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(),
CompressionType::kNoCompression,
CompressionOptions(), 0 /* column_family_id */,
kDefaultColumnFamilyName, -1 /* level */),
file_writer.get()));
// Build table.
for (auto kv : contents) {
std::string key;
std::string value;
std::tie(key, value) = kv;
table_builder->Add(key, value);
}
ASSERT_OK(table_builder->Finish());
file_size = table_builder->FileSize();
}

void AddMockFile(const mock::KVVector& contents, int level = 0) {
assert(contents.size() > 0);

Expand Down Expand Up @@ -198,11 +357,18 @@ class CompactionJobTestBase : public testing::Test {
}

uint64_t file_number = versions_->NewFileNumber();
EXPECT_OK(mock_table_factory_->CreateMockTable(
env_, GenerateFileName(file_number), std::move(contents)));

uint64_t file_size;
if (test_io_priority_) {
CreateTable(GenerateFileName(file_number), contents, file_size);
} else {
file_size = 10;
EXPECT_OK(mock_table_factory_->CreateMockTable(
env_, GenerateFileName(file_number), std::move(contents)));
}

VersionEdit edit;
edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key,
smallest_seqno, largest_seqno, false, Temperature::kUnknown,
oldest_blob_file_number, kUnknownOldestAncesterTime,
kUnknownFileCreationTime, kUnknownFileChecksum,
Expand Down Expand Up @@ -323,7 +489,15 @@ class CompactionJobTestBase : public testing::Test {
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true,
uint64_t expected_oldest_blob_file_number = kInvalidBlobFileNumber,
bool check_get_priority = false) {
bool check_get_priority = false,
Env::IOPriority read_io_priority = Env::IO_TOTAL,
Env::IOPriority write_io_priority = Env::IO_TOTAL) {
// For compaction, set fs as MockTestFileSystem to check the io_priority.
if (test_io_priority_) {
db_options_.fs.reset(
new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
}

auto cfd = versions_->GetColumnFamilySet()->GetDefault();

size_t num_input_files = 0;
Expand Down Expand Up @@ -445,15 +619,16 @@ class CompactionJobTestBase : public testing::Test {
ErrorHandler error_handler_;
std::string full_history_ts_low_;
const std::function<std::string(uint64_t)> encode_u64_ts_;
bool test_io_priority_;
};

// TODO(icanadi) Make it simpler once we mock out VersionSet
class CompactionJobTest : public CompactionJobTestBase {
public:
CompactionJobTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
BytewiseComparator(),
[](uint64_t /*ts*/) { return ""; }) {}
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
[](uint64_t /*ts*/) { return ""; }, false) {}
};

TEST_F(CompactionJobTest, Simple) {
Expand Down Expand Up @@ -1343,23 +1518,13 @@ TEST_F(CompactionJobTest, ResultSerialization) {
}
}

TEST_F(CompactionJobTest, GetRateLimiterPriority) {
NewDB();

auto expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, true,
kInvalidBlobFileNumber, true);
}

class CompactionJobTimestampTest : public CompactionJobTestBase {
public:
CompactionJobTimestampTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
test::BytewiseComparatorWithU64TsWrapper(),
test::EncodeInt) {}
test::EncodeInt, false) {}
};

TEST_F(CompactionJobTimestampTest, GCDisabled) {
Expand Down Expand Up @@ -1475,6 +1640,69 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
RunCompaction({files}, expected_results);
}

// The io priority of the compaction reads and writes are different from
// other DB reads and writes. To prepare the compaction input files, use the
// default filesystem from Env. To test the io priority of the compaction
// reads and writes, db_options_.fs is set as MockTestFileSystem.
class CompactionJobIOPriorityTest : public CompactionJobTestBase {
public:
CompactionJobIOPriorityTest()
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_io_priority_test"),
BytewiseComparator(), [](uint64_t /*ts*/) { return ""; }, true) {}
};

TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
// When the state from WriteController is normal.
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_LOW, Env::IO_LOW);
}

TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
// When the state from WriteController is Delayed.
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
{
std::unique_ptr<WriteControllerToken> delay_token =
write_controller_.GetDelayToken(1000000);
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
}
}

TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
// When the state from WriteController is Stalled.
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
{
std::unique_ptr<WriteControllerToken> stop_token =
write_controller_.GetStopToken();
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, false, Env::IO_USER, Env::IO_USER);
}
}

TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
NewDB();
mock::KVVector expected_results = CreateTwoFiles(false);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files = cfd->current()->storage_info()->LevelFiles(0);
ASSERT_EQ(2U, files.size());
RunCompaction({files}, expected_results, {}, kMaxSequenceNumber, 1, false,
kInvalidBlobFileNumber, true, Env::IO_LOW, Env::IO_LOW);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down

0 comments on commit 5cbee1f

Please sign in to comment.