From d07801b5cc55f2bb83d33e5452d832951fba2a15 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Thu, 19 Sep 2019 19:23:01 +0800 Subject: [PATCH 1/4] feat(dup): implement procedure load_from_private_log --- .../lib/duplication/load_from_private_log.cpp | 152 ++++++- .../lib/duplication/load_from_private_log.h | 31 ++ .../duplication/test/duplication_test_base.h | 9 + .../test/load_from_private_log_test.cpp | 371 ++++++++++++++++++ .../test/log.1.0.all_loaded_are_write_empties | Bin 0 -> 405 bytes .../test/log.1.0.handle_real_private_log | Bin 0 -> 917 bytes .../test/log.1.0.handle_real_private_log2 | Bin 0 -> 426 bytes .../test/replica_test/unit_test/mock_utils.h | 3 + 8 files changed, 564 insertions(+), 2 deletions(-) create mode 100644 src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp create mode 100644 src/dist/replication/lib/duplication/test/log.1.0.all_loaded_are_write_empties create mode 100644 src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log create mode 100644 src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log2 diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 808b7d7840..18c9a1f228 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -2,21 +2,169 @@ // This source code is licensed under the Apache License Version 2.0, which // can be found in the LICENSE file in the root directory of this source tree. +#include + +#include "dist/replication/lib/replica_stub.h" #include "dist/replication/lib/replica.h" +#include "dist/replication/lib/mutation_log_utils.h" #include "load_from_private_log.h" #include "replica_duplicator.h" namespace dsn { namespace replication { +static constexpr int MAX_ALLOWED_REPEATS = 3; + +// Fast path to next file. If next file (_current->index + 1) is invalid, +// we try to list all files and select a new one to start (find_log_file_to_start). +void load_from_private_log::switch_to_next_log_file() +{ + std::string new_path = fmt::format( + "{}/log.{}.{}", _private_log->dir(), _current->index() + 1, _current_global_end_offset); + + if (utils::filesystem::file_exists(new_path)) { + log_file_ptr file; + error_s es = log_utils::open_read(new_path, file); + if (!es.is_ok()) { + derror_replica("{}", es); + _current = nullptr; + return; + } + start_from_log_file(file); + } else { + _current = nullptr; + } +} + void load_from_private_log::run() { - // TBD + dassert_replica(_start_decree != invalid_decree, "{}", _start_decree); + _duplicator->verify_start_decree(_start_decree); + + if (_current == nullptr) { + find_log_file_to_start(); + if (_current == nullptr) { + ddebug_replica("no private log file is currently available"); + repeat(_repeat_delay); + return; + } + } + + replay_log_block(); +} + +void load_from_private_log::find_log_file_to_start() +{ + // the file set already excluded the useless log files during replica init. + auto file_map = _private_log->get_log_file_map(); + + // reopen the files, because the internal file handle of private_log->log_file_map() + // is cleared and unable to be used for us. + std::map new_file_map; + for (const auto &pr : file_map) { + log_file_ptr file; + error_s es = log_utils::open_read(pr.second->path(), file); + if (!es.is_ok()) { + derror_replica("{}", es); + return; + } + new_file_map.emplace(pr.first, file); + } + + find_log_file_to_start(new_file_map); +} + +void load_from_private_log::find_log_file_to_start(std::map log_file_map) +{ + if (dsn_unlikely(log_file_map.empty())) { + derror_replica("unable to start duplication since no log file is available"); + return; + } + + // ensure start decree is not compacted + auto begin = log_file_map.begin(); + for (auto it = begin; it != log_file_map.end(); it++) { + auto next_it = std::next(it); + if (next_it == log_file_map.end()) { + // use the last file + start_from_log_file(it->second); + return; + } + if (it->second->previous_log_max_decree(get_gpid()) < _start_decree && + _start_decree <= next_it->second->previous_log_max_decree(get_gpid())) { + start_from_log_file(it->second); + return; + } + } + + __builtin_unreachable(); +} + +void load_from_private_log::replay_log_block() +{ + error_s err = + mutation_log::replay_block(_current, + [this](int log_bytes_length, mutation_ptr &mu) -> bool { + auto es = _mutation_batch.add(std::move(mu)); + if (!es.is_ok()) { + dfatal_replica(es.description()); + } + return true; + }, + _start_offset, + _current_global_end_offset); + if (!err.is_ok()) { + // EOF appears only when end of log file is reached. + if (err.code() == ERR_HANDLE_EOF) { + switch_to_next_log_file(); + repeat(_repeat_delay); + return; + } + + _err_repeats_num++; + if (_err_repeats_num > MAX_ALLOWED_REPEATS) { + derror_replica("loading mutation logs failed for {} times: [err: {}, file: {}, " + "start_offset: {}], retry from start", + MAX_ALLOWED_REPEATS, + err, + _current->path(), + _start_offset); + start_from_log_file(_current); + } + + repeat(_repeat_delay); + return; + } + + _start_offset = static_cast(_current_global_end_offset - _current->start_offset()); + + // update last_decree even for empty batch. + step_down_next_stage(_mutation_batch.last_decree(), _mutation_batch.move_all_mutations()); } load_from_private_log::load_from_private_log(replica *r, replica_duplicator *dup) - : replica_base(r), _private_log(r->private_log()), _duplicator(dup) + : replica_base(r), + _private_log(r->private_log()), + _duplicator(dup), + _stub(r->get_replica_stub()), + _mutation_batch(dup) +{ +} + +void load_from_private_log::set_start_decree(decree start_decree) { + _start_decree = start_decree; + _mutation_batch.set_start_decree(start_decree); +} + +void load_from_private_log::start_from_log_file(log_file_ptr f) +{ + ddebug_replica("start loading from log file {}", f->path()); + + _current = std::move(f); + _start_offset = 0; + _current_global_end_offset = _current->start_offset(); + _err_repeats_num = 0; } } // namespace replication diff --git a/src/dist/replication/lib/duplication/load_from_private_log.h b/src/dist/replication/lib/duplication/load_from_private_log.h index 2bb7cba4f4..f9f2c99457 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.h +++ b/src/dist/replication/lib/duplication/load_from_private_log.h @@ -9,6 +9,7 @@ #include #include "dist/replication/lib/mutation_log.h" +#include "mutation_batch.h" namespace dsn { namespace replication { @@ -31,11 +32,41 @@ class load_from_private_log : public replica_base, // The loaded mutations will be passed down to `ship_mutation`. void run() override; + void set_start_decree(decree start_decree); + + /// ==== Implementation ==== /// + + void find_log_file_to_start(); + + /// Find the log file that contains decree `d`. + void find_log_file_to_start(std::map log_files); + + void replay_log_block(); + + // Switches to the log file with index = current_log_index + 1. + void switch_to_next_log_file(); + + void start_from_log_file(log_file_ptr f); + private: friend class load_from_private_log_test; mutation_log_ptr _private_log; replica_duplicator *_duplicator; + replica_stub *_stub; + + log_file_ptr _current; + + size_t _start_offset{0}; + int64_t _current_global_end_offset{0}; + mutation_batch _mutation_batch; + + // How many times it repeats reading from _start_offset but failed. + int _err_repeats_num{0}; + + decree _start_decree{0}; + + std::chrono::milliseconds _repeat_delay{10_s}; }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/test/duplication_test_base.h b/src/dist/replication/lib/duplication/test/duplication_test_base.h index 30a244ae8e..22cd40b49f 100644 --- a/src/dist/replication/lib/duplication/test/duplication_test_base.h +++ b/src/dist/replication/lib/duplication/test/duplication_test_base.h @@ -4,6 +4,7 @@ #pragma once +#include "dist/replication/lib/mutation_log_utils.h" #include "dist/replication/test/replica_test/unit_test/replica_test_base.h" #include "dist/replication/lib/duplication/replica_duplicator.h" #include "dist/replication/lib/duplication/replica_duplicator_manager.h" @@ -44,6 +45,14 @@ class duplication_test_base : public replica_test_base dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed; return make_unique(dup_ent, _replica.get()); } + + std::map open_log_file_map(const std::string &log_dir) + { + std::map log_file_map; + error_s err = log_utils::open_log_file_map(log_dir, log_file_map); + EXPECT_EQ(err, error_s::ok()); + return log_file_map; + } }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp new file mode 100644 index 0000000000..fdfa1488f0 --- /dev/null +++ b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp @@ -0,0 +1,371 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include +#include +#include + +#define BOOST_NO_CXX11_SCOPED_ENUMS +#include +#undef BOOST_NO_CXX11_SCOPED_ENUMS + +#include "dist/replication/lib/mutation_log_utils.h" +#include "dist/replication/lib/duplication/load_from_private_log.h" +#include "duplication_test_base.h" + +namespace dsn { +namespace replication { + +class load_from_private_log_test : public duplication_test_base +{ +public: + load_from_private_log_test() + { + _replica->init_private_log(_log_dir); + duplicator = create_test_duplicator(); + } + + void test_find_log_file_to_start() + { + load_from_private_log load(_replica.get(), duplicator.get()); + + std::vector mutations; + int max_log_file_mb = 1; + + mutation_log_ptr mlog = new mutation_log_private(_replica->dir(), + max_log_file_mb, + _replica->get_gpid(), + _replica.get(), + 1024, + 512, + 10000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + load.find_log_file_to_start({}); + ASSERT_FALSE(load._current); + + { // writing mutations to log which will generate multiple files + for (int i = 0; i < 1000 * 50; i++) { + std::string msg = "hello!"; + mutations.push_back(msg); + mutation_ptr mu = create_test_mutation(2 + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + + auto files = open_log_file_map(_log_dir); + + load.set_start_decree(1); + load.find_log_file_to_start(files); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 1); + + load.set_start_decree(50); + load.find_log_file_to_start(files); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 1); + + int last_idx = files.rbegin()->first; + load.set_start_decree(1000 * 50 + 200); + load.find_log_file_to_start(files); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), last_idx); + } + + void test_start_duplication(int num_entries, int private_log_size_mb) + { + std::vector mutations; + + mutation_log_ptr mlog = new mutation_log_private(_replica->dir(), + private_log_size_mb, + _replica->get_gpid(), + _replica.get(), + 1024, + 512, + 50000); + EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + + { + for (int i = 1; i <= num_entries; i++) { + std::string msg = "hello!"; + mutations.push_back(msg); + mutation_ptr mu = create_test_mutation(i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + + // commit the last entry + mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + + mlog->close(); + } + + load_and_wait_all_entries_loaded(num_entries, num_entries, 1); + } + + mutation_tuple_set + load_and_wait_all_entries_loaded(int total, int last_decree, decree start_decree) + { + return load_and_wait_all_entries_loaded( + total, last_decree, _replica->get_gpid(), start_decree); + } + mutation_tuple_set load_and_wait_all_entries_loaded(int total, int last_decree) + { + return load_and_wait_all_entries_loaded(total, last_decree, _replica->get_gpid(), 1); + } + mutation_tuple_set + load_and_wait_all_entries_loaded(int total, int last_decree, gpid id, decree start_decree) + { + mutation_log_ptr mlog = create_private_log(id); + for (const auto &pr : mlog->get_log_file_map()) { + EXPECT_TRUE(pr.second->file_handle() == nullptr); + } + _replica->init_private_log(mlog); + duplicator = create_test_duplicator(start_decree - 1); + + load_from_private_log load(_replica.get(), duplicator.get()); + const_cast(load._repeat_delay) = 1_s; + load.set_start_decree(duplicator->progress().last_decree + 1); + + mutation_tuple_set loaded_mutations; + pipeline::do_when end_stage( + [&loaded_mutations, &load, total, last_decree](decree &&d, + mutation_tuple_set &&mutations) { + // we create one mutation_update per mutation + // the mutations are started from 1 + for (mutation_tuple mut : mutations) { + loaded_mutations.emplace(mut); + } + + if (loaded_mutations.size() < total || d < last_decree) { + load.run(); + } + }); + + duplicator->from(load).link(end_stage); + + // inject some faults + fail::setup(); + fail::cfg("open_read", "25%1*return()"); + fail::cfg("mutation_log_read_log_block", "25%1*return()"); + duplicator->run_pipeline(); + duplicator->wait_all(); + fail::teardown(); + + return loaded_mutations; + } + + void test_restart_duplication() + { + load_from_private_log load(_replica.get(), duplicator.get()); + + // start duplication from a compacted plog dir. + // first log file is log.2.xxx + for (int f = 0; f < 2; f++) { + mutation_log_ptr mlog = create_private_log(); + for (int i = 0; i < 100; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(39000 + 100 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + + std::vector files; + ASSERT_EQ(log_utils::list_all_files(_log_dir, files), error_s::ok()); + ASSERT_EQ(files.size(), 2); + boost::filesystem::remove(_log_dir + "/log.1.0"); + + mutation_log_ptr mlog = create_private_log(); + decree max_gced_dercee = mlog->max_gced_decree_no_lock(_replica->get_gpid()); + + // new duplication, start_decree = max_gced_decree + 1 + // ensure we can find the first file. + load.set_start_decree(max_gced_dercee + 1); + load.find_log_file_to_start(mlog->get_log_file_map()); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 2); + } + + mutation_log_ptr create_private_log(gpid id) { return create_private_log(1, id); } + + mutation_log_ptr create_private_log(int private_log_size_mb = 1, gpid id = gpid(1, 1)) + { + std::map replay_condition; + replay_condition[id] = 0; // duplicating + mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; }; + mutation_log_ptr mlog = new mutation_log_private( + _replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000); + EXPECT_EQ(mlog->open(cb, nullptr, replay_condition), ERR_OK); + return mlog; + } + + void test_restart_duplication2() + { + load_from_private_log load(_replica.get(), duplicator.get()); + + // create a log file indexed 3, starting from 38200 + for (int f = 0; f < 3; f++) { + mutation_log_ptr mlog = create_private_log(); + for (int i = 0; i < 100; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(38000 + 100 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + auto files1 = open_log_file_map(_log_dir); + ASSERT_EQ(files1.size(), 3); + boost::filesystem::remove(files1[1]->path()); + boost::filesystem::remove(files1[2]->path()); + boost::filesystem::rename( + files1[3]->path(), + fmt::format("./log.{}.{}", files1[3]->index(), files1[3]->start_offset())); + + // first log is 39100 + { + for (int f = 0; f < 2; f++) { + mutation_log_ptr mlog = create_private_log(); + for (int i = 0; i < 100; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(39000 + 100 * f + i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + } + boost::filesystem::remove(files1[1]->path()); + } + + { + // This test simulates the following case: + // the replica has written logs [39100 -> 39199], but after some sort of failure, + // it became learner and copied plogs starting from 38200. + boost::filesystem::rename( + fmt::format("./log.{}.{}", files1[3]->index(), files1[3]->start_offset()), + files1[3]->path()); + } + + // log.2.xxx starts from 39100 + // log.3.xxx starts from 38200 + // all log files are reserved for duplication + mutation_log_ptr mlog = create_private_log(); + auto files = mlog->get_log_file_map(); + ASSERT_EQ(files.size(), 2); + + decree max_gced_decree = mlog->max_gced_decree_no_lock(_replica->get_gpid()); + ASSERT_EQ(max_gced_decree, 38199); + + // new duplication, ensure we can start at log.3.xxx + load._private_log = mlog; + load.set_start_decree(max_gced_decree + 1); + load.find_log_file_to_start(); + ASSERT_TRUE(load._current); + ASSERT_EQ(load._current->index(), 3); + } + + std::unique_ptr duplicator; +}; + +TEST_F(load_from_private_log_test, find_log_file_to_start) { test_find_log_file_to_start(); } + +TEST_F(load_from_private_log_test, start_duplication_10000_4MB) +{ + test_start_duplication(10000, 4); +} + +TEST_F(load_from_private_log_test, start_duplication_50000_4MB) +{ + test_start_duplication(50000, 4); +} + +TEST_F(load_from_private_log_test, start_duplication_10000_1MB) +{ + test_start_duplication(10000, 1); +} + +TEST_F(load_from_private_log_test, start_duplication_50000_1MB) +{ + test_start_duplication(50000, 1); +} + +TEST_F(load_from_private_log_test, start_duplication_100000_4MB) +{ + test_start_duplication(100000, 4); +} + +// Ensure replica_duplicator can correctly handle real-world log file +TEST_F(load_from_private_log_test, handle_real_private_log) +{ + struct test_data + { + std::string fname; + int puts; + int total; + gpid id; + } tests[] = { + // PUT, PUT, PUT, EMPTY, PUT, EMPTY, EMPTY + {"log.1.0.handle_real_private_log", 4, 6, gpid(1, 4)}, + + // EMPTY, PUT, EMPTY + {"log.1.0.handle_real_private_log2", 1, 2, gpid(1, 4)}, + + // EMPTY, EMPTY, EMPTY + {"log.1.0.all_loaded_are_write_empties", 0, 2, gpid(1, 5)}, + }; + + for (auto tt : tests) { + boost::filesystem::path file(tt.fname); + boost::filesystem::copy_file( + file, _log_dir + "/log.1.0", boost::filesystem::copy_option::overwrite_if_exists); + + // reset replica to specified gpid + duplicator.reset(nullptr); + _replica = create_mock_replica( + stub.get(), tt.id.get_app_id(), tt.id.get_partition_index(), _log_dir.c_str()); + + load_and_wait_all_entries_loaded(tt.puts, tt.total, tt.id, 1); + } +} + +TEST_F(load_from_private_log_test, restart_duplication) { test_restart_duplication(); } + +TEST_F(load_from_private_log_test, restart_duplication2) { test_restart_duplication2(); } + +TEST_F(load_from_private_log_test, ignore_useless) +{ + utils::filesystem::remove_path(_log_dir); + + mutation_log_ptr mlog = create_private_log(); + + int num_entries = 100; + for (int i = 1; i <= num_entries; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(i, msg); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + + // commit the last entry + mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!"); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + mlog->close(); + + // starts from 51 + mutation_tuple_set result = load_and_wait_all_entries_loaded(50, 100, 51); + ASSERT_EQ(result.size(), 50); + + // starts from 100 + result = load_and_wait_all_entries_loaded(1, 100, 100); + ASSERT_EQ(result.size(), 1); + + // a new duplication's confirmed_decree is invalid_decree, + // so start_decree is 0. + // In this case duplication will starts from last_commit(100), + // no mutation will be loaded. + result = load_and_wait_all_entries_loaded(0, 100, 0); + ASSERT_EQ(result.size(), 0); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/duplication/test/log.1.0.all_loaded_are_write_empties b/src/dist/replication/lib/duplication/test/log.1.0.all_loaded_are_write_empties new file mode 100644 index 0000000000000000000000000000000000000000..83c5954dc284c2e0f14f4cb799d8dad747549601 GIT binary patch literal 405 zcmaFAZ|yxD1_lPkyJwPt6vKNUpAp1>0uUR7Sz!W5G)OH-(}Nl*J{u%)n0jWY7|c8$ zBsCx=qjGt6T^=jgd}$y*D8M;B$Th&n)7de^)88*XJjgS|HQv=XAS4o|6K3V_)julN z!vsM_0WrvlFe{m$a{Hk)%n1@~xrJm}8LPcrTM1?k&{%XUVOASK)xeza>_t*JiBca-ngK!Lr>D$#|{sHpQtz-o$1_E=a8kiFv{pWlNvy$|vPQmcHaHB)adOYr6gE)g> wKU59OT`FQm?_lmA!Rw&VKI6!AT?rP1Kt8(H*`Zb%LDj&VaPhDD7ZR-m0H^?SWdHyG literal 0 HcmV?d00001 diff --git a/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log2 b/src/dist/replication/lib/duplication/test/log.1.0.handle_real_private_log2 new file mode 100644 index 0000000000000000000000000000000000000000..685cb24ddd4837a51fdbbee7d9c7b073fd5ad021 GIT binary patch literal 426 zcmaFAZ|yxD1_p*bTKyqFis3zw&j?~b0f-I4EHD8i8l*NE$WM5Z;$VX$4pYwz6@!`Q zgQNz;y#3$SJ3o&VY`!#*9~9smALJU~`fww~`4ew;xKwoS?+M$`@v(AW#l$Wsr+gd=Q8T2n_)#lm}uS5Q3P>n3%}J v05Xsq%8f_WWAyF=B$-1YyMW;o0pM0y _mu_list; }; From c3f2969d924f544c78a601c60b472dcc34d6e185 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 20 Sep 2019 08:24:45 +0800 Subject: [PATCH 2/4] some refinement --- .../lib/duplication/load_from_private_log.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 18c9a1f228..22c1d6f14b 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -55,11 +55,11 @@ void load_from_private_log::run() void load_from_private_log::find_log_file_to_start() { - // the file set already excluded the useless log files during replica init. + // `file_map` has already excluded the useless log files during replica init. auto file_map = _private_log->get_log_file_map(); - // reopen the files, because the internal file handle of private_log->log_file_map() - // is cleared and unable to be used for us. + // Reopen the files. Because the internal file handle of `file_map` + // is cleared once WAL replay finished. They are unable to read. std::map new_file_map; for (const auto &pr : file_map) { log_file_ptr file; @@ -71,7 +71,7 @@ void load_from_private_log::find_log_file_to_start() new_file_map.emplace(pr.first, file); } - find_log_file_to_start(new_file_map); + find_log_file_to_start(std::move(new_file_map)); } void load_from_private_log::find_log_file_to_start(std::map log_file_map) @@ -81,17 +81,17 @@ void load_from_private_log::find_log_file_to_start(std::map l return; } - // ensure start decree is not compacted auto begin = log_file_map.begin(); for (auto it = begin; it != log_file_map.end(); it++) { auto next_it = std::next(it); if (next_it == log_file_map.end()) { - // use the last file + // use the last file if new file added start_from_log_file(it->second); return; } if (it->second->previous_log_max_decree(get_gpid()) < _start_decree && _start_decree <= next_it->second->previous_log_max_decree(get_gpid())) { + // `start_decree` is within the range start_from_log_file(it->second); return; } From 922527bd3686952aa86ec59fb751cb935ebceb09 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Fri, 20 Sep 2019 11:59:16 +0800 Subject: [PATCH 3/4] find the latest log file --- .../lib/duplication/load_from_private_log.cpp | 30 ++++++++++--------- .../lib/duplication/load_from_private_log.h | 3 +- .../lib/duplication/test/CMakeLists.txt | 3 ++ .../test/load_from_private_log_test.cpp | 4 +++ 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 22c1d6f14b..74fecf7223 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -17,7 +17,7 @@ static constexpr int MAX_ALLOWED_REPEATS = 3; // Fast path to next file. If next file (_current->index + 1) is invalid, // we try to list all files and select a new one to start (find_log_file_to_start). -void load_from_private_log::switch_to_next_log_file() +bool load_from_private_log::switch_to_next_log_file() { std::string new_path = fmt::format( "{}/log.{}.{}", _private_log->dir(), _current->index() + 1, _current_global_end_offset); @@ -28,11 +28,13 @@ void load_from_private_log::switch_to_next_log_file() if (!es.is_ok()) { derror_replica("{}", es); _current = nullptr; - return; + return false; } start_from_log_file(file); + return true; } else { _current = nullptr; + return false; } } @@ -81,23 +83,22 @@ void load_from_private_log::find_log_file_to_start(std::map l return; } - auto begin = log_file_map.begin(); - for (auto it = begin; it != log_file_map.end(); it++) { + for (auto it = log_file_map.begin(); it != log_file_map.end(); it++) { auto next_it = std::next(it); if (next_it == log_file_map.end()) { - // use the last file if new file added - start_from_log_file(it->second); + // use the last file if no file to read + if (!_current) { + start_from_log_file(it->second); + } return; } if (it->second->previous_log_max_decree(get_gpid()) < _start_decree && _start_decree <= next_it->second->previous_log_max_decree(get_gpid())) { // `start_decree` is within the range start_from_log_file(it->second); - return; + // find the latest file that matches the condition } } - - __builtin_unreachable(); } void load_from_private_log::replay_log_block() @@ -106,9 +107,7 @@ void load_from_private_log::replay_log_block() mutation_log::replay_block(_current, [this](int log_bytes_length, mutation_ptr &mu) -> bool { auto es = _mutation_batch.add(std::move(mu)); - if (!es.is_ok()) { - dfatal_replica(es.description()); - } + dassert_replica(es.is_ok(), es.description()); return true; }, _start_offset, @@ -116,8 +115,11 @@ void load_from_private_log::replay_log_block() if (!err.is_ok()) { // EOF appears only when end of log file is reached. if (err.code() == ERR_HANDLE_EOF) { - switch_to_next_log_file(); - repeat(_repeat_delay); + if (switch_to_next_log_file()) { + repeat(); + } else { + repeat(_repeat_delay); + } return; } diff --git a/src/dist/replication/lib/duplication/load_from_private_log.h b/src/dist/replication/lib/duplication/load_from_private_log.h index f9f2c99457..bfbfafa3c9 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.h +++ b/src/dist/replication/lib/duplication/load_from_private_log.h @@ -44,7 +44,8 @@ class load_from_private_log : public replica_base, void replay_log_block(); // Switches to the log file with index = current_log_index + 1. - void switch_to_next_log_file(); + // Returns true if succeeds. + bool switch_to_next_log_file(); void start_from_log_file(log_file_ptr f); diff --git a/src/dist/replication/lib/duplication/test/CMakeLists.txt b/src/dist/replication/lib/duplication/test/CMakeLists.txt index efe5f3e692..bacf339147 100644 --- a/src/dist/replication/lib/duplication/test/CMakeLists.txt +++ b/src/dist/replication/lib/duplication/test/CMakeLists.txt @@ -25,6 +25,9 @@ set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) set(MY_BINPLACES config-test.ini + log.1.0.handle_real_private_log + log.1.0.all_loaded_are_write_empties + log.1.0.handle_real_private_log2 run.sh ) diff --git a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp index fdfa1488f0..293e69a13e 100644 --- a/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp +++ b/src/dist/replication/lib/duplication/test/load_from_private_log_test.cpp @@ -17,6 +17,8 @@ namespace dsn { namespace replication { +DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT); + class load_from_private_log_test : public duplication_test_base { public: @@ -62,12 +64,14 @@ class load_from_private_log_test : public duplication_test_base ASSERT_TRUE(load._current); ASSERT_EQ(load._current->index(), 1); + load._current = nullptr; load.set_start_decree(50); load.find_log_file_to_start(files); ASSERT_TRUE(load._current); ASSERT_EQ(load._current->index(), 1); int last_idx = files.rbegin()->first; + load._current = nullptr; load.set_start_decree(1000 * 50 + 200); load.find_log_file_to_start(files); ASSERT_TRUE(load._current); From cb9b9b33685fe8ff65613a3d04990ded92c5f6e7 Mon Sep 17 00:00:00 2001 From: neverchanje Date: Mon, 23 Sep 2019 11:43:51 +0800 Subject: [PATCH 4/4] fix on review, fix error handling on error --- .../lib/duplication/load_from_private_log.cpp | 12 +++--------- .../lib/duplication/load_from_private_log.h | 3 +-- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/dist/replication/lib/duplication/load_from_private_log.cpp b/src/dist/replication/lib/duplication/load_from_private_log.cpp index 74fecf7223..f7acb9447b 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.cpp +++ b/src/dist/replication/lib/duplication/load_from_private_log.cpp @@ -113,13 +113,8 @@ void load_from_private_log::replay_log_block() _start_offset, _current_global_end_offset); if (!err.is_ok()) { - // EOF appears only when end of log file is reached. - if (err.code() == ERR_HANDLE_EOF) { - if (switch_to_next_log_file()) { - repeat(); - } else { - repeat(_repeat_delay); - } + if (err.code() == ERR_HANDLE_EOF && switch_to_next_log_file()) { + repeat(); return; } @@ -131,9 +126,8 @@ void load_from_private_log::replay_log_block() err, _current->path(), _start_offset); - start_from_log_file(_current); + find_log_file_to_start(); } - repeat(_repeat_delay); return; } diff --git a/src/dist/replication/lib/duplication/load_from_private_log.h b/src/dist/replication/lib/duplication/load_from_private_log.h index bfbfafa3c9..e3b5cf17ce 100644 --- a/src/dist/replication/lib/duplication/load_from_private_log.h +++ b/src/dist/replication/lib/duplication/load_from_private_log.h @@ -36,9 +36,8 @@ class load_from_private_log : public replica_base, /// ==== Implementation ==== /// + /// Find the log file that contains `_start_decree`. void find_log_file_to_start(); - - /// Find the log file that contains decree `d`. void find_log_file_to_start(std::map log_files); void replay_log_block();