From 08683ee5605394e362aa7d7499fd3830bc5d84ba Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 17 May 2021 22:41:09 -0700 Subject: [PATCH 1/7] Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe(). Summary: Right now the return codes of io_uring_submit_and_wait() and io_uring_wait_cqe() are not handled. That is not good practice. Although these two functions are not supposed to return non-0 values in normal execution, people suspect that they might return a non-0 value when an interruption happens, and the code might then hang. Test Plan: Make sure at least normal test cases still pass. --- HISTORY.md | 1 + env/io_posix.cc | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 1f23fac2f70..356ddacc589 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Fixed a data race between insertion into memtables and the retrieval of the DB properties `rocksdb.cur-size-active-mem-table`, `rocksdb.cur-size-all-mem-tables`, and `rocksdb.size-all-mem-tables`. * Fixed the false-positive alert when recovering from the WAL file. Avoid reporting "SST file is ahead of WAL" on a newly created empty column family, if the previous WAL file is corrupted. * Fixed a bug where `GetLiveFiles()` output included a non-existent file called "OPTIONS-000000". Backups and checkpoints, which use `GetLiveFiles()`, failed on DBs impacted by this bug. Read-write DBs were impacted when the latest OPTIONS file failed to write and `fail_if_options_file_error == false`. Read-only DBs were impacted when no OPTIONS files existed. +* Handle return code by io_uring_submit_and_wait() and io_uring_wait_cqe(). ### Behavior Changes * Due to the fix of false-postive alert of "SST file is ahead of WAL", all the CFs with no SST file (CF empty) will bypass the consistency check. 
We fixed a false-positive, but introduced a very rare true-negative which will be triggered in the following conditions: A CF with some delete operations in the last a few queries which will result in an empty CF (those are flushed to SST file and a compaction triggered which combines this file and all other SST files and generates an empty CF, or there is another reason to write a manifest entry for this CF after a flush that generates no SST file from an empty CF). The deletion entries are logged in a WAL and this WAL was corrupted, while the CF's log number points to the next WAL (due to the flush). Therefore, the DB can only recover to the point without these trailing deletions and cause the inconsistent DB status. diff --git a/env/io_posix.cc b/env/io_posix.cc index de0a7453972..44c11edaea1 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -633,6 +633,8 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg); } + IOStatus ios = IOStatus::OK(); + struct WrappedReadRequest { FSReadRequest* req; struct iovec iov; @@ -681,8 +683,13 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, io_uring_submit_and_wait(iu, static_cast(this_reqs)); if (static_cast(ret) != this_reqs) { fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); + assert(false); + // It should not happen. In case it happens, we simply return to prevent + // following io_uring callse to hang. 
+ return IOStatus::IOError("io_uring_submit_and_wait() requested " + + ToString(this_reqs) + " but returned " + + ToString(ret)); } - assert(static_cast(ret) == this_reqs); for (size_t i = 0; i < this_reqs; i++) { struct io_uring_cqe* cqe; @@ -691,7 +698,13 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, // We could use the peek variant here, but this seems safer in terms // of our initial wait not reaping all completions ret = io_uring_wait_cqe(iu, &cqe); - assert(!ret); + if (!ret) { + assert(false); + ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret)); + // It's not safe to use cqe anymore, so we don't konw which reqeuest + // it is. + continue; + } req_wrap = static_cast(io_uring_cqe_get_data(cqe)); FSReadRequest* req = req_wrap->req; @@ -740,7 +753,7 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, io_uring_cqe_seen(iu, cqe); } } - return IOStatus::OK(); + return ios; #else return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg); #endif From f50869d608df2566d0688c624e8bd2d63cf011d2 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 10:30:14 -0700 Subject: [PATCH 2/7] Fix a bug --- env/io_posix.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/env/io_posix.cc b/env/io_posix.cc index 44c11edaea1..f49e30c81d9 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -698,9 +698,9 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, // We could use the peek variant here, but this seems safer in terms // of our initial wait not reaping all completions ret = io_uring_wait_cqe(iu, &cqe); - if (!ret) { + if (ret) { assert(false); - ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret)); + ios = IOStatus::IOError("io_uring_wait_cqe() returns 0"); // It's not safe to use cqe anymore, so we don't konw which reqeuest // it is. 
continue; From 93293e83e2df4832d07a958bf1a6bf2cbaaa8e7c Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 10:34:32 -0700 Subject: [PATCH 3/7] Another fix --- env/io_posix.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/env/io_posix.cc b/env/io_posix.cc index f49e30c81d9..0eb117cc524 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -700,7 +700,7 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, ret = io_uring_wait_cqe(iu, &cqe); if (ret) { assert(false); - ios = IOStatus::IOError("io_uring_wait_cqe() returns 0"); + ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret)); // It's not safe to use cqe anymore, so we don't konw which reqeuest // it is. continue; From f3522a10bb9e482e8ae39d5b53a8b724c851b4eb Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 12:02:38 -0700 Subject: [PATCH 4/7] Add unit tests to handle those failure handling logics. --- env/env_test.cc | 114 ++++++++++++++++++++++++++++++++++++++++++++++++ env/io_posix.cc | 28 +++++++++--- 2 files changed, 136 insertions(+), 6 deletions(-) diff --git a/env/env_test.cc b/env/env_test.cc index 1c4340b43ec..0408ca4eb80 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -11,6 +11,11 @@ #include #endif +#if defined(ROCKSDB_IOURING_PRESENT) +#include +#include +#endif + #include #include @@ -1359,6 +1364,115 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) { } } +void GenerateFilesAndRequest(Env* env, const std::string& fname, + std::vector* ret_reqs, + std::vector* scratches) { + const size_t kTotalSize = 81920; + Random rnd(301); + std::string expected_data = rnd.RandomString(kTotalSize); + + // Create file. + { + std::unique_ptr wfile; + ASSERT_OK(env->NewWritableFile(fname, &wfile, EnvOptions())); + ASSERT_OK(wfile->Append(expected_data)); + ASSERT_OK(wfile->Close()); + } + + // Right now kIoUringDepth is hard coded as 256, so we need very large + // number of keys to cover the case of multiple rounds of submissions. 
+ // Right now the test latency is still acceptable. If it ends up with + // too long, we can modify the io uring depth with SyncPoint here. + const int num_reads = 3; + std::vector offsets = {10000, 20000, 30000}; + std::vector lens = {3000, 200, 100}; + + // Create requests + scratches->reserve(num_reads); + std::vector& reqs = *ret_reqs; + reqs.resize(num_reads); + for (int i = 0; i < num_reads; ++i) { + reqs[i].offset = offsets[i]; + reqs[i].len = lens[i]; + scratches->emplace_back(reqs[i].len, ' '); + reqs[i].scratch = const_cast(scratches->back().data()); + } +} + +TEST_F(EnvPosixTest, MultiReadIOUringError) { + // In this test we don't do aligned read, wo it doesn't work for + // direct I/O case. + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = false; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + std::vector scratches; + std::vector reqs; + GenerateFilesAndRequest(env_, fname, &reqs, &scratches); + // Query the data + std::unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + + bool io_uring_wait_cqe_called = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", + [&](void* arg) { + if (!io_uring_wait_cqe_called) { + io_uring_wait_cqe_called = true; + ssize_t& ret = *(static_cast(arg)); + ret = 1; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Status s = file->MultiRead(reqs.data(), reqs.size()); + if (io_uring_wait_cqe_called) { + ASSERT_NOK(s); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(EnvPosixTest, MultiReadIOUringError2) { + // In this test we don't do aligned read, wo it doesn't work for + // direct I/O case. 
+ EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = false; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + std::vector scratches; + std::vector reqs; + GenerateFilesAndRequest(env_, fname, &reqs, &scratches); + // Query the data + std::unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + + bool io_uring_submit_and_wait_called = false; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1", + [&](void* arg) { + io_uring_submit_and_wait_called = true; + ssize_t* ret = static_cast(arg); + (*ret)--; + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2", + [&](void* arg) { + struct io_uring* iu = static_cast(arg); + struct io_uring_cqe* cqe; + assert(io_uring_wait_cqe(iu, &cqe) == 0); + io_uring_cqe_seen(iu, cqe); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Status s = file->MultiRead(reqs.data(), reqs.size()); + if (io_uring_submit_and_wait_called) { + ASSERT_NOK(s); + } + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + // Only works in linux platforms #ifdef OS_WIN TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) { diff --git a/env/io_posix.cc b/env/io_posix.cc index 0eb117cc524..ad92226859f 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -681,28 +681,44 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, ssize_t ret = io_uring_submit_and_wait(iu, static_cast(this_reqs)); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return1", + &ret); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_submit_and_wait:return2", + iu); + if (static_cast(ret) != this_reqs) { fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); - assert(false); // It should not happen. 
In case it happens, we simply return to prevent - // following io_uring callse to hang. + // following io_uring callse to hang. However, we still need to consume + // results. + for (ssize_t i = 0; i < ret; i++) { + struct io_uring_cqe* cqe; + if (io_uring_wait_cqe(iu, &cqe) == 0) { // Success + io_uring_cqe_seen(iu, cqe); + } + } return IOStatus::IOError("io_uring_submit_and_wait() requested " + ToString(this_reqs) + " but returned " + ToString(ret)); } for (size_t i = 0; i < this_reqs; i++) { - struct io_uring_cqe* cqe; + struct io_uring_cqe* cqe = nullptr; WrappedReadRequest* req_wrap; // We could use the peek variant here, but this seems safer in terms // of our initial wait not reaping all completions ret = io_uring_wait_cqe(iu, &cqe); + TEST_SYNC_POINT_CALLBACK( + "PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret); if (ret) { - assert(false); ios = IOStatus::IOError("io_uring_wait_cqe() returns " + ToString(ret)); - // It's not safe to use cqe anymore, so we don't konw which reqeuest - // it is. + + if (cqe != nullptr) { + io_uring_cqe_seen(iu, cqe); + } continue; } From a8e92ade7321ca99e24539246bfda486cc636a39 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 12:06:39 -0700 Subject: [PATCH 5/7] Improve comments. --- env/io_posix.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/env/io_posix.cc b/env/io_posix.cc index ad92226859f..8c6230f134a 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -690,9 +690,9 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, if (static_cast(ret) != this_reqs) { fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs); - // It should not happen. In case it happens, we simply return to prevent - // following io_uring callse to hang. However, we still need to consume - // results. + // If error happens and we submitted fewer than expected, it is an + // exception case and we don't retry here. 
We should still consume + // what is is submitted in the ring. for (ssize_t i = 0; i < ret; i++) { struct io_uring_cqe* cqe; if (io_uring_wait_cqe(iu, &cqe) == 0) { // Success From 99aa6851174ee3d0842eb2ddef32698f68f504f1 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 12:08:50 -0700 Subject: [PATCH 6/7] Always call io_uring_cqe_seen() as long as cqe is not empty. --- env/io_posix.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/env/io_posix.cc b/env/io_posix.cc index 8c6230f134a..a041b32aa6a 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -694,8 +694,9 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, // exception case and we don't retry here. We should still consume // what is is submitted in the ring. for (ssize_t i = 0; i < ret; i++) { - struct io_uring_cqe* cqe; - if (io_uring_wait_cqe(iu, &cqe) == 0) { // Success + struct io_uring_cqe* cqe = nullptr; + io_uring_wait_cqe(iu, &cqe); + if (cqe != nullptr) { io_uring_cqe_seen(iu, cqe); } } From 4219ed599f3e6b317a59f96f08b786456d4fcd53 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 18 May 2021 14:33:18 -0700 Subject: [PATCH 7/7] Disable new tests if IO Uring is not available. --- env/env_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/env/env_test.cc b/env/env_test.cc index 0408ca4eb80..c1d7173ffbf 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -1364,6 +1364,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) { } } +#if defined(ROCKSDB_IOURING_PRESENT) void GenerateFilesAndRequest(Env* env, const std::string& fname, std::vector* ret_reqs, std::vector* scratches) { @@ -1472,6 +1473,7 @@ TEST_F(EnvPosixTest, MultiReadIOUringError2) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +#endif // ROCKSDB_IOURING_PRESENT // Only works in linux platforms #ifdef OS_WIN