diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 808b7d7840..f7acb9447b 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -2,22 +2,166 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. +#include + +#include "dist/replication/lib/replica_stub.h" #include "dist/replication/lib/replica.h" +#include "dist/replication/lib/mutation_log_utils.h" #include "load_from_private_log.h" #include "replica_duplicator.h" namespace dsn { namespace replication { +static constexpr int MAX_ALLOWED_REPEATS = 3; + +// Fast path to next file. If next file (_current->index + 1) is invalid, +// we try to list all files and select a new one to start (find_log_file_to_start). +bool load_from_private_log::switch_to_next_log_file() +{ + std::string new_path = fmt::format( + "{}/log.{}.{}", _private_log->dir(), _current->index() + 1, _current_global_end_offset); + + if (utils::filesystem::file_exists(new_path)) { + log_file_ptr file; + error_s es = log_utils::open_read(new_path, file); + if (!es.is_ok()) { + derror_replica("{}", es); + _current = nullptr; + return false; + } + start_from_log_file(file); + return true; + } else { + _current = nullptr; + return false; + } +} + void load_from_private_log::run() { - // TBD + dassert_replica(_start_decree != invalid_decree, "{}", _start_decree); + _duplicator->verify_start_decree(_start_decree); + + if (_current == nullptr) { + find_log_file_to_start(); + if (_current == nullptr) { + ddebug_replica("no private log file is currently available"); + repeat(_repeat_delay); + return; + } + } + + replay_log_block(); +} + +void load_from_private_log::find_log_file_to_start() +{ + // `file_map` has already excluded the useless log files during replica init. + auto file_map = _private_log->get_log_file_map(); + + // Reopen the files. Because the internal file handle of `file_map` + // is cleared once WAL replay finished. They are unable to read. + std::map new_file_map; + for (const auto &pr : file_map) { + log_file_ptr file; + error_s es = log_utils::open_read(pr.second->path(), file); + if (!es.is_ok()) { + derror_replica("{}", es); + return; + } + new_file_map.emplace(pr.first, file); + } + + find_log_file_to_start(std::move(new_file_map)); +} + +void load_from_private_log::find_log_file_to_start(std::map log_file_map) +{ + if (dsn_unlikely(log_file_map.empty())) { + derror_replica("unable to start duplication since no log file is available"); + return; + } + + for (auto it = log_file_map.begin(); it != log_file_map.end(); it++) { + auto next_it = std::next(it); + if (next_it == log_file_map.end()) { + // use the last file if no file to read + if (!_current) { + start_from_log_file(it->second); + } + return; + } + if (it->second->previous_log_max_decree(get_gpid()) < _start_decree && + _start_decree <= next_it->second->previous_log_max_decree(get_gpid())) { + // `start_decree` is within the range + start_from_log_file(it->second); + // find the latest file that matches the condition + } + } +} + +void load_from_private_log::replay_log_block() +{ + error_s err = + mutation_log::replay_block(_current, + [this](int log_bytes_length, mutation_ptr &mu) -> bool { + auto es = _mutation_batch.add(std::move(mu)); + dassert_replica(es.is_ok(), es.description()); + return true; + }, + _start_offset, + _current_global_end_offset); + if (!err.is_ok()) { + if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) { + repeat(); + return; + } + + _err_repeats_num++; + if (_err_repeats_num > MAX_ALLOWED_REPEATS) { + derror_replica("loading mutation logs failed for {} times: [err: {}, file: {}, " + "start_offset: {}], retry from start", + MAX_ALLOWED_REPEATS, + err, + _current->path(), + _start_offset); + find_log_file_to_start(); + } + repeat(_repeat_delay); + return; + } + + _start_offset = static_cast(_current_global_end_offset - _current->start_offset()); + + // update last_decree even for empty batch. + step_down_next_stage(_mutation_batch.last_decree(), _mutation_batch.move_all_mutations()); } load_from_private_log::load_from_private_log(replica *r, replica_duplicator *dup) - : replica_base(r), _private_log(r->private_log()), _duplicator(dup) + : replica_base(r), + _private_log(r->private_log()), + _duplicator(dup), + _stub(r->get_replica_stub()), + _mutation_batch(dup) { } +void load_from_private_log::set_start_decree(decree start_decree) +{ + _start_decree = start_decree; + _mutation_batch.set_start_decree(start_decree); +} + +void load_from_private_log::start_from_log_file(log_file_ptr f) +{ + ddebug_replica("start loading from log file {}", f->path()); + + _current = std::move(f); + _start_offset = 0; + _current_global_end_offset = _current->start_offset(); + _err_repeats_num = 0; +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/duplication/load_from_private_log.h b/src/dist/replication/lib/duplication/load_from_private_log.h index 2bb7cba4f4..e3b5cf17ce 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.h +++ b/src/dist/replication/lib/duplication/load_from_private_log.h @@ -9,6 +9,7 @@ #include #include "dist/replication/lib/mutation_log.h" +#include "mutation_batch.h" namespace dsn { namespace replication { @@ -31,11 +32,41 @@ class load_from_private_log : public replica_base, // The loaded mutations will be passed down to `ship_mutation`. void run() override; + void set_start_decree(decree start_decree); + + /// ==== Implementation ==== /// + + /// Find the log file that contains `_start_decree`. + void find_log_file_to_start(); + void find_log_file_to_start(std::map log_files); + + void replay_log_block(); + + // Switches to the log file with index = current_log_index + 1. + // Returns true if succeeds. + bool switch_to_next_log_file(); + + void start_from_log_file(log_file_ptr f); + private: friend class load_from_private_log_test; mutation_log_ptr _private_log; replica_duplicator *_duplicator; + replica_stub *_stub; + + log_file_ptr _current; + + size_t _start_offset{0}; + int64_t _current_global_end_offset{0}; + mutation_batch _mutation_batch; + + // How many times it repeats reading from _start_offset but failed. + int _err_repeats_num{0}; + + decree _start_decree{0}; + + std::chrono::milliseconds _repeat_delay{10_s}; }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/test/CMakeLists.txt b/src/dist/replication/lib/duplication/test/CMakeLists.txt index efe5f3e692..bacf339147 100644 --- a/src/dist/replication/lib/duplication/test/CMakeLists.txt +++ b/src/dist/replication/lib/duplication/test/CMakeLists.txt @@ -25,6 +25,9 @@ set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) set(MY_BINPLACES config-test.ini + log.1.0.handle_real_private_log + log.1.0.all_loaded_are_write_empties + log.1.0.handle_real_private_log2 run.sh ) diff --git a/src/dist/replication/lib/duplication/test/duplication_test_base.h b/src/dist/replication/lib/duplication/test/duplication_test_base.h index 30a244ae8e..22cd40b49f 100644 --- a/src/dist/replication/lib/duplication/test/duplication_test_base.h +++ b/src/dist/replication/lib/duplication/test/duplication_test_base.h @@ -4,6 +4,7 @@ #pragma once +#include "dist/replication/lib/mutation_log_utils.h" #include "dist/replication/test/replica_test/unit_test/replica_test_base.h" #include "dist/replication/lib/duplication/replica_duplicator.h" #include "dist/replication/lib/duplication/replica_duplicator_manager.h" @@ -44,6 +45,14 @@ class duplication_test_base : public replica_test_base dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed; return make_unique(dup_ent, _replica.get()); } + + std::map open_log_file_map(const std::string &log_dir) + { + std::map log_file_map; + error_s err = log_utils::open_log_file_map(log_dir, log_file_map); + EXPECT_EQ(err, error_s::ok()); + return log_file_map; + } }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp new file mode 100644 index 0000000000..293e69a13e --- /dev/null +++ b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp @@ -0,0 +1,375 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include +#include +#include + +#define BOOST_NO_CXX11_SCOPED_ENUMS +#include +#undef BOOST_NO_CXX11_SCOPED_ENUMS + +#include "dist/replication/lib/mutation_log_utils.h" +#include "dist/replication/lib/duplication/load_from_private_log.h" +#include "duplication_test_base.h" + +namespace dsn { +namespace replication { + +DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT); + +class load_from_private_log_test : public duplication_test_base +{ +public: + load_from_private_log_test() + { + _replica->init_private_log(_log_dir); + duplicator = create_test_duplicator(); + } + + void test_find_log_file_to_start() + { + load_from_private_log load(_replica.get(), duplicator.get()); + + std::vector mutations; + int max_log_file_mb = 1; + + mutation_log_ptr mlog = new mutation_log_private(_replica->dir(), + max_log_file_mb, + _replica->get_gpid(), + _replica.get(), + 1024, + 512, + 10000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + load.find_log_file_to_start({}); + ASSERT_FALSE(load._current); + + { // writing mutations to log which will generate multiple files + for (int i = 0; i < 1000 * 50; i++) { + std::string msg = "hello!"; + mutations.push_back(msg); + mutation_ptr mu = create_test_mutation(2 + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + + auto files = open_log_file_map(_log_dir); + + load.set_start_decree(1); + load.find_log_file_to_start(files); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 1); + + load._current = nullptr; + load.set_start_decree(50); + load.find_log_file_to_start(files); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 1); + + int last_idx = files.rbegin()->first; + load._current = nullptr; + load.set_start_decree(1000 * 50 + 200); + load.find_log_file_to_start(files); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), last_idx); + } + + void test_start_duplication(int num_entries, int private_log_size_mb) + { + std::vector mutations; + + mutation_log_ptr mlog = new mutation_log_private(_replica->dir(), + private_log_size_mb, + _replica->get_gpid(), + _replica.get(), + 1024, + 512, + 50000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + { + for (int i = 1; i <= num_entries; i++) { + std::string msg = "hello!"; + mutations.push_back(msg); + mutation_ptr mu = create_test_mutation(i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + + // commit the last entry + mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + + mlog->close(); + } + + load_and_wait_all_entries_loaded(num_entries, num_entries, 1); + } + + mutation_tuple_set + load_and_wait_all_entries_loaded(int total, int last_decree, decree start_decree) + { + return load_and_wait_all_entries_loaded( + total, last_decree, _replica->get_gpid(), start_decree); + } + mutation_tuple_set load_and_wait_all_entries_loaded(int total, int last_decree) + { + return load_and_wait_all_entries_loaded(total, last_decree, _replica->get_gpid(), 1); + } + mutation_tuple_set + load_and_wait_all_entries_loaded(int total, int last_decree, gpid id, decree start_decree) + { + mutation_log_ptr mlog = create_private_log(id); + for (const auto &pr : mlog->get_log_file_map()) { + EXPECT_TRUE(pr.second->file_handle() == nullptr); + } + _replica->init_private_log(mlog); + duplicator = create_test_duplicator(start_decree - 1); + + load_from_private_log load(_replica.get(), duplicator.get()); + const_cast(load._repeat_delay) = 1_s; + load.set_start_decree(duplicator->progress().last_decree + 1); + + mutation_tuple_set loaded_mutations; + pipeline::do_when end_stage( + [&loaded_mutations, &load, total, last_decree](decree &&d, + mutation_tuple_set &&mutations) { + // we create one mutation_update per mutation + // the mutations are started from 1 + for (mutation_tuple mut : mutations) { + loaded_mutations.emplace(mut); + } + + if (loaded_mutations.size() < total || d < last_decree) { + load.run(); + } + }); + + duplicator->from(load).link(end_stage); + + // inject some faults + fail::setup(); + fail::cfg("open_read", "25%1*return()"); + fail::cfg("mutation_log_read_log_block", "25%1*return()"); + duplicator->run_pipeline(); + duplicator->wait_all(); + fail::teardown(); + + return loaded_mutations; + } + + void test_restart_duplication() + { + load_from_private_log load(_replica.get(), duplicator.get()); + + // start duplication from a compacted plog dir. + // first log file is log.2.xxx + for (int f = 0; f < 2; f++) { + mutation_log_ptr mlog = create_private_log(); + for (int i = 0; i < 100; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(39000 + 100 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + + std::vector files; + ASSERT_EQ(log_utils::list_all_files(_log_dir, files), error_s::ok()); + ASSERT_EQ(files.size(), 2); + boost::filesystem::remove(_log_dir + "/log.1.0"); + + mutation_log_ptr mlog = create_private_log(); + decree max_gced_dercee = mlog->max_gced_decree_no_lock(_replica->get_gpid()); + + // new duplication, start_decree = max_gced_decree + 1 + // ensure we can find the first file. + load.set_start_decree(max_gced_dercee + 1); + load.find_log_file_to_start(mlog->get_log_file_map()); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 2); + } + + mutation_log_ptr create_private_log(gpid id) { return create_private_log(1, id); } + + mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 1)) + { + std::map replay_condition; + replay_condition[id] = 0; // duplicating + mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; }; + mutation_log_ptr mlog = new mutation_log_private( + _replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000); + EXPECT_EQ(mlog->open(cb, nullptr, replay_condition), ERR_OK); + return mlog; + } + + void test_restart_duplication2() + { + load_from_private_log load(_replica.get(), duplicator.get()); + + // create a log file indexed 3, starting from 38200 + for (int f = 0; f < 3; f++) { + mutation_log_ptr mlog = create_private_log(); + for (int i = 0; i < 100; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(38000 + 100 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + auto files1 = open_log_file_map(_log_dir); + ASSERT_EQ(files1.size(), 3); + boost::filesystem::remove(files1[1]->path()); + boost::filesystem::remove(files1[2]->path()); + boost::filesystem::rename( + files1[3]->path(), + fmt::format("./log.{}.{}", files1[3]->index(), files1[3]->start_offset())); + + // first log is 39100 + { + for (int f = 0; f < 2; f++) { + mutation_log_ptr mlog = create_private_log(); + for (int i = 0; i < 100; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(39000 + 100 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + boost::filesystem::remove(files1[1]->path()); + } + + { + // This test simulates the following case: + // the replica has written logs [39100 -> 39199], but after some sort of failure, + // it became learner and copied plogs starting from 38200. + boost::filesystem::rename( + fmt::format("./log.{}.{}", files1[3]->index(), files1[3]->start_offset()), + files1[3]->path()); + } + + // log.2.xxx starts from 39100 + // log.3.xxx starts from 38200 + // all log files are reserved for duplication + mutation_log_ptr mlog = create_private_log(); + auto files = mlog->get_log_file_map(); + ASSERT_EQ(files.size(), 2); + + decree max_gced_decree = mlog->max_gced_decree_no_lock(_replica->get_gpid()); + ASSERT_EQ(max_gced_decree, 38199); + + // new duplication, ensure we can start at log.3.xxx + load._private_log = mlog; + load.set_start_decree(max_gced_decree + 1); + load.find_log_file_to_start(); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 3); + } + + std::unique_ptr duplicator; +}; + +TEST_F(load_from_private_log_test, find_log_file_to_start) { test_find_log_file_to_start(); } + +TEST_F(load_from_private_log_test, start_duplication_10000_4MB) +{ + test_start_duplication(10000, 4); +} + +TEST_F(load_from_private_log_test, start_duplication_50000_4MB) +{ + test_start_duplication(50000, 4); +} + +TEST_F(load_from_private_log_test, start_duplication_10000_1MB) +{ + test_start_duplication(10000, 1); +} + +TEST_F(load_from_private_log_test, start_duplication_50000_1MB) +{ + test_start_duplication(50000, 1); +} + +TEST_F(load_from_private_log_test, start_duplication_100000_4MB) +{ + test_start_duplication(100000, 4); +} + +// Ensure replica_duplicator can correctly handle real-world log file +TEST_F(load_from_private_log_test, handle_real_private_log) +{ + struct test_data + { + std::string fname; + int puts; + int total; + gpid id; + } tests[] = { + // PUT, PUT, PUT, EMPTY, PUT, EMPTY, EMPTY + {"log.1.0.handle_real_private_log", 4, 6, gpid(1, 4)}, + + // EMPTY, PUT, EMPTY + {"log.1.0.handle_real_private_log2", 1, 2, gpid(1, 4)}, + + // EMPTY, EMPTY, EMPTY + {"log.1.0.all_loaded_are_write_empties", 0, 2, gpid(1, 5)}, + }; + + for (auto tt : tests) { + boost::filesystem::path file(tt.fname); + boost::filesystem::copy_file( + file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists); + + // reset replica to specified gpid + duplicator.reset(nullptr); + _replica = create_mock_replica( + stub.get(), tt.id.get_app_id(), tt.id.get_partition_index(), _log_dir.c_str()); + + load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1); + } +} + +TEST_F(load_from_private_log_test, restart_duplication) { test_restart_duplication(); } + +TEST_F(load_from_private_log_test, restart_duplication2) { test_restart_duplication2(); } + +TEST_F(load_from_private_log_test, ignore_useless) +{ + utils::filesystem::remove_path(_log_dir); + + mutation_log_ptr mlog = create_private_log(); + + int num_entries = 100; + for (int i = 1; i <= num_entries; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + + // commit the last entry + mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + mlog->close(); + + // starts from 51 + mutation_tuple_set result = load_and_wait_all_entries_loaded(50, 100, 51); + ASSERT_EQ(result.size(), 50); + + // starts from 100 + result = load_and_wait_all_entries_loaded(1, 100, 100); + ASSERT_EQ(result.size(), 1); + + // a new duplication's confirmed_decree is invalid_decree, + // so start_decree is 0. + // In this case duplication will starts from last_commit(100), + // no mutation will be loaded. + result = load_and_wait_all_entries_loaded(0, 100, 0); + ASSERT_EQ(result.size(), 0); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/duplication/test/log.1.0.all_loaded_are_write_empties b/src/dist/replication/lib/duplication/test/log.1.0.all_loaded_are_write_empties new file mode 100644 index 0000000000..83c5954dc2 Binary files /dev/null and b/src/dist/replication/lib/duplication/test/log.1.0.all_loaded_are_write_empties differ diff --git a/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log b/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log new file mode 100644 index 0000000000..9038fe1a1b Binary files /dev/null and b/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log differ diff --git a/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log2 b/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log2 new file mode 100644 index 0000000000..685cb24ddd Binary files /dev/null and b/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log2 differ diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 75389ed271..bbbbd82818 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -107,6 +107,8 @@ class mock_replica : public replica dcheck_eq_replica(err, ERR_OK); } + void init_private_log(mutation_log_ptr log) { _private_log = std::move(log); } + replica_duplicator_manager &get_replica_duplicator_manager() { return *_duplication_mgr; } void as_primary() { _config.status = partition_status::PS_PRIMARY; } @@ -269,6 +271,7 @@ class mock_mutation_log_shared : public mutation_log_shared void flush() {} void flush_once() {} + private: std::vector _mu_list; };