Skip to content

Commit

Permalink
Do not swallow error returned from SaveTo() (#6801)
Browse files Browse the repository at this point in the history
Summary:
With consistency check enabled, VersionBuilder::SaveTo() may return error once
corruption is detected while building versions. We should handle these errors.
Pull Request resolved: #6801

Test Plan: make check

Reviewed By: siying

Differential Revision: D21385045

Pulled By: riversand963

fbshipit-source-id: 98f6424e2a4699b62befa21e9fe00e70a771118e
  • Loading branch information
riversand963 authored and facebook-github-bot committed May 5, 2020
1 parent 5a61e78 commit 5584595
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 47 deletions.
22 changes: 22 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1774,6 +1774,28 @@ TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
}
}

TEST_F(DBBasicTest, BestEffortsRecoveryWithVersionBuildingFailure) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();

options.best_efforts_recovery = true;
Status s = TryReopen(options);
ASSERT_TRUE(s.IsCorruption());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

#ifndef ROCKSDB_LITE
namespace {
class TableFileListener : public EventListener {
Expand Down
58 changes: 57 additions & 1 deletion db/db_impl/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class DBSecondaryTest : public DBTestBase {

void OpenSecondary(const Options& options);

Status TryOpenSecondary(const Options& options);

void OpenSecondaryWithColumnFamilies(
const std::vector<std::string>& column_families, const Options& options);

Expand All @@ -70,9 +72,13 @@ class DBSecondaryTest : public DBTestBase {
};

void DBSecondaryTest::OpenSecondary(const Options& options) {
ASSERT_OK(TryOpenSecondary(options));
}

Status DBSecondaryTest::TryOpenSecondary(const Options& options) {
Status s =
DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
ASSERT_OK(s);
return s;
}

void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
Expand Down Expand Up @@ -858,6 +864,56 @@ TEST_F(DBSecondaryTest, CheckConsistencyWhenOpen) {
thread.join();
ASSERT_TRUE(called);
}

TEST_F(DBSecondaryTest, StartFromInconsistent) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
Options options1;
Status s = TryOpenSecondary(options1);
ASSERT_TRUE(s.IsCorruption());
}

TEST_F(DBSecondaryTest, InconsistencyDuringCatchUp) {
Options options = CurrentOptions();
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "value"));
ASSERT_OK(Flush());

Options options1;
OpenSecondary(options1);

{
std::string value;
ASSERT_OK(db_secondary_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("value", value);
}

ASSERT_OK(Put("bar", "value1"));
ASSERT_OK(Flush());

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionBuilder::CheckConsistencyBeforeReturn", [&](void* arg) {
ASSERT_NE(nullptr, arg);
*(reinterpret_cast<Status*>(arg)) =
Status::Corruption("Inject corruption");
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = db_secondary_->TryCatchUpWithPrimary();
ASSERT_TRUE(s.IsCorruption());
}
#endif //! ROCKSDB_LITE

} // namespace ROCKSDB_NAMESPACE
Expand Down
40 changes: 25 additions & 15 deletions db/version_edit_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,20 +377,26 @@ Status VersionEditHandler::MaybeCreateVersion(const VersionEdit& /*edit*/,
ColumnFamilyData* cfd,
bool force_create_version) {
assert(cfd->initialized());
Status s;
if (force_create_version) {
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
auto* v = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(v->storage_info());
// Install new version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
s = builder->SaveTo(v->storage_info());
if (s.ok()) {
// Install new version
v->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!(version_set_->db_options_->skip_stats_update_on_db_open));
version_set_->AppendVersion(cfd, v);
} else {
delete v;
}
}
return Status::OK();
return s;
}

Status VersionEditHandler::LoadTables(ColumnFamilyData* cfd,
Expand Down Expand Up @@ -558,16 +564,20 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
auto* version = new Version(cfd, version_set_, version_set_->file_options_,
*cfd->GetLatestMutableCFOptions(),
version_set_->current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(
*cfd->GetLatestMutableCFOptions(),
!version_set_->db_options_->skip_stats_update_on_db_open);
auto v_iter = versions_.find(cfd->GetID());
if (v_iter != versions_.end()) {
delete v_iter->second;
v_iter->second = version;
} else {
versions_.emplace(cfd->GetID(), version);
}
} else {
versions_.emplace(cfd->GetID(), version);
delete version;
}
}
return s;
Expand Down
78 changes: 49 additions & 29 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6010,13 +6010,23 @@ Status ReactiveVersionSet::Recover(
Version* v = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
s = builder->SaveTo(v->storage_info());

// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
if (s.ok()) {
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
} else {
ROCKS_LOG_ERROR(db_options_->info_log,
"[%s]: inconsistent version: %s\n",
cfd->GetName().c_str(), s.ToString().c_str());
delete v;
break;
}
}
}
if (s.ok()) {
next_file_number_.store(version_edit.next_file_number_ + 1);
last_allocated_sequence_ = version_edit.last_sequence_;
last_published_sequence_ = version_edit.last_sequence_;
Expand Down Expand Up @@ -6093,6 +6103,8 @@ Status ReactiveVersionSet::ReadAndApply(
s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
if (s.ok()) {
applied_edits++;
} else {
break;
}
}
}
Expand All @@ -6102,13 +6114,14 @@ Status ReactiveVersionSet::ReadAndApply(
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
// Or the version(s) rebuilt from tailing the MANIFEST is inconsistent.
// 2) we have finished reading the current MANIFEST.
// 3) we have encountered an IOError reading the current MANIFEST.
// We need to look for the next MANIFEST and start from there. If we cannot
// find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
if (s.ok()) {
if (tmp_s.ok()) {
if (reader->file()->file_name() == old_manifest_path) {
// Still processing the same MANIFEST, thus no need to continue this
// loop since no record is available if we have reached here.
Expand Down Expand Up @@ -6138,6 +6151,7 @@ Status ReactiveVersionSet::ReadAndApply(
number_of_edits_to_skip_ += 2;
}
}
s = tmp_s;
}
}
}
Expand Down Expand Up @@ -6230,36 +6244,42 @@ Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
auto version = new Version(cfd, this, file_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
s = builder->SaveTo(version->storage_info());
if (s.ok()) {
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
} else {
delete version;
}
} else if (s.IsPathNotFound()) {
s = Status::OK();
}
// Some other error has occurred during LoadTableHandlers.
}

if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
if (s.ok()) {
if (version_edit->HasNextFile()) {
next_file_number_.store(version_edit->next_file_number_ + 1);
}
if (version_edit->has_last_sequence_) {
last_allocated_sequence_ = version_edit->last_sequence_;
last_published_sequence_ = version_edit->last_sequence_;
last_sequence_ = version_edit->last_sequence_;
}
if (version_edit->has_prev_log_number_) {
prev_log_number_ = version_edit->prev_log_number_;
MarkFileNumberUsed(version_edit->prev_log_number_);
}
if (version_edit->has_log_number_) {
MarkFileNumberUsed(version_edit->log_number_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
}
column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
return s;
}

Expand Down
4 changes: 2 additions & 2 deletions db/version_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ TEST_F(VersionSetAtomicGroupTest,
// Write the corrupted edits.
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
Expand Down Expand Up @@ -1498,7 +1498,7 @@ TEST_F(VersionSetAtomicGroupTest,
&manifest_reader_status));
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_OK(
EXPECT_NOK(
reactive_versions_->ReadAndApply(&mu, &manifest_reader, &cfds_changed));
mu.Unlock();
EXPECT_EQ(edits_[1].DebugString(),
Expand Down

0 comments on commit 5584595

Please sign in to comment.