Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe() #8311

Closed
wants to merge 7 commits into from
1 change: 1 addition & 0 deletions HISTORY.md
@@ -6,6 +6,7 @@
 * Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`.
 * Fixed the false-positive alert when recovering from the WAL file. Avoid reporting "SST file is ahead of WAL" on a newly created empty column family, if the previous WAL file is corrupted.
 * Fixed a bug where `GetLiveFiles()` output included a non-existent file called "OPTIONS-000000". Backups and checkpoints, which use `GetLiveFiles()`, failed on DBs impacted by this bug. Read-write DBs were impacted when the latest OPTIONS file failed to write and `fail_if_options_file_error == false`. Read-only DBs were impacted when no OPTIONS files existed.
+* Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe().

 ### Behavior Changes
 * Due to the fix of false-positive alert of "SST file is ahead of WAL", all the CFs with no SST file (CF empty) will bypass the consistency check. We fixed a false-positive, but introduced a very rare true-negative which will be triggered in the following conditions: A CF with some delete operations in the last few queries which will result in an empty CF (those are flushed to SST file and a compaction triggered which combines this file and all other SST files and generates an empty CF, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF). The deletion entries are logged in a WAL and this WAL was corrupted, while the CF's log number points to the next WAL (due to the flush). Therefore, the DB can only recover to the point without these trailing deletions and cause the inconsistent DB status.
116 changes: 116 additions & 0 deletions env/env_test.cc
@@ -11,6 +11,11 @@
 #include <sys/ioctl.h>
 #endif

+#if defined(ROCKSDB_IOURING_PRESENT)
+#include <liburing.h>
+#include <sys/uio.h>
+#endif
+
 #include <sys/types.h>

 #include <iostream>
@@ -1359,6 +1364,117 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
   }
 }

+#if defined(ROCKSDB_IOURING_PRESENT)
+void GenerateFilesAndRequest(Env* env, const std::string& fname,
+                             std::vector<ReadRequest>* ret_reqs,
+                             std::vector<std::string>* scratches) {
+  const size_t kTotalSize = 81920;
+  Random rnd(301);
+  std::string expected_data = rnd.RandomString(kTotalSize);
+
+  // Create file.
+  {
+    std::unique_ptr<WritableFile> wfile;
+    ASSERT_OK(env->NewWritableFile(fname, &wfile, EnvOptions()));
+    ASSERT_OK(wfile->Append(expected_data));
+    ASSERT_OK(wfile->Close());
+  }
+
+  // Right now kIoUringDepth is hard coded as 256, so we need very large
+  // number of keys to cover the case of multiple rounds of submissions.
+  // Right now the test latency is still acceptable. If it ends up with
+  // too long, we can modify the io uring depth with SyncPoint here.
+  const int num_reads = 3;
+  std::vector<size_t> offsets = {10000, 20000, 30000};
+  std::vector<size_t> lens = {3000, 200, 100};
+
+  // Create requests
+  scratches->reserve(num_reads);
+  std::vector<ReadRequest>& reqs = *ret_reqs;
+  reqs.resize(num_reads);
+  for (int i = 0; i < num_reads; ++i) {
+    reqs[i].offset = offsets[i];
+    reqs[i].len = lens[i];
+    scratches->emplace_back(reqs[i].len, ' ');
+    reqs[i].scratch = const_cast<char*>(scratches->back().data());
+  }
+}
+
+TEST_F(EnvPosixTest, MultiReadIOUringError) {
+  // In this test we don't do aligned read, wo it doesn't work for
Contributor

wo?

Contributor Author

It is copied from line 1276...

Contributor

I just did not know what "wo" means. Is it "write only", or is it a typo?

Contributor Author

I don't know either...

+  // direct I/O case.
+  EnvOptions soptions;
+  soptions.use_direct_reads = soptions.use_direct_writes = false;
+  std::string fname = test::PerThreadDBPath(env_, "testfile");
+
+  std::vector<std::string> scratches;
+  std::vector<ReadRequest> reqs;
+  GenerateFilesAndRequest(env_, fname, &reqs, &scratches);
+  // Query the data
+  std::unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+
+  bool io_uring_wait_cqe_called = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
Comment on lines +1417 to +1418
Contributor

What happens on platforms that do not support io_uring?

Contributor Author

The callback will not be called, so io_uring_wait_cqe_called stays false and we won't check that the return value is non-OK.

+      "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return",
+      [&](void* arg) {
+        if (!io_uring_wait_cqe_called) {
+          io_uring_wait_cqe_called = true;
+          ssize_t& ret = *(static_cast<ssize_t*>(arg));
+          ret = 1;
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Status s = file->MultiRead(reqs.data(), reqs.size());
+  if (io_uring_wait_cqe_called) {
+    ASSERT_NOK(s);
+  }
Comment on lines +1429 to +1432
Contributor

This looks like it should fail the ASSERT_STATUS_CHECKED calls if the platform does not support io_uring.

Contributor Author

What does "fail the ASSERT_STATUS_CHECKED calls" mean?

Contributor

If you compile with ASSERT_STATUS_CHECKED=1, then a status code must be checked or it aborts with a stack trace. If the uring condition is false, then "s" is never checked.

Contributor Author

Ahh, good point. Since the tests are passing, I am going to do it as a follow up.
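
To make the ASSERT_STATUS_CHECKED concern in this thread concrete, here is a minimal, self-contained sketch. `CheckedStatus`, `ConditionalCheck`, and `AlwaysAccountedFor` are hypothetical names and not RocksDB code, and the class counts dropped statuses instead of aborting with a stack trace. It assumes a follow-up could handle the "callback never fired" path by explicitly opting out of the check, similar in spirit to RocksDB's `Status::PermitUncheckedError()`.

```cpp
#include <cassert>

// Hypothetical stand-in for a status type in an ASSERT_STATUS_CHECKED build.
// A real build aborts; this sketch only counts statuses that were destroyed
// without ever being inspected.
class CheckedStatus {
 public:
  static int unchecked_count;  // statuses dropped without a check

  explicit CheckedStatus(bool ok) : ok_(ok) {}
  CheckedStatus(const CheckedStatus&) = delete;
  CheckedStatus& operator=(const CheckedStatus&) = delete;
  ~CheckedStatus() {
    if (!checked_) ++unchecked_count;
  }

  // Inspecting the status marks it as checked.
  bool ok() const {
    checked_ = true;
    return ok_;
  }

  // Explicit opt-out for paths that intentionally ignore the status.
  void PermitUncheckedError() const { checked_ = true; }

 private:
  bool ok_;
  mutable bool checked_ = false;
};

int CheckedStatus::unchecked_count = 0;

// Same shape as the test under review: the status is inspected only when
// the fault-injection callback fired.
void ConditionalCheck(bool callback_fired) {
  CheckedStatus s(false);
  if (callback_fired) {
    (void)s.ok();
  }
}

// One possible follow-up: every path either inspects the status or opts out.
void AlwaysAccountedFor(bool callback_fired) {
  CheckedStatus s(false);
  if (callback_fired) {
    (void)s.ok();
  } else {
    s.PermitUncheckedError();
  }
}
```

With this model, `ConditionalCheck(false)` leaves one status unchecked, which is the situation described above for platforms where the io_uring path is not taken, while `AlwaysAccountedFor(false)` leaves none.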


+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(EnvPosixTest, MultiReadIOUringError2) {
+  // In this test we don't do aligned read, wo it doesn't work for
+  // direct I/O case.
+  EnvOptions soptions;
+  soptions.use_direct_reads = soptions.use_direct_writes = false;
+  std::string fname = test::PerThreadDBPath(env_, "testfile");
+
+  std::vector<std::string> scratches;
+  std::vector<ReadRequest> reqs;
+  GenerateFilesAndRequest(env_, fname, &reqs, &scratches);
+  // Query the data
+  std::unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+
+  bool io_uring_submit_and_wait_called = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
+      [&](void* arg) {
+        io_uring_submit_and_wait_called = true;
+        ssize_t* ret = static_cast<ssize_t*>(arg);
+        (*ret)--;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
+      [&](void* arg) {
+        struct io_uring* iu = static_cast<struct io_uring*>(arg);
+        struct io_uring_cqe* cqe;
+        assert(io_uring_wait_cqe(iu, &cqe) == 0);
+        io_uring_cqe_seen(iu, cqe);
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Status s = file->MultiRead(reqs.data(), reqs.size());
+  if (io_uring_submit_and_wait_called) {
+    ASSERT_NOK(s);
+  }
Contributor

This looks like it should fail the ASSERT_STATUS_CHECKED calls if the platform does not support io_uring.


+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+#endif  // ROCKSDB_IOURING_PRESENT
+
 // Only works in linux platforms
 #ifdef OS_WIN
 TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {
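
Both new tests in this file inject a failure by registering a named callback that overwrites a return code inside `MultiRead`. As a rough illustration of that mechanism only, the sketch below uses a hypothetical, single-threaded `MiniSyncPoint` class; it is not the RocksDB `SyncPoint` implementation, and the point name `"Demo:wait:return"` and the functions `WaitSucceeded` and `DemoInjection` are made up for this example.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical, single-threaded stand-in for a sync point registry.
class MiniSyncPoint {
 public:
  void SetCallBack(const std::string& name, std::function<void(void*)> cb) {
    callbacks_[name] = std::move(cb);
  }
  void EnableProcessing() { enabled_ = true; }
  void DisableProcessing() { enabled_ = false; }

  // Called from the code under test. Does nothing unless processing is
  // enabled and a callback is registered under `name`.
  void Process(const std::string& name, void* arg) {
    if (!enabled_) return;
    auto it = callbacks_.find(name);
    if (it != callbacks_.end()) it->second(arg);
  }

 private:
  bool enabled_ = false;
  std::map<std::string, std::function<void(void*)>> callbacks_;
};

// Hypothetical code under test: pretends a wait call returned 0, then gives
// a registered callback the chance to overwrite that return code.
bool WaitSucceeded(MiniSyncPoint& sp) {
  long ret = 0;
  sp.Process("Demo:wait:return", &ret);
  return ret == 0;
}

// Usage: inject a non-zero return code only while processing is enabled.
void DemoInjection() {
  MiniSyncPoint sp;
  sp.SetCallBack("Demo:wait:return",
                 [](void* arg) { *static_cast<long*>(arg) = 1; });
  assert(WaitSucceeded(sp));   // processing not enabled yet
  sp.EnableProcessing();
  assert(!WaitSucceeded(sp));  // callback forces a failure
  sp.DisableProcessing();
  assert(WaitSucceeded(sp));   // back to normal behavior
}
```

Passing the return code by pointer is what lets the test change the outcome without the production path knowing about the test.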
38 changes: 34 additions & 4 deletions env/io_posix.cc
@@ -633,6 +633,8 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
     return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
   }

+  IOStatus ios = IOStatus::OK();
+
   struct WrappedReadRequest {
     FSReadRequest* req;
     struct iovec iov;
@@ -679,19 +681,47 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,

     ssize_t ret =
         io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
+    TEST_SYNC_POINT_CALLBACK(
+        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1",
+        &ret);
+    TEST_SYNC_POINT_CALLBACK(
+        "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2",
+        iu);
+
     if (static_cast<size_t>(ret) != this_reqs) {
       fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
+      // If error happens and we submitted fewer than expected, it is an
+      // exception case and we don't retry here. We should still consume
+      // what is submitted in the ring.
+      for (ssize_t i = 0; i < ret; i++) {
+        struct io_uring_cqe* cqe = nullptr;
+        io_uring_wait_cqe(iu, &cqe);
+        if (cqe != nullptr) {
+          io_uring_cqe_seen(iu, cqe);
+        }
+      }
+      return IOStatus::IOError("io_uring_submit_and_wait() requested " +
+                               ToString(this_reqs) + " but returned " +
+                               ToString(ret));
     }
-    assert(static_cast<size_t>(ret) == this_reqs);

     for (size_t i = 0; i < this_reqs; i++) {
-      struct io_uring_cqe* cqe;
+      struct io_uring_cqe* cqe = nullptr;
       WrappedReadRequest* req_wrap;

       // We could use the peek variant here, but this seems safer in terms
       // of our initial wait not reaping all completions
       ret = io_uring_wait_cqe(iu, &cqe);
-      assert(!ret);
+      TEST_SYNC_POINT_CALLBACK(
+          "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
+      if (ret) {
+        ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret));
+
+        if (cqe != nullptr) {
+          io_uring_cqe_seen(iu, cqe);
+        }
+        continue;
+      }

       req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
       FSReadRequest* req = req_wrap->req;
@@ -740,7 +770,7 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
       io_uring_cqe_seen(iu, cqe);
     }
   }
-  return IOStatus::OK();
+  return ios;
 #else
   return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
 #endif
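
The completion loop in this diff records an error in `ios` and continues, so the remaining completions are still reaped before the function returns. The sketch below models only that control flow and does not use liburing. `FakeCompletion`, `ReapResult`, and `ReapAll` are hypothetical names, and `wait_ret` merely plays the role of the `io_uring_wait_cqe()` return code.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of one completion. `wait_ret` stands in for the return
// code of a wait call (0 means success).
struct FakeCompletion {
  int wait_ret;
  int request_id;
};

struct ReapResult {
  bool ok = true;
  std::string message;        // describes the most recent error, if any
  std::vector<int> finished;  // ids of completions that were processed
};

// Visits every completion. An error is remembered but does not stop the
// loop, so later completions are not left behind.
ReapResult ReapAll(const std::vector<FakeCompletion>& completions) {
  ReapResult result;
  for (const FakeCompletion& c : completions) {
    if (c.wait_ret != 0) {
      result.ok = false;
      result.message = "wait returned " + std::to_string(c.wait_ret);
      continue;  // keep draining instead of returning early
    }
    result.finished.push_back(c.request_id);
  }
  return result;
}
```

As in the diff, a later error overwrites an earlier one, so the caller sees only the most recent failure. Whether the first or the last error is more useful to report is a design choice this sketch does not settle.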