Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor: introduce mutation_log::replay_block #302

Merged
merged 8 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 39 additions & 126 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,14 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "mutation_log.h"
#ifdef _WIN32
#include <io.h>
#endif
#include "replica.h"
#include "mutation_log_utils.h"

#include <dsn/utility/filesystem.h>
#include <dsn/utility/crc.h>
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>

Expand Down Expand Up @@ -411,10 +403,7 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required)
dassert(_pending_write != nullptr, "");
dassert(_pending_write->size() > 0, "pending write size = %d", (int)_pending_write->size());
auto pr = mark_new_offset(_pending_write->size(), false);
dassert(pr.second == _pending_write_start_offset,
"%" PRId64 " VS %" PRId64 "",
pr.second,
_pending_write_start_offset);
dcheck_eq_replica(pr.second, _pending_write_start_offset);

_is_writing.store(true, std::memory_order_release);

Expand Down Expand Up @@ -878,7 +867,10 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,

if (create_file) {
auto ec = create_new_log_file();
dassert(ec == ERR_OK, "create new log file failed");
dassert_f(ec == ERR_OK,
"{} create new log file failed: {}",
_is_private ? _private_gpid.to_string() : "",
ec);
_switch_file_hint = false;
_switch_file_demand = false;
}
Expand All @@ -892,69 +884,7 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
return std::make_pair(_current_log_file, write_start_offset);
}

/*static*/ error_code mutation_log::replay(log_file_ptr log,
                                           replay_callback callback,
                                           /*out*/ int64_t &end_offset)
{
    // Replays every mutation stored in a single log file, invoking `callback`
    // once per decoded mutation. `end_offset` is advanced past each block that
    // was successfully consumed, so on failure it marks the first byte that
    // could not be replayed.
    //
    // Returns:
    //   ERR_OK is never returned here in practice: the loop ends either with a
    //   read failure (typically ERR_HANDLE_EOF when the file is exhausted,
    //   which callers treat as normal completion) or ERR_INVALID_DATA on a
    //   corrupted header / offset mismatch.
    end_offset = log->start_offset();
    ddebug("start to replay mutation log %s, offset = [%" PRId64 ", %" PRId64 "), size = %" PRId64,
           log->path().c_str(),
           log->start_offset(),
           log->end_offset(),
           log->end_offset() - log->start_offset());

    ::dsn::blob bb;
    log->reset_stream();
    error_code err = log->read_next_log_block(bb);
    if (err != ERR_OK) {
        return err;
    }

    std::shared_ptr<binary_reader> reader(new binary_reader(std::move(bb)));
    end_offset += sizeof(log_block_header);

    // The first block additionally carries the file header; account for its
    // size and validate it before replaying any mutation.
    end_offset += log->read_file_header(*reader);
    if (!log->is_right_header()) {
        return ERR_INVALID_DATA;
    }

    while (true) {
        while (!reader->is_eof()) {
            auto old_size = reader->get_remaining_size();
            mutation_ptr mu = mutation::read_from(*reader, nullptr);
            dassert(nullptr != mu, "");
            mu->set_logged();

            if (mu->data.header.log_offset != end_offset) {
                derror("offset mismatch in log entry and mutation %" PRId64 " vs %" PRId64,
                       end_offset,
                       mu->data.header.log_offset);
                err = ERR_INVALID_DATA;
                break;
            }

            // bytes consumed from the block by this mutation
            int log_length = old_size - reader->get_remaining_size();

            callback(log_length, mu);

            end_offset += log_length;
        }

        // BUGFIX: propagate a corruption error detected inside the block.
        // Previously `err` was immediately overwritten by the next
        // read_next_log_block() call, so a successful read after an offset
        // mismatch would silently discard ERR_INVALID_DATA and keep replaying.
        if (err != ERR_OK) {
            break;
        }

        err = log->read_next_log_block(bb);
        if (err != ERR_OK) {
            // if an error occurs in a log mutation block, then the replay log is stopped
            break;
        }

        reader.reset(new binary_reader(std::move(bb)));
        end_offset += sizeof(log_block_header);
    }

    ddebug("finish to replay mutation log %s, err = %s", log->path().c_str(), err.to_string());
    return err;
}

// TODO(wutao1): move it to mutation_log_replay.cpp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么要把replay函数都移到mutation_log_replay.cpp中?如果要移,就这个pr移过去吧,现在两个在mutation_log.cpp,两个在mutation_log_replay.cpp

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 PR 重构关键路径,另外两个虽然没有移但已经加了 TODO。移是因为这个代码已经两千行了。

Copy link
Contributor

@hycdong hycdong Sep 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutation_log.cpp文件有2000多行了,replay_log的代码最多500行,如果是为了精简文件行数,可以后面来拆,可能有更好的精简方法,这个pr就专注重构

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,这里拆成两个文件也是方便对比,我发现直接在原来文件改反而更不容易看

/*static*/ error_code mutation_log::replay(std::vector<std::string> &log_files,
replay_callback callback,
/*out*/ int64_t &end_offset)
Expand All @@ -981,6 +911,7 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
return replay(logs, callback, end_offset);
}

// TODO(wutao1): move it to mutation_log_replay.cpp
/*static*/ error_code mutation_log::replay(std::map<int, log_file_ptr> &logs,
replay_callback callback,
/*out*/ int64_t &end_offset)
Expand All @@ -989,20 +920,16 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
int64_t g_end_offset = 0;
error_code err = ERR_OK;
log_file_ptr last;
int last_file_index = 0;

if (logs.size() > 0) {
g_start_offset = logs.begin()->second->start_offset();
g_end_offset = logs.rbegin()->second->end_offset();
last_file_index = logs.begin()->first - 1;
}

// check file index continuity
for (auto &kv : logs) {
if (++last_file_index != kv.first) {
derror("log file missing with index %u", last_file_index);
return ERR_OBJECT_NOT_FOUND;
}
error_s error = log_utils::check_log_files_continuity(logs);
if (!error.is_ok()) {
derror_f("check_log_files_continuity failed: {}", error);
return error.code();
}

end_offset = g_start_offset;
Expand Down Expand Up @@ -1074,38 +1001,6 @@ decree mutation_log::max_commit_on_disk() const
return _private_max_commit_on_disk;
}

decree mutation_log::max_gced_decree(gpid gpid, int64_t valid_start_offset) const
{
    // Returns the max decree that has been garbage-collected for the given
    // replica, or 0 when nothing useful is recorded. Takes `_lock` to read
    // the log-file map and the cached decree info consistently.
    check_valid_start_offset(gpid, valid_start_offset);

    zauto_lock l(_lock);

    // No log files on disk: fall back to the cached per-replica decree info.
    if (_log_files.empty()) {
        if (_is_private) {
            return _private_log_info.max_decree;
        }
        auto found = _shared_log_info_map.find(gpid);
        return found == _shared_log_info_map.end() ? 0 : found->second.max_decree;
    }

    // Only the first (oldest) log file is consulted: every branch below
    // returns on the first iteration, mirroring the original control flow.
    for (const auto &index_and_file : _log_files) {
        // when invalid log exits, all new logs are preserved (not gced)
        if (valid_start_offset > index_and_file.second->start_offset()) {
            return 0;
        }

        const auto &prev_decrees = index_and_file.second->previous_log_max_decrees();
        auto found = prev_decrees.find(gpid);
        return found == prev_decrees.end() ? 0 : found->second.max_decree;
    }
    return 0;
}

void mutation_log::check_valid_start_offset(gpid gpid, int64_t valid_start_offset) const
{
zauto_lock l(_lock);
Expand Down Expand Up @@ -1206,7 +1101,7 @@ void mutation_log::update_max_decree_no_lock(gpid gpid, decree d)
dassert(false, "replica has not been registered in the log before");
}
} else {
dassert(gpid == _private_gpid, "replica gpid does not match");
dcheck_eq(gpid, _private_gpid);
if (d > _private_log_info.max_decree) {
_private_log_info.max_decree = d;
}
Expand Down Expand Up @@ -1248,6 +1143,11 @@ bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state

if (state.meta.length() == 0 && start > _private_log_info.max_decree) {
// no memory data and no disk data
ddebug_f("gpid({}) get_learn_state returns false"
"learn_start_decree={}, max_decree_in_private_log={}",
gpid,
start,
_private_log_info.max_decree);
return false;
}

Expand Down Expand Up @@ -1910,6 +1810,9 @@ class log_file::file_streamer
}
fill_buffers();
}

// TODO(wutao1): use string_view instead of using blob.
// WARNING: the resulted blob is not guaranteed to be reference counted.
// possible error_code:
// ERR_OK result would always size as expected
// ERR_HANDLE_EOF if there are not enough data in file. result would still be
Expand Down Expand Up @@ -2144,11 +2047,11 @@ log_file::~log_file() { close(); }

log_file::log_file(
const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read)
: _is_read(is_read)
{
_start_offset = start_offset;
_end_offset = start_offset;
_handle = handle;
_is_read = is_read;
_path = path;
_index = index;
_crc32 = 0;
Expand All @@ -2166,6 +2069,8 @@ log_file::log_file(

void log_file::close()
{
zauto_lock lock(_write_lock);

//_stream implicitly refer to _handle so it needs to be cleaned up first.
// TODO: We need better abstraction to avoid those manual stuffs..
_stream.reset(nullptr);
Expand All @@ -2180,6 +2085,7 @@ void log_file::close()
void log_file::flush() const
{
dassert(!_is_read, "log file must be of write mode");
zauto_lock lock(_write_lock);

if (_handle) {
error_code err = file::flush(_handle);
Expand Down Expand Up @@ -2260,6 +2166,11 @@ aio_task_ptr log_file::commit_log_block(log_block &block,
dassert(!_is_read, "log file must be of write mode");
dassert(block.size() > 0, "log_block can not be empty");

zauto_lock lock(_write_lock);
if (!_handle) {
return nullptr;
}

auto size = (long long)block.size();
int64_t local_offset = offset - start_offset();
auto hdr = reinterpret_cast<log_block_header *>(const_cast<char *>(block.front().data()));
Expand Down Expand Up @@ -2311,14 +2222,16 @@ aio_task_ptr log_file::commit_log_block(log_block &block,
return tsk;
}

void log_file::reset_stream()
void log_file::reset_stream(size_t offset /*default = 0*/)
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
if (_stream == nullptr) {
_stream.reset(new file_streamer(_handle, 0));
_stream.reset(new file_streamer(_handle, offset));
} else {
_stream->reset(0);
_stream->reset(offset);
}
if (offset == 0) {
_crc32 = 0;
}
_crc32 = 0;
}

decree log_file::previous_log_max_decree(const dsn::gpid &pid)
Expand Down
Loading