Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make secondary instance use ManifestTailer #7998

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ ColumnFamilyData::~ColumnFamilyData() {

if (dummy_versions_ != nullptr) {
// List must be empty
assert(dummy_versions_->TEST_Next() == dummy_versions_);
assert(dummy_versions_->Next() == dummy_versions_);
bool deleted __attribute__((__unused__));
deleted = dummy_versions_->Unref();
assert(deleted);
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
{
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
->ReadAndApply(&mutex_, &manifest_reader_,
manifest_reader_status_.get(), &cfds_changed);

ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
Expand Down
29 changes: 8 additions & 21 deletions db/db_impl/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,20 +459,6 @@ TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
}

TEST_F(DBSecondaryTest, MissingTableFile) {
int table_files_not_exist = 0;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"ReactiveVersionSet::ApplyOneVersionEditToBuilder:AfterLoadTableHandlers",
[&](void* arg) {
Status s = *reinterpret_cast<Status*>(arg);
if (s.IsPathNotFound()) {
++table_files_not_exist;
} else if (!s.ok()) {
assert(false); // Should not reach here
}
});
SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Expand All @@ -499,7 +485,6 @@ TEST_F(DBSecondaryTest, MissingTableFile) {
ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));

ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_EQ("foo_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
Expand Down Expand Up @@ -615,10 +600,7 @@ TEST_F(DBSecondaryTest, SwitchManifest) {
range_scan_db();
}

// Here, "Snapshot" refers to the version edits written by
// VersionSet::WriteSnapshot() at the beginning of the new MANIFEST after
// switching from the old one.
TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {
TEST_F(DBSecondaryTest, SwitchManifestTwice) {
Options options;
options.env = env_;
options.disable_auto_compactions = true;
Expand All @@ -640,10 +622,15 @@ TEST_F(DBSecondaryTest, SkipSnapshotAfterManifestSwitch) {

Reopen(options);
ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
Reopen(options);
ASSERT_OK(Put("0", "value1"));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());

ASSERT_OK(db_secondary_->Get(ropts, "0", &value));
ASSERT_EQ("value1", value);
}

TEST_F(DBSecondaryTest, SwitchWAL) {
TEST_F(DBSecondaryTest, DISABLED_SwitchWAL) {
const int kNumKeysPerMemtable = 1;
Options options;
options.env = env_;
Expand Down Expand Up @@ -692,7 +679,7 @@ TEST_F(DBSecondaryTest, SwitchWAL) {
}
}

TEST_F(DBSecondaryTest, SwitchWALMultiColumnFamilies) {
TEST_F(DBSecondaryTest, DISABLED_SwitchWALMultiColumnFamilies) {
const int kNumKeysPerMemtable = 1;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
Expand Down
124 changes: 112 additions & 12 deletions db/version_edit_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ void VersionEditHandlerBase::Iterate(log::Reader& reader,
s = *log_read_status;
}

read_buffer_.Clear();

CheckIterationResult(reader, &s);

if (!s.ok()) {
Expand Down Expand Up @@ -129,13 +127,13 @@ Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit,
}

VersionEditHandler::VersionEditHandler(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, bool track_missing_files,
bool no_error_if_table_files_missing,
const std::shared_ptr<IOTracer>& io_tracer, bool skip_load_table_files)
: VersionEditHandlerBase(),
read_only_(read_only),
column_families_(column_families),
column_families_(std::move(column_families)),
version_set_(version_set),
track_missing_files_(track_missing_files),
no_error_if_table_files_missing_(no_error_if_table_files_missing),
Expand Down Expand Up @@ -351,7 +349,8 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
}
// There were some column families in the MANIFEST that weren't specified
// in the argument. This is OK in read_only mode
if (s->ok() && !read_only_ && !column_families_not_found_.empty()) {
if (s->ok() && MustOpenAllColumnFamilies() &&
!column_families_not_found_.empty()) {
std::string msg;
for (const auto& cf : column_families_not_found_) {
msg.append(", ");
Expand All @@ -368,6 +367,9 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
version_set_->MarkFileNumberUsed(version_edit_params_.prev_log_number_);
version_set_->MarkFileNumberUsed(version_edit_params_.log_number_);
for (auto* cfd : *(version_set_->GetColumnFamilySet())) {
if (cfd->IsDropped()) {
continue;
}
auto builder_iter = builders_.find(cfd->GetID());
assert(builder_iter != builders_.end());
auto* builder = builder_iter->second->version_builder();
Expand Down Expand Up @@ -452,11 +454,9 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
ColumnFamilyData* ret =
version_set_->GetColumnFamilySet()->GetColumnFamily(edit.column_family_);
assert(ret != nullptr);
if (ret->UnrefAndTryDelete()) {
ret = nullptr;
} else {
assert(false);
}
ret->SetDropped();
ret->UnrefAndTryDelete();
ret = nullptr;
return ret;
}

Expand Down Expand Up @@ -572,7 +572,7 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
}

VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
bool read_only, const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, const std::shared_ptr<IOTracer>& io_tracer)
: VersionEditHandler(read_only, column_families, version_set,
/*track_missing_files=*/true,
Expand Down Expand Up @@ -641,7 +641,7 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
uint64_t file_num = fd.GetNumber();
const std::string fpath =
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num);
s = version_set_->VerifyFileMetadata(fpath, meta);
s = VerifyFile(fpath, meta);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files.insert(file_num);
s = Status::OK();
Expand Down Expand Up @@ -682,6 +682,106 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
return s;
}

Status VersionEditHandlerPointInTime::VerifyFile(const std::string& fpath,
const FileMetaData& fmeta) {
return version_set_->VerifyFileMetadata(fpath, fmeta);
}

Status ManifestTailer::Initialize() {
if (Mode::kRecovery == mode_) {
return VersionEditHandler::Initialize();
}
assert(Mode::kCatchUp == mode_);
Status s;
if (!initialized_) {
ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet();
assert(cfd_set);
ColumnFamilyData* default_cfd = cfd_set->GetDefault();
assert(default_cfd);
auto builder_iter = builders_.find(default_cfd->GetID());
assert(builder_iter != builders_.end());

Version* dummy_version = default_cfd->dummy_versions();
assert(dummy_version);
Version* base_version = dummy_version->Next();
assert(base_version);
base_version->Ref();
VersionBuilderUPtr new_builder(
new BaseReferencedVersionBuilder(default_cfd, base_version));
builder_iter->second = std::move(new_builder);

initialized_ = true;
}
return s;
}

Status ManifestTailer::ApplyVersionEdit(VersionEdit& edit,
ColumnFamilyData** cfd) {
Status s = VersionEditHandler::ApplyVersionEdit(edit, cfd);
if (s.ok()) {
assert(cfd);
if (*cfd) {
cfds_changed_.insert(*cfd);
}
}
return s;
}

Status ManifestTailer::OnColumnFamilyAdd(VersionEdit& edit,
ColumnFamilyData** cfd) {
if (Mode::kRecovery == mode_) {
return VersionEditHandler::OnColumnFamilyAdd(edit, cfd);
}
assert(Mode::kCatchUp == mode_);
ColumnFamilySet* cfd_set = version_set_->GetColumnFamilySet();
assert(cfd_set);
ColumnFamilyData* tmp_cfd = cfd_set->GetColumnFamily(edit.GetColumnFamily());
assert(cfd);
*cfd = tmp_cfd;
if (!tmp_cfd) {
// For now, ignore new column families created after Recover() succeeds.
return Status::OK();
}
auto builder_iter = builders_.find(edit.GetColumnFamily());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which condition, will it reach here? Should not new CF only be handled in Recovery mode (which is handled by VersionEditHanlder::OnColumnFamilyAdd())?

Copy link
Contributor Author

@riversand963 riversand963 Mar 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the primary later creates a new column family, it will write a VersionEdit to the MANIFEST. The secondary, whether in Recovery mode or Catchup mode, will be able to see this VersionEdit.
If the column family is added by the primary after the secondary is created and the secondary does not specify it in its Open call, then we ignore it. Should the secondary be interested in this column family, it should be opened with this column family in the first place. This is aligned with how we call DB::Open() with column families.
For column families that are specified when creating the secondary, we will see VersionEdit.is_column_family_add == true for them multiple times, whether secondary is in Recovery or Catchup mode. In fact, the primary will write a VersionEdit to add these column families each time it switches to a new MANIFEST.
Maybe an example can help.

MANIFEST-1 : | 'add_cf_1' | 'cf1': 'add [2.sst]' | 'cf1': 'add [3.sst]' |
MANIFEST-4 : |'add_cf_1' | 'cf1': 'add [2.sst,3.sst]' |

The code snippet here handles the case when the add_cf_1 in MANIFEST-4 is seen. Suppose at this time, the secondary instance sees the second version edit of MANIFEST-1 but for some reason switches to MANIFEST-4. We need to build the version storage for cf1 from the empty base version of cf1, but from the version with 2.sst, otherwise the consistency check will fail due to two occurences of 2.sst.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

assert(builder_iter != builders_.end());

Version* dummy_version = tmp_cfd->dummy_versions();
assert(dummy_version);
Version* base_version = dummy_version->Next();
assert(base_version);
base_version->Ref();
VersionBuilderUPtr new_builder(
new BaseReferencedVersionBuilder(tmp_cfd, base_version));
builder_iter->second = std::move(new_builder);

#ifndef NDEBUG
auto version_iter = versions_.find(edit.GetColumnFamily());
assert(version_iter != versions_.end());
#endif // !NDEBUG
return Status::OK();
}

void ManifestTailer::CheckIterationResult(const log::Reader& reader,
Status* s) {
VersionEditHandlerPointInTime::CheckIterationResult(reader, s);
assert(s);
if (s->ok()) {
if (Mode::kRecovery == mode_) {
mode_ = Mode::kCatchUp;
} else {
assert(Mode::kCatchUp == mode_);
}
}
}

Status ManifestTailer::VerifyFile(const std::string& fpath,
const FileMetaData& fmeta) {
Status s = VersionEditHandlerPointInTime::VerifyFile(fpath, fmeta);
// TODO: Open file or create hard link to prevent the file from being
// deleted.
return s;
}

void DumpManifestHandler::CheckIterationResult(const log::Reader& reader,
Status* s) {
VersionEditHandler::CheckIterationResult(reader, s);
Expand Down
Loading