Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Summary:
1. Pass IOOptions to RandomAccessFileReader::Read and MultiRead instead
of ReadOptions
2. Use SpecialEnv in unit tests to fake time
  • Loading branch information
anand76 committed Apr 30, 2020
1 parent 026c3cd commit a45f154
Show file tree
Hide file tree
Showing 22 changed files with 154 additions and 144 deletions.
36 changes: 17 additions & 19 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2387,11 +2387,12 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {

class DeadlineRandomAccessFile : public FSRandomAccessFileWrapper {
public:
DeadlineRandomAccessFile(DeadlineFS& fs,
DeadlineRandomAccessFile(DeadlineFS& fs, SpecialEnv* env,
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileWrapper(file.get()),
fs_(fs),
file_(std::move(file)) {}
file_(std::move(file)),
env_(env) {}

IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
Slice* result, char* scratch, IODebugContext* dbg) const override {
Expand All @@ -2401,7 +2402,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
AssertDeadline(deadline, opts);
}
if (fs_.ShouldDelay(&delay)) {
Env::Default()->SleepForMicroseconds(delay);
env_->SleepForMicroseconds(delay);
}
return FSRandomAccessFileWrapper::Read(offset, len, opts, result, scratch,
dbg);
Expand All @@ -2415,7 +2416,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
AssertDeadline(deadline, options);
}
if (fs_.ShouldDelay(&delay)) {
Env::Default()->SleepForMicroseconds(delay);
env_->SleepForMicroseconds(delay);
}
return FSRandomAccessFileWrapper::MultiRead(reqs, num_reqs, options, dbg);
}
Expand All @@ -2426,21 +2427,21 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
// Give a leeway of +- 10us as it can take some time for the Get/
// MultiGet call to reach here, in order to avoid false alarms
std::chrono::microseconds now =
std::chrono::microseconds(Env::Default()->NowMicros());
std::chrono::microseconds delta = std::chrono::microseconds(10);
EXPECT_GE(deadline + delta - now, opts.timeout);
EXPECT_LE(deadline - delta - now, opts.timeout);
std::chrono::microseconds(env_->NowMicros());
ASSERT_EQ(deadline - now, opts.timeout);
}
DeadlineFS& fs_;
std::unique_ptr<FSRandomAccessFile> file_;
SpecialEnv* env_;
};

class DeadlineFS : public FileSystemWrapper {
public:
DeadlineFS()
DeadlineFS(SpecialEnv* env)
: FileSystemWrapper(FileSystem::Default()),
delay_idx_(0),
deadline_(std::chrono::microseconds::zero()) {}
deadline_(std::chrono::microseconds::zero()),
env_(env) {}
~DeadlineFS() = default;

IOStatus NewRandomAccessFile(const std::string& fname,
Expand All @@ -2451,7 +2452,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
IOStatus s;

s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
result->reset(new DeadlineRandomAccessFile(*this, file));
result->reset(new DeadlineRandomAccessFile(*this, env_, file));
return s;
}

Expand Down Expand Up @@ -2492,6 +2493,7 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {
size_t delay_idx_;
int io_count_;
std::chrono::microseconds deadline_;
SpecialEnv* env_;
};

inline void CheckStatus(std::vector<Status>& statuses, size_t num_ok) {
Expand All @@ -2507,8 +2509,10 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet {

TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
std::shared_ptr<DBBasicTestMultiGetDeadline::DeadlineFS> fs(
new DBBasicTestMultiGetDeadline::DeadlineFS());
std::unique_ptr<Env> env = NewCompositeEnv(fs);
new DBBasicTestMultiGetDeadline::DeadlineFS(env_));
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
env_->no_slowdown_ = true;
env_->time_elapse_only_sleep_.store(true);
Options options = CurrentOptions();

std::shared_ptr<Cache> cache = NewLRUCache(1048576);
Expand Down Expand Up @@ -2540,7 +2544,6 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
fs->SetDelaySequence(ro.deadline, {{0, 20000}});

std::vector<Status> statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
std::cout << "Non-batched MultiGet";
// The first key is successful because we check after the lookup, but
// subsequent keys fail due to deadline exceeded
CheckStatus(statuses, 1);
Expand All @@ -2565,7 +2568,6 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000};
fs->SetDelaySequence(ro.deadline, {{1, 20000}});
statuses = dbfull()->MultiGet(ro, cfs, keys, &values);
std::cout << "Non-batched 2";
CheckStatus(statuses, 3);

// Test batched MultiGet with an IO delay in the first data block read.
Expand All @@ -2581,7 +2583,6 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
fs->SetDelaySequence(ro.deadline, {{0, 20000}});
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 1";
CheckStatus(statuses, 2);

// Similar to the previous one, but an IO delay in the third CF data block
Expand All @@ -2597,7 +2598,6 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
fs->SetDelaySequence(ro.deadline, {{2, 20000}});
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 2";
CheckStatus(statuses, 6);

// Similar to the previous one, but an IO delay in the last but one CF
Expand All @@ -2612,7 +2612,6 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
fs->SetDelaySequence(ro.deadline, {{3, 20000}});
dbfull()->MultiGet(ro, keys.size(), cfs.data(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched 3";
CheckStatus(statuses, 8);

// Test batched MultiGet with single CF and lots of keys. Inject delay
Expand All @@ -2639,7 +2638,6 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
fs->SetDelaySequence(ro.deadline, {{1, 20000}});
dbfull()->MultiGet(ro, handles_[0], keys.size(), keys.data(),
pin_values.data(), statuses.data());
std::cout << "Batched single CF";
CheckStatus(statuses, 64);
Close();
}
Expand Down
3 changes: 1 addition & 2 deletions file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader,

Slice result;
size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
s = reader->Read(ReadOptions(), rounddown_offset + chunk_len,
read_len, &result,
s = reader->Read(IOOptions(), rounddown_offset + chunk_len, read_len, &result,
buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
#ifndef NDEBUG
if (!s.ok() || result.size() < read_len) {
Expand Down
16 changes: 16 additions & 0 deletions file/file_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,20 @@ extern Status DeleteDBFile(const ImmutableDBOptions* db_options,

extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options);

// Converts the absolute deadline carried by `ro` into the relative timeout
// carried by `opts`.
//
// ro   - read options; a zero `deadline` means no deadline was requested, in
//        which case `opts` is left untouched.
// env  - clock source used for NowMicros(); Env::Default() is used when null.
// opts - receives `timeout` = time remaining until the deadline.
//
// Returns IOStatus::TimedOut if the deadline has already been reached, and
// IOStatus::OK otherwise.
inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env,
                                         IOOptions& opts) {
  if (!env) {
    env = Env::Default();
  }

  if (ro.deadline.count()) {
    const std::chrono::microseconds now =
        std::chrono::microseconds(env->NowMicros());
    // Compare with >= rather than >: when now == deadline the remaining time
    // is exactly zero, and a zero opts.timeout is indistinguishable from
    // "no timeout specified", which would let the read run unbounded.
    // Ensure at least 1us remains before issuing the IO.
    if (now >= ro.deadline) {
      return IOStatus::TimedOut("Deadline exceeded");
    }
    opts.timeout = ro.deadline - now;
  }
  return IOStatus::OK();
}

} // namespace ROCKSDB_NAMESPACE
64 changes: 32 additions & 32 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace ROCKSDB_NAMESPACE {

Status RandomAccessFileReader::Read(const ReadOptions& ro, uint64_t offset,
Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
size_t n, Slice* result, char* scratch,
AlignedBuf* aligned_buf,
bool for_compaction) const {
Expand Down Expand Up @@ -65,19 +65,20 @@ Status RandomAccessFileReader::Read(const ReadOptions& ro, uint64_t offset,
orig_offset = aligned_offset + buf.CurrentSize();
}

IOOptions opts;
s = PrepareIOFromReadOptions(ro, env_, opts);
if (s.ok()) {
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
&tmp, buf.Destination(), nullptr);
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
s);
}
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
// Only user reads are expected to specify a timeout. User reads are
// not subject to the rate limiter and should go through only one
// iteration of this loop, so we don't need to check and adjust
// opts.timeout before calling file_->Read
assert(!opts.timeout.count() || allowed == read_size);
s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
&tmp, buf.Destination(), nullptr);
}
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
s);
}

buf.Size(buf.CurrentSize() + tmp.size());
Expand Down Expand Up @@ -124,22 +125,23 @@ Status RandomAccessFileReader::Read(const ReadOptions& ro, uint64_t offset,
}
#endif

IOOptions opts;
s = PrepareIOFromReadOptions(ro, env_, opts);
if (s.ok()) {
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->Read(offset + pos, allowed, opts, &tmp_result,
scratch + pos, nullptr);
}
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
// Only user reads are expected to specify a timeout. User reads are
// not subject to the rate limiter and should go through only one
// iteration of this loop, so we don't need to check and adjust
// opts.timeout before calling file_->Read
assert(!opts.timeout.count() || allowed == n);
s = file_->Read(offset + pos, allowed, opts, &tmp_result,
scratch + pos, nullptr);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
finish_ts, s);
}
#endif
if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::system_clock::now();
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
finish_ts, s);
}
#endif

if (res_scratch == nullptr) {
// we can't simply use `scratch` because reads of mmap'd files return
Expand Down Expand Up @@ -198,7 +200,7 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
return true;
}

Status RandomAccessFileReader::MultiRead(const ReadOptions& ro,
Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
FSReadRequest* read_reqs,
size_t num_reqs,
AlignedBuf* aligned_buf) const {
Expand Down Expand Up @@ -258,9 +260,7 @@ Status RandomAccessFileReader::MultiRead(const ReadOptions& ro,
}
#endif // ROCKSDB_LITE

IOOptions opts;
s = PrepareIOFromReadOptions(ro, env_, opts);
if (s.ok()) {
{
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
}
Expand Down
6 changes: 4 additions & 2 deletions file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class RandomAccessFileReader {
// 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
// the internally allocated buffer on return, and the result refers to a
// region in aligned_buf.
Status Read(const ReadOptions& ro, uint64_t offset, size_t n, Slice* result,
Status Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
char* scratch, AlignedBuf* aligned_buf,
bool for_compaction = false) const;

Expand All @@ -123,7 +123,7 @@ class RandomAccessFileReader {
// In non-direct IO mode, aligned_buf should be null;
// In direct IO mode, aligned_buf stores the aligned buffer allocated inside
// MultiRead, the result Slices in reqs refer to aligned_buf.
Status MultiRead(const ReadOptions& ro, FSReadRequest* reqs, size_t num_reqs,
Status MultiRead(const IOOptions& opts, FSReadRequest* reqs, size_t num_reqs,
AlignedBuf* aligned_buf) const;

Status Prefetch(uint64_t offset, size_t n) const {
Expand All @@ -135,5 +135,7 @@ class RandomAccessFileReader {
std::string file_name() const { return file_name_; }

bool use_direct_io() const { return file_->use_direct_io(); }

Env* env() const { return env_; }
};
} // namespace ROCKSDB_NAMESPACE
18 changes: 9 additions & 9 deletions file/random_access_file_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
Slice result;
AlignedBuf buf;
for (bool for_compaction : {true, false}) {
ASSERT_OK(r->Read(ReadOptions(), offset, len, &result, nullptr, &buf,
ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf,
for_compaction));
ASSERT_EQ(result.ToString(), content.substr(offset, len));
}
Expand Down Expand Up @@ -153,8 +153,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(ReadOptions(), reqs.data(), reqs.size(),
&aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down Expand Up @@ -191,8 +191,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(ReadOptions(), reqs.data(), reqs.size(),
&aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down Expand Up @@ -229,8 +229,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r1));
reqs.push_back(std::move(r2));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(ReadOptions(), reqs.data(), reqs.size(),
&aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down Expand Up @@ -259,8 +259,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
reqs.push_back(std::move(r0));
reqs.push_back(std::move(r1));
AlignedBuf aligned_buf;
ASSERT_OK(r->MultiRead(ReadOptions(), reqs.data(), reqs.size(),
&aligned_buf));
ASSERT_OK(
r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));

AssertResult(content, reqs);
}
Expand Down
13 changes: 12 additions & 1 deletion table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
#include "file/file_prefetch_buffer.h"
#include "file/file_util.h"
#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
Expand Down Expand Up @@ -1671,7 +1672,17 @@ void BlockBasedTable::RetrieveMultipleBlocks(
read_reqs.emplace_back(req);
}

file->MultiRead(options, &read_reqs[0], read_reqs.size(), nullptr);
{
IOOptions opts;
IOStatus s = PrepareIOFromReadOptions(options, file->env(), opts);
if (s.IsTimedOut()) {
for (FSReadRequest& req : read_reqs) {
req.status = s;
}
} else {
file->MultiRead(opts, &read_reqs[0], read_reqs.size(), nullptr);
}
}

idx_in_batch = 0;
size_t valid_batch_idx = 0;
Expand Down
Loading

0 comments on commit a45f154

Please sign in to comment.