Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry block reads on checksum mismatch #12427

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
218 changes: 218 additions & 0 deletions db/db_io_failure_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,100 @@
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {
namespace {
// A wrapper that allows injection of errors.
class CorruptionFS : public FileSystemWrapper {
public:
bool writable_file_error_;
int num_writable_file_errors_;

explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target)
: FileSystemWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0),
corruption_trigger_(INT_MAX),
read_count_(0),
rnd_(300) {}
~CorruptionFS() override {
// Assert that the corruption was reset, which means it got triggered
assert(corruption_trigger_ == INT_MAX);
}
const char* Name() const override { return "ErrorEnv"; }

IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
result->reset();
if (writable_file_error_) {
++num_writable_file_errors_;
return IOStatus::IOError(fname, "fake error");
}
return target()->NewWritableFile(fname, opts, result, dbg);
}

void SetCorruptionTrigger(const int trigger) {
corruption_trigger_ = trigger;
read_count_ = 0;
}

IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
class CorruptionRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
public:
CorruptionRandomAccessFile(CorruptionFS& fs,
std::unique_ptr<FSRandomAccessFile>& file)
: FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}

IOStatus Read(uint64_t offset, size_t len, const IOOptions& opts,
Slice* result, char* scratch,
IODebugContext* dbg) const override {
IOStatus s = target()->Read(offset, len, opts, result, scratch, dbg);
if (opts.verify_and_reconstruct_read) {
return s;
}
if (s.ok() && ++fs_.read_count_ >= fs_.corruption_trigger_) {
fs_.read_count_ = 0;
fs_.corruption_trigger_ = INT_MAX;
char* data = const_cast<char*>(result->data());
std::memcpy(
data,
fs_.rnd_.RandomString(static_cast<int>(result->size())).c_str(),
result->size());
}
return s;
}

IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) override {
return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
}

private:
CorruptionFS& fs_;
};

std::unique_ptr<FSRandomAccessFile> file;
IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
EXPECT_OK(s);
result->reset(new CorruptionRandomAccessFile(*this, file));

return s;
}

void SupportedOps(int64_t& supported_ops) override {
supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
1 << FSSupportedOps::kAsyncIO;
}

private:
int corruption_trigger_;
int read_count_;
Random rnd_;
};
} // anonymous namespace

class DBIOFailureTest : public DBTestBase {
public:
Expand Down Expand Up @@ -579,6 +673,130 @@ TEST_F(DBIOFailureTest, CompactionSstSyncError) {
ASSERT_EQ("bar3", Get(1, "foo"));
}
#endif // !(defined NDEBUG) || !defined(OS_WIN)

class DBIOCorruptionTest : public DBIOFailureTest,
public testing::WithParamInterface<bool> {
public:
DBIOCorruptionTest() : DBIOFailureTest() {
BlockBasedTableOptions bbto;
Options options = CurrentOptions();

base_env_ = env_;
EXPECT_NE(base_env_, nullptr);
fs_.reset(new CorruptionFS(base_env_->GetFileSystem()));
env_guard_ = NewCompositeEnv(fs_);
options.env = env_guard_.get();
bbto.num_file_reads_for_auto_readahead = 0;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.disable_auto_compactions = true;

Reopen(options);
}

~DBIOCorruptionTest() {
Close();
db_ = nullptr;
}

protected:
std::unique_ptr<Env> env_guard_;
std::shared_ptr<CorruptionFS> fs_;
Env* base_env_;
};

TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);

std::string val;
ReadOptions ro;
ro.async_io = GetParam();
ASSERT_OK(dbfull()->Get(ReadOptions(), "key1", &val));
ASSERT_EQ(val, "val1");
}

TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);

ReadOptions ro;
ro.readahead_size = 65536;
ro.async_io = GetParam();

Iterator* iter = dbfull()->NewIterator(ro);
iter->SeekToFirst();
while (iter->status().ok() && iter->Valid()) {
iter->Next();
}
ASSERT_OK(iter->status());
delete iter;
}

TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);

std::vector<std::string> keystr{"key1", "key2"};
std::vector<Slice> keys{Slice(keystr[0]), Slice(keystr[1])};
std::vector<PinnableSlice> values(keys.size());
std::vector<Status> statuses(keys.size());
ReadOptions ro;
ro.async_io = GetParam();
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
keys.data(), values.data(), statuses.data());
ASSERT_EQ(values[0].ToString(), "val1");
ASSERT_EQ(values[1].ToString(), "val2");
}

TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

ASSERT_OK(Put("key1", "val1"));
ASSERT_OK(Put("key3", "val3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(Flush());
fs->SetCorruptionTrigger(1);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));

std::string val;
ReadOptions ro;
ro.async_io = GetParam();
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}

TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
CorruptionFS* fs =
static_cast<CorruptionFS*>(env_guard_->GetFileSystem().get());

ASSERT_OK(Put("key1", "val1"));
fs->SetCorruptionTrigger(1);
ASSERT_OK(Flush());

std::string val;
ReadOptions ro;
ro.async_io = GetParam();
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
ASSERT_EQ(val, "val1");
}

INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
testing::Bool());
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
3 changes: 2 additions & 1 deletion include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ struct IOOptions {
rate_limiter_priority(Env::IO_TOTAL),
type(IOType::kUnknown),
force_dir_fsync(force_dir_fsync_),
do_not_recurse(false) {}
do_not_recurse(false),
verify_and_reconstruct_read(false) {}
};

struct DirFsyncOptions {
Expand Down
3 changes: 3 additions & 0 deletions include/rocksdb/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ enum Tickers : uint32_t {
// Number of FS reads avoided due to scan prefetching
PREFETCH_HITS,

// Footer corruption detected when opening an SST file for reading
SST_FOOTER_CORRUPTION_COUNT,

TICKER_ENUM_MAX
};

Expand Down
4 changes: 4 additions & 0 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -5267,6 +5267,8 @@ class TickerTypeJni {
return -0x52;
case ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS:
return -0x53;
case ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT:
return -0x55;
case ROCKSDB_NAMESPACE::Tickers::TICKER_ENUM_MAX:
// -0x54 is the max value at this time. Since these values are exposed
// directly to Java clients, we'll keep the value the same till the next
Expand Down Expand Up @@ -5722,6 +5724,8 @@ class TickerTypeJni {
return ROCKSDB_NAMESPACE::Tickers::PREFETCH_BYTES_USEFUL;
case -0x53:
return ROCKSDB_NAMESPACE::Tickers::PREFETCH_HITS;
case -0x55:
return ROCKSDB_NAMESPACE::Tickers::SST_FOOTER_CORRUPTION_COUNT;
case -0x54:
// -0x54 is the max value at this time. Since these values are exposed
// directly to Java clients, we'll keep the value the same till the next
Expand Down
2 changes: 2 additions & 0 deletions java/src/main/java/org/rocksdb/TickerType.java
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ public enum TickerType {

PREFETCH_HITS((byte) -0x53),

SST_FOOTER_CORRUPTION_COUNT((byte) -0x55),

TICKER_ENUM_MAX((byte) -0x54);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this value be updated after the addition of SST_FOOTER_CORRUPTION_COUNT?


private final byte value;
Expand Down
1 change: 1 addition & 0 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{PREFETCH_BYTES, "rocksdb.prefetch.bytes"},
{PREFETCH_BYTES_USEFUL, "rocksdb.prefetch.bytes.useful"},
{PREFETCH_HITS, "rocksdb.prefetch.hits"},
{SST_FOOTER_CORRUPTION_COUNT, "rocksdb.footer.corruption.count"},
};

const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
Expand Down
13 changes: 13 additions & 0 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,19 @@ Status BlockBasedTable::Open(
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
// If the footer is corrupted and the FS supports checksum verification and
// correction, try reading the footer again
if (s.IsCorruption()) {
RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
if (CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
IOOptions retry_opts = opts;
retry_opts.verify_and_reconstruct_read = true;
s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
prefetch_buffer.get(), file_size, &footer,
kBlockBasedTableMagicNumber);
}
}
if (!s.ok()) {
return s;
}
Expand Down
30 changes: 28 additions & 2 deletions table/block_based/block_based_table_reader_sync_and_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,42 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)

if (options.verify_checksums) {
PERF_TIMER_GUARD(block_checksum_time);
const char* data = req.result.data();
const char* data = serialized_block.data.data();
// Since the scratch might be shared, the offset of the data block in
// the buffer might not be 0. req.result.data() only point to the
// begin address of each read request, we need to add the offset
// in each read request. Checksum is stored in the block trailer,
// beyond the payload size.
s = VerifyBlockChecksum(footer, data + req_offset, handle.size(),
s = VerifyBlockChecksum(footer, data, handle.size(),
rep_->file->file_name(), handle.offset());
RecordTick(ioptions.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s);
if (!s.ok() &&
CheckFSFeatureSupport(ioptions.fs.get(),
FSSupportedOps::kVerifyAndReconstructRead)) {
assert(s.IsCorruption());
assert(!ioptions.allow_mmap_reads);
RecordTick(ioptions.stats, BLOCK_CHECKSUM_MISMATCH_COUNT);

// Repeat the read for this particular block using the regular
// synchronous Read API. We can use the same chunk of memory
// pointed to by data, since the size is identical and we know
// its not a memory mapped file
Slice result;
IOOptions opts;
IOStatus io_s = file->PrepareIOOptions(options, opts);
opts.verify_and_reconstruct_read = true;
io_s = file->Read(opts, handle.offset(), BlockSizeWithTrailer(handle),
&result, const_cast<char*>(data), nullptr);
if (io_s.ok()) {
assert(result.data() == data);
assert(result.size() == BlockSizeWithTrailer(handle));
s = VerifyBlockChecksum(footer, data, handle.size(),
rep_->file->file_name(), handle.offset());
} else {
s = io_s;
}
}
}
} else if (!use_shared_buffer) {
// Free the allocated scratch buffer.
Expand Down