From f4947c481baa17d9e54933d3cdc497dcaea548c0 Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Mon, 16 Sep 2019 18:34:15 +0800 Subject: [PATCH] refactor: introduce mutation_log::replay_block (#302) --- src/dist/replication/lib/mutation_log.cpp | 165 +++++------------- src/dist/replication/lib/mutation_log.h | 53 ++++-- .../replication/lib/mutation_log_replay.cpp | 102 +++++++++++ .../replication/lib/mutation_log_utils.cpp | 98 +++++++++++ src/dist/replication/lib/mutation_log_utils.h | 82 +++++++++ src/dist/replication/lib/replica.cpp | 54 ------ 6 files changed, 364 insertions(+), 190 deletions(-) create mode 100644 src/dist/replication/lib/mutation_log_replay.cpp create mode 100644 src/dist/replication/lib/mutation_log_utils.cpp create mode 100644 src/dist/replication/lib/mutation_log_utils.h diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index 860f5020d3..de7a4aa7c6 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -24,22 +24,14 @@ * THE SOFTWARE. */ -/* - * Description: - * What is this file about? - * - * Revision history: - * xxxx-xx-xx, author, first version - * xxxx-xx-xx, author, fix bug about xxx - */ - #include "mutation_log.h" -#ifdef _WIN32 -#include -#endif #include "replica.h" +#include "mutation_log_utils.h" + #include #include +#include +#include #include #include @@ -411,10 +403,7 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required) dassert(_pending_write != nullptr, ""); dassert(_pending_write->size() > 0, "pending write size = %d", (int)_pending_write->size()); auto pr = mark_new_offset(_pending_write->size(), false); - dassert(pr.second == _pending_write_start_offset, - "%" PRId64 " VS %" PRId64 "", - pr.second, - _pending_write_start_offset); + dcheck_eq_replica(pr.second, _pending_write_start_offset); _is_writing.store(true, std::memory_order_release); @@ -878,7 +867,10 @@ std::pair mutation_log::mark_new_offset(size_t size, if (create_file) { auto ec = create_new_log_file(); - dassert(ec == ERR_OK, "create new log file failed"); + dassert_f(ec == ERR_OK, + "{} create new log file failed: {}", + _is_private ? _private_gpid.to_string() : "", + ec); _switch_file_hint = false; _switch_file_demand = false; } @@ -892,69 +884,7 @@ std::pair mutation_log::mark_new_offset(size_t size, return std::make_pair(_current_log_file, write_start_offset); } -/*static*/ error_code mutation_log::replay(log_file_ptr log, - replay_callback callback, - /*out*/ int64_t &end_offset) -{ - end_offset = log->start_offset(); - ddebug("start to replay mutation log %s, offset = [%" PRId64 ", %" PRId64 "), size = %" PRId64, - log->path().c_str(), - log->start_offset(), - log->end_offset(), - log->end_offset() - log->start_offset()); - - ::dsn::blob bb; - log->reset_stream(); - error_code err = log->read_next_log_block(bb); - if (err != ERR_OK) { - return err; - } - - std::shared_ptr reader(new binary_reader(std::move(bb))); - end_offset += sizeof(log_block_header); - - // read file header - end_offset += log->read_file_header(*reader); - if (!log->is_right_header()) { - return ERR_INVALID_DATA; - } - - while (true) { - while (!reader->is_eof()) { - auto old_size = reader->get_remaining_size(); - mutation_ptr mu = mutation::read_from(*reader, nullptr); - dassert(nullptr != mu, ""); - mu->set_logged(); - - if (mu->data.header.log_offset != end_offset) { - derror("offset mismatch in log entry and mutation %" PRId64 " vs %" PRId64, - end_offset, - mu->data.header.log_offset); - err = ERR_INVALID_DATA; - break; - } - - int log_length = old_size - reader->get_remaining_size(); - - callback(log_length, mu); - - end_offset += log_length; - } - - err = log->read_next_log_block(bb); - if (err != ERR_OK) { - // if an error occurs in an log mutation block, then the replay log is stopped - break; - } - - reader.reset(new binary_reader(std::move(bb))); - end_offset += sizeof(log_block_header); - } - - ddebug("finish to replay mutation log %s, err = %s", log->path().c_str(), err.to_string()); - return err; -} - +// TODO(wutao1): move it to mutation_log_replay.cpp /*static*/ error_code mutation_log::replay(std::vector &log_files, replay_callback callback, /*out*/ int64_t &end_offset) @@ -981,6 +911,7 @@ std::pair mutation_log::mark_new_offset(size_t size, return replay(logs, callback, end_offset); } +// TODO(wutao1): move it to mutation_log_replay.cpp /*static*/ error_code mutation_log::replay(std::map &logs, replay_callback callback, /*out*/ int64_t &end_offset) @@ -989,20 +920,16 @@ std::pair mutation_log::mark_new_offset(size_t size, int64_t g_end_offset = 0; error_code err = ERR_OK; log_file_ptr last; - int last_file_index = 0; if (logs.size() > 0) { g_start_offset = logs.begin()->second->start_offset(); g_end_offset = logs.rbegin()->second->end_offset(); - last_file_index = logs.begin()->first - 1; } - // check file index continuity - for (auto &kv : logs) { - if (++last_file_index != kv.first) { - derror("log file missing with index %u", last_file_index); - return ERR_OBJECT_NOT_FOUND; - } + error_s error = log_utils::check_log_files_continuity(logs); + if (!error.is_ok()) { + derror_f("check_log_files_continuity failed: {}", error); + return error.code(); } end_offset = g_start_offset; @@ -1074,38 +1001,6 @@ decree mutation_log::max_commit_on_disk() const return _private_max_commit_on_disk; } -decree mutation_log::max_gced_decree(gpid gpid, int64_t valid_start_offset) const -{ - check_valid_start_offset(gpid, valid_start_offset); - - zauto_lock l(_lock); - - if (_log_files.size() == 0) { - if (_is_private) - return _private_log_info.max_decree; - else { - auto it = _shared_log_info_map.find(gpid); - if (it != _shared_log_info_map.end()) - return it->second.max_decree; - else - return 0; - } - } else { - for (auto &log : _log_files) { - // when invalid log exits, all new logs are preserved (not gced) - if (valid_start_offset > log.second->start_offset()) - return 0; - - auto it = log.second->previous_log_max_decrees().find(gpid); - if (it != log.second->previous_log_max_decrees().end()) { - return it->second.max_decree; - } else - return 0; - } - return 0; - } -} - void mutation_log::check_valid_start_offset(gpid gpid, int64_t valid_start_offset) const { zauto_lock l(_lock); @@ -1206,7 +1101,7 @@ void mutation_log::update_max_decree_no_lock(gpid gpid, decree d) dassert(false, "replica has not been registered in the log before"); } } else { - dassert(gpid == _private_gpid, "replica gpid does not match"); + dcheck_eq(gpid, _private_gpid); if (d > _private_log_info.max_decree) { _private_log_info.max_decree = d; } @@ -1248,6 +1143,11 @@ bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state if (state.meta.length() == 0 && start > _private_log_info.max_decree) { // no memory data and no disk data + ddebug_f("gpid({}) get_learn_state returns false" + "learn_start_decree={}, max_decree_in_private_log={}", + gpid, + start, + _private_log_info.max_decree); return false; } @@ -1910,6 +1810,9 @@ class log_file::file_streamer } fill_buffers(); } + + // TODO(wutao1): use string_view instead of using blob. + // WARNING: the resulted blob is not guaranteed to be reference counted. // possible error_code: // ERR_OK result would always size as expected // ERR_HANDLE_EOF if there are not enough data in file. result would still be @@ -2144,11 +2047,11 @@ log_file::~log_file() { close(); } log_file::log_file( const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read) + : _is_read(is_read) { _start_offset = start_offset; _end_offset = start_offset; _handle = handle; - _is_read = is_read; _path = path; _index = index; _crc32 = 0; @@ -2166,6 +2069,8 @@ log_file::log_file( void log_file::close() { + zauto_lock lock(_write_lock); + //_stream implicitly refer to _handle so it needs to be cleaned up first. // TODO: We need better abstraction to avoid those manual stuffs.. _stream.reset(nullptr); @@ -2180,6 +2085,7 @@ void log_file::close() void log_file::flush() const { dassert(!_is_read, "log file must be of write mode"); + zauto_lock lock(_write_lock); if (_handle) { error_code err = file::flush(_handle); @@ -2260,6 +2166,11 @@ aio_task_ptr log_file::commit_log_block(log_block &block, dassert(!_is_read, "log file must be of write mode"); dassert(block.size() > 0, "log_block can not be empty"); + zauto_lock lock(_write_lock); + if (!_handle) { + return nullptr; + } + auto size = (long long)block.size(); int64_t local_offset = offset - start_offset(); auto hdr = reinterpret_cast(const_cast(block.front().data())); @@ -2311,14 +2222,16 @@ aio_task_ptr log_file::commit_log_block(log_block &block, return tsk; } -void log_file::reset_stream() +void log_file::reset_stream(size_t offset /*default = 0*/) { if (_stream == nullptr) { - _stream.reset(new file_streamer(_handle, 0)); + _stream.reset(new file_streamer(_handle, offset)); } else { - _stream->reset(0); + _stream->reset(offset); + } + if (offset == 0) { + _crc32 = 0; } - _crc32 = 0; } decree log_file::previous_log_max_decree(const dsn::gpid &pid) diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index 8f4f1c3045..c0b4198f97 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -36,9 +36,11 @@ #pragma once #include "dist/replication/common/replication_common.h" -#include "mutation.h" +#include "dist/replication/lib/mutation.h" + #include #include +#include #include #include @@ -126,9 +128,10 @@ class replica; class mutation_log : public ref_counter { public: - // return true when the mutation's offset is not less than - // the remembered (shared or private) valid_start_offset therefore valid for the replica + // DEPRECATED: The returned bool value will never be evaluated. + // Always return true in the callback. typedef std::function replay_callback; + typedef std::function io_failure_callback; public: @@ -198,6 +201,32 @@ class mutation_log : public ref_counter replay_callback callback, /*out*/ int64_t &end_offset); + // Reads a series of mutations from the log file (from `start_offset` of `log`), + // and iterates over the mutations, executing the provided `callback` for each + // mutation entry. + // Since the logs are packed into multiple blocks, this function retrieves + // only one log block at a time. The size of block depends on configuration + // `log_private_batch_buffer_kb` and `log_private_batch_buffer_count`. + // + // Parameters: + // - callback: the callback to execute for each mutation. + // - start_offset: file offset to start. + // + // Returns: + // - ERR_INVALID_DATA: if the loaded data is incorrect or invalid. + // + static error_s replay_block(log_file_ptr &log, + replay_callback &callback, + size_t start_offset, + /*out*/ int64_t &end_offset); + static error_s replay_block(log_file_ptr &log, + replay_callback &&callback, + size_t start_offset, + /*out*/ int64_t &end_offset) + { + return replay_block(log, callback, start_offset, end_offset); + } + // // maintain max_decree & valid_start_offset // @@ -295,10 +324,6 @@ class mutation_log : public ref_counter // thread safe decree max_commit_on_disk() const; - // maximum decree that is garbage collected - // thread safe - decree max_gced_decree(gpid gpid, int64_t valid_start_offset) const; - // thread-safe std::map get_log_file_map() const; @@ -312,6 +337,8 @@ class mutation_log : public ref_counter void hint_switch_file() { _switch_file_hint = true; } void demand_switch_file() { _switch_file_demand = true; } + task_tracker *tracker() { return &_tracker; } + protected: // thread-safe // 'size' is data size to write; the '_global_end_offset' will be updated by 'size'. @@ -627,8 +654,10 @@ class log_file : public ref_counter // // others // - // reset file_streamer to point to the start of this log file. - void reset_stream(); + + // Reset file_streamer to point to `offset`. + // offset=0 means the start of this log file. + void reset_stream(size_t offset = 0); // end offset in the global space: end_offset = start_offset + file_size int64_t end_offset() const { return _end_offset.load(); } // start offset in the global space @@ -657,6 +686,8 @@ class log_file : public ref_counter void set_last_write_time(uint64_t last_write_time) { _last_write_time = last_write_time; } uint64_t last_write_time() const { return _last_write_time; } + const disk_file *file_handle() const { return _handle; } + private: // make private, user should create log_file through open_read() or open_write() log_file(const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read); @@ -671,12 +702,14 @@ class log_file : public ref_counter class file_streamer; std::unique_ptr _stream; disk_file *_handle; // file handle - bool _is_read; // if opened for read or write + const bool _is_read; // if opened for read or write std::string _path; // file path int _index; // file index log_file_header _header; // file header uint64_t _last_write_time; // seconds from epoch time + mutable zlock _write_lock; + // this data is used for garbage collection, and is part of file header. // for read, the value is read from file header. // for write, the value is set by write_file_header(). diff --git a/src/dist/replication/lib/mutation_log_replay.cpp b/src/dist/replication/lib/mutation_log_replay.cpp new file mode 100644 index 0000000000..a1eb662e04 --- /dev/null +++ b/src/dist/replication/lib/mutation_log_replay.cpp @@ -0,0 +1,102 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "dist/replication/lib/mutation_log.h" +#include +#include +#include + +namespace dsn { +namespace replication { + +/*static*/ error_code mutation_log::replay(log_file_ptr log, + replay_callback callback, + /*out*/ int64_t &end_offset) +{ + end_offset = log->start_offset(); + ddebug("start to replay mutation log %s, offset = [%" PRId64 ", %" PRId64 "), size = %" PRId64, + log->path().c_str(), + log->start_offset(), + log->end_offset(), + log->end_offset() - log->start_offset()); + + ::dsn::blob bb; + log->reset_stream(); + error_s err; + size_t start_offset = 0; + while (true) { + err = replay_block(log, callback, start_offset, end_offset); + if (!err.is_ok()) { + // Stop immediately if failed + break; + } + + start_offset = static_cast(end_offset - log->start_offset()); + } + + ddebug("finish to replay mutation log (%s) [err: %s]", + log->path().c_str(), + err.description().c_str()); + return err.code(); +} + +/*static*/ error_s mutation_log::replay_block(log_file_ptr &log, + replay_callback &callback, + size_t start_offset, + int64_t &end_offset) +{ + FAIL_POINT_INJECT_F("mutation_log_replay_block", [](string_view) -> error_s { + return error_s::make(ERR_INCOMPLETE_DATA, "mutation_log_replay_block"); + }); + + blob bb; + std::unique_ptr reader; + + log->reset_stream(start_offset); // start reading from given offset + int64_t global_start_offset = start_offset + log->start_offset(); + end_offset = global_start_offset; // reset end_offset to the start. + + // reads the entire block into memory + error_code err = log->read_next_log_block(bb); + if (err != ERR_OK) { + return error_s::make(err, "failed to read log block"); + } + + reader = dsn::make_unique(bb); + end_offset += sizeof(log_block_header); + + // The first block is log_file_header. + if (global_start_offset == log->start_offset()) { + end_offset += log->read_file_header(*reader); + if (!log->is_right_header()) { + return error_s::make(ERR_INVALID_DATA, "failed to read log file header"); + } + // continue to parsing the data block + } + + while (!reader->is_eof()) { + auto old_size = reader->get_remaining_size(); + mutation_ptr mu = mutation::read_from(*reader, nullptr); + dassert(nullptr != mu, ""); + mu->set_logged(); + + if (mu->data.header.log_offset != end_offset) { + return FMT_ERR(ERR_INVALID_DATA, + "offset mismatch in log entry and mutation {} vs {}", + end_offset, + mu->data.header.log_offset); + } + + int log_length = old_size - reader->get_remaining_size(); + + callback(log_length, mu); + + end_offset += log_length; + } + + return error_s::ok(); +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/mutation_log_utils.cpp b/src/dist/replication/lib/mutation_log_utils.cpp new file mode 100644 index 0000000000..96a6134839 --- /dev/null +++ b/src/dist/replication/lib/mutation_log_utils.cpp @@ -0,0 +1,98 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include +#include + +#include "mutation_log_utils.h" + +namespace dsn { +namespace replication { +namespace log_utils { + +/*extern*/ error_s open_read(string_view path, /*out*/ log_file_ptr &file) +{ + FAIL_POINT_INJECT_F("open_read", [](string_view) -> error_s { + return error_s::make(ERR_FILE_OPERATION_FAILED, "open_read"); + }); + + error_code ec; + file = log_file::open_read(path.data(), ec); + if (ec != ERR_OK) { + return FMT_ERR(ec, "failed to open the log file ({})", path); + } + return error_s::ok(); +} + +/*extern*/ error_s list_all_files(const std::string &dir, /*out*/ std::vector &files) +{ + FAIL_POINT_INJECT_F("list_all_files", [](string_view) -> error_s { + return error_s::make(ERR_FILE_OPERATION_FAILED, "list_all_files"); + }); + + if (!utils::filesystem::get_subfiles(dir, files, false)) { + return FMT_ERR( + ERR_FILE_OPERATION_FAILED, "unable to list the files under directory ({})", dir); + } + return error_s::ok(); +} + +/*extern*/ +error_s check_log_files_continuity(const std::map &logs) +{ + if (logs.empty()) { + return error_s::ok(); + } + + int last_file_index = logs.begin()->first - 1; + for (const auto &kv : logs) { + if (++last_file_index != kv.first) { + // this is a serious error, print all the files in list. + std::string all_log_files_str; + bool first = true; + for (const auto &id_file : logs) { + if (!first) { + all_log_files_str += ", "; + } + first = false; + all_log_files_str += fmt::format( + "log.{}.{}", id_file.second->index(), id_file.second->start_offset()); + } + + return FMT_ERR( + ERR_OBJECT_NOT_FOUND, + "log file missing with index {}. Here are all the files under dir({}): [{}]", + last_file_index, + logs.begin()->second->path(), + all_log_files_str); + } + } + return error_s::ok(); +} + +} // namespace log_utils +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/mutation_log_utils.h b/src/dist/replication/lib/mutation_log_utils.h new file mode 100644 index 0000000000..e8a05eb5cb --- /dev/null +++ b/src/dist/replication/lib/mutation_log_utils.h @@ -0,0 +1,82 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include +#include +#include + +#include "dist/replication/lib/mutation_log.h" + +namespace dsn { +namespace replication { +namespace log_utils { + +extern error_s open_read(string_view path, /*out*/ log_file_ptr &file); + +extern error_s list_all_files(const std::string &dir, /*out*/ std::vector &files); + +inline error_s open_log_file_map(const std::vector &log_files, + /*out*/ std::map &log_file_map) +{ + for (const std::string &fname : log_files) { + log_file_ptr lf; + error_s err = open_read(fname, lf); + if (!err.is_ok()) { + return err << "open_log_file_map(log_files)"; + } + log_file_map[lf->index()] = lf; + } + return error_s::ok(); +} + +inline error_s open_log_file_map(const std::string &dir, + /*out*/ std::map &log_file_map) +{ + std::vector log_files; + error_s es = list_all_files(dir, log_files); + if (!es.is_ok()) { + return es << "open_log_file_map(dir)"; + } + return open_log_file_map(log_files, log_file_map) << "open_log_file_map(dir)"; +} + +extern error_s check_log_files_continuity(const std::map &logs); + +inline error_s check_log_files_continuity(const std::string &dir) +{ + std::map log_file_map; + error_s es = open_log_file_map(dir, log_file_map); + if (!es.is_ok()) { + return es << "check_log_files_continuity(dir)"; + } + return check_log_files_continuity(log_file_map) << "check_log_files_continuity(dir)"; +} + +} // namespace log_utils +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 817de851a7..a22b5af9cf 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -177,46 +177,6 @@ void replica::response_client_write(dsn::message_ex *request, error_code error) _stub->response_client(get_gpid(), false, request, status(), error); } -// error_code replica::check_and_fix_private_log_completeness() -//{ -// error_code err = ERR_OK; -// -// auto mind = _private_log->max_gced_decree(get_gpid()); -// if (_prepare_list->max_decree()) -// -// if (!(mind <= last_durable_decree())) -// { -// err = ERR_INCOMPLETE_DATA; -// derror("%s: private log is incomplete (gced/durable): %" PRId64 " vs %" PRId64, -// name(), -// mind, -// last_durable_decree() -// ); -// } -// else -// { -// mind = _private_log->max_decree(get_gpid()); -// if (!(mind >= _app->last_committed_decree())) -// { -// err = ERR_INCOMPLETE_DATA; -// derror("%s: private log is incomplete (max/commit): %" PRId64 " vs %" PRId64, -// name(), -// mind, -// _app->last_committed_decree() -// ); -// } -// } -// -// if (ERR_INCOMPLETE_DATA == err) -// { -// _private_log->close(true); -// _private_log->open(nullptr); -// _private_log->set_private(get_gpid(), _app->last_durable_decree()); -// } -// -// return err; -//} - void replica::check_state_completeness() { /* prepare commit durable */ @@ -228,20 +188,6 @@ void replica::check_state_completeness() "%" PRId64 " VS %" PRId64 "", last_committed_decree(), last_durable_decree()); - - /* - auto mind = _stub->_log->max_gced_decree(get_gpid(), - _app->init_info().init_offset_in_shared_log); - dassert(mind <= last_durable_decree(), "%" PRId64 " VS %" PRId64, mind, last_durable_decree()); - - if (_private_log != nullptr) - { - auto mind = _private_log->max_gced_decree(get_gpid(), - _app->init_info().init_offset_in_private_log); - dassert(mind <= last_durable_decree(), "%" PRId64 " VS %" PRId64, mind, - last_durable_decree()); - } - */ } void replica::execute_mutation(mutation_ptr &mu)