refactor: introduce mutation_log::replay_block (#302)
Wu Tao authored and qinzuoyan committed Sep 16, 2019
1 parent ec725ed commit f4947c4
Showing 6 changed files with 364 additions and 190 deletions.
165 changes: 39 additions & 126 deletions src/dist/replication/lib/mutation_log.cpp
@@ -24,22 +24,14 @@
* THE SOFTWARE.
*/

/*
* Description:
* What is this file about?
*
* Revision history:
* xxxx-xx-xx, author, first version
* xxxx-xx-xx, author, fix bug about xxx
*/

#include "mutation_log.h"
#ifdef _WIN32
#include <io.h>
#endif
#include "replica.h"
#include "mutation_log_utils.h"

#include <dsn/utility/filesystem.h>
#include <dsn/utility/crc.h>
#include <dsn/utility/fail_point.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/tool-api/async_calls.h>
#include <dsn/dist/fmt_logging.h>

@@ -411,10 +403,7 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required)
dassert(_pending_write != nullptr, "");
dassert(_pending_write->size() > 0, "pending write size = %d", (int)_pending_write->size());
auto pr = mark_new_offset(_pending_write->size(), false);
dassert(pr.second == _pending_write_start_offset,
"%" PRId64 " VS %" PRId64 "",
pr.second,
_pending_write_start_offset);
dcheck_eq_replica(pr.second, _pending_write_start_offset);

_is_writing.store(true, std::memory_order_release);
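
Editorial note: the dassert with manual PRId64 formatting above is replaced by dcheck_eq_replica, one of the fmt-based comparison-assert macros this commit starts using (dcheck_eq appears further down). Their definitions are not part of this diff; as a rough, assumed approximation they expand to something like the sketch below, where replica_name() is a hypothetical per-replica prefix:

// Approximation only; the real macros live in the dsn logging headers and their
// exact expansion is not shown in this commit.
#define dcheck_eq(lhs, rhs) dassert_f((lhs) == (rhs), "{} vs {}", (lhs), (rhs))
#define dcheck_eq_replica(lhs, rhs)                                                \
    dassert_f((lhs) == (rhs), "{}: {} vs {}", replica_name() /*hypothetical*/, (lhs), (rhs))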

@@ -878,7 +867,10 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,

if (create_file) {
auto ec = create_new_log_file();
dassert(ec == ERR_OK, "create new log file failed");
dassert_f(ec == ERR_OK,
"{} create new log file failed: {}",
_is_private ? _private_gpid.to_string() : "",
ec);
_switch_file_hint = false;
_switch_file_demand = false;
}
@@ -892,69 +884,7 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
return std::make_pair(_current_log_file, write_start_offset);
}

/*static*/ error_code mutation_log::replay(log_file_ptr log,
replay_callback callback,
/*out*/ int64_t &end_offset)
{
end_offset = log->start_offset();
ddebug("start to replay mutation log %s, offset = [%" PRId64 ", %" PRId64 "), size = %" PRId64,
log->path().c_str(),
log->start_offset(),
log->end_offset(),
log->end_offset() - log->start_offset());

::dsn::blob bb;
log->reset_stream();
error_code err = log->read_next_log_block(bb);
if (err != ERR_OK) {
return err;
}

std::shared_ptr<binary_reader> reader(new binary_reader(std::move(bb)));
end_offset += sizeof(log_block_header);

// read file header
end_offset += log->read_file_header(*reader);
if (!log->is_right_header()) {
return ERR_INVALID_DATA;
}

while (true) {
while (!reader->is_eof()) {
auto old_size = reader->get_remaining_size();
mutation_ptr mu = mutation::read_from(*reader, nullptr);
dassert(nullptr != mu, "");
mu->set_logged();

if (mu->data.header.log_offset != end_offset) {
derror("offset mismatch in log entry and mutation %" PRId64 " vs %" PRId64,
end_offset,
mu->data.header.log_offset);
err = ERR_INVALID_DATA;
break;
}

int log_length = old_size - reader->get_remaining_size();

callback(log_length, mu);

end_offset += log_length;
}

err = log->read_next_log_block(bb);
if (err != ERR_OK) {
// if an error occurs in an log mutation block, then the replay log is stopped
break;
}

reader.reset(new binary_reader(std::move(bb)));
end_offset += sizeof(log_block_header);
}

ddebug("finish to replay mutation log %s, err = %s", log->path().c_str(), err.to_string());
return err;
}
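
Editorial note: the single-file replay loop deleted above — read a block, wrap it in a binary_reader, decode each mutation, invoke the replay callback, and advance end_offset — is what the commit title refers to: it is presumably re-expressed in terms of the new mutation_log::replay_block and, per the TODO below, headed for mutation_log_replay.cpp. The new helper itself is not visible in this hunk, so the following is only a sketch reconstructed from the removed code; the signature, the start_offset parameter, and the header handling are assumptions, not the code actually added by this commit:

// Sketch only -- not the code added by this commit. Replays the single block at
// `start_offset` (an offset local to the file) and advances `end_offset` (the
// global offset) past every mutation that was decoded.
/*static*/ error_code mutation_log::replay_block(log_file_ptr &log,
                                                 replay_callback &callback,
                                                 size_t start_offset,
                                                 /*inout*/ int64_t &end_offset)
{
    log->reset_stream(start_offset); // seek to the block boundary

    ::dsn::blob bb;
    error_code err = log->read_next_log_block(bb);
    if (err != ERR_OK) {
        return err;
    }

    std::unique_ptr<binary_reader> reader(new binary_reader(std::move(bb)));
    end_offset += sizeof(log_block_header);

    // the first block of a file carries the file header
    if (start_offset == 0) {
        end_offset += log->read_file_header(*reader);
        if (!log->is_right_header()) {
            return ERR_INVALID_DATA;
        }
    }

    while (!reader->is_eof()) {
        auto old_size = reader->get_remaining_size();
        mutation_ptr mu = mutation::read_from(*reader, nullptr);
        dassert(nullptr != mu, "");
        mu->set_logged();

        if (mu->data.header.log_offset != end_offset) {
            derror_f("offset mismatch in log entry and mutation {} vs {}",
                     end_offset,
                     mu->data.header.log_offset);
            return ERR_INVALID_DATA;
        }

        int log_length = old_size - reader->get_remaining_size();
        callback(log_length, mu);
        end_offset += log_length;
    }

    return ERR_OK;
}

Under that assumption, the per-file replay becomes a thin loop that keeps calling replay_block until it returns ERR_HANDLE_EOF or a decoding error.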

// TODO(wutao1): move it to mutation_log_replay.cpp
/*static*/ error_code mutation_log::replay(std::vector<std::string> &log_files,
replay_callback callback,
/*out*/ int64_t &end_offset)
@@ -981,6 +911,7 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
return replay(logs, callback, end_offset);
}

// TODO(wutao1): move it to mutation_log_replay.cpp
/*static*/ error_code mutation_log::replay(std::map<int, log_file_ptr> &logs,
replay_callback callback,
/*out*/ int64_t &end_offset)
@@ -989,20 +920,16 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
int64_t g_end_offset = 0;
error_code err = ERR_OK;
log_file_ptr last;
int last_file_index = 0;

if (logs.size() > 0) {
g_start_offset = logs.begin()->second->start_offset();
g_end_offset = logs.rbegin()->second->end_offset();
last_file_index = logs.begin()->first - 1;
}

// check file index continuity
for (auto &kv : logs) {
if (++last_file_index != kv.first) {
derror("log file missing with index %u", last_file_index);
return ERR_OBJECT_NOT_FOUND;
}
error_s error = log_utils::check_log_files_continuity(logs);
if (!error.is_ok()) {
derror_f("check_log_files_continuity failed: {}", error);
return error.code();
}

end_offset = g_start_offset;
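
Editorial note: the hand-rolled index-continuity loop deleted above is replaced by log_utils::check_log_files_continuity, declared in mutation_log_utils.h (which this commit adds to the includes). Its implementation is not shown here; a minimal version consistent with the removed logic might look like the sketch below — the exact message text and error construction are assumptions:

// Sketch only; the real helper lives in mutation_log_utils and is not part of
// this hunk.
inline error_s check_log_files_continuity(const std::map<int, log_file_ptr> &logs)
{
    if (logs.empty()) {
        return error_s::ok();
    }
    int last_file_index = logs.begin()->first - 1;
    for (const auto &kv : logs) {
        if (++last_file_index != kv.first) {
            return error_s::make(ERR_OBJECT_NOT_FOUND,
                                 fmt::format("log file missing with index {}", last_file_index));
        }
    }
    return error_s::ok();
}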
@@ -1074,38 +1001,6 @@ decree mutation_log::max_commit_on_disk() const
return _private_max_commit_on_disk;
}

decree mutation_log::max_gced_decree(gpid gpid, int64_t valid_start_offset) const
{
check_valid_start_offset(gpid, valid_start_offset);

zauto_lock l(_lock);

if (_log_files.size() == 0) {
if (_is_private)
return _private_log_info.max_decree;
else {
auto it = _shared_log_info_map.find(gpid);
if (it != _shared_log_info_map.end())
return it->second.max_decree;
else
return 0;
}
} else {
for (auto &log : _log_files) {
// when invalid log exits, all new logs are preserved (not gced)
if (valid_start_offset > log.second->start_offset())
return 0;

auto it = log.second->previous_log_max_decrees().find(gpid);
if (it != log.second->previous_log_max_decrees().end()) {
return it->second.max_decree;
} else
return 0;
}
return 0;
}
}

void mutation_log::check_valid_start_offset(gpid gpid, int64_t valid_start_offset) const
{
zauto_lock l(_lock);
@@ -1206,7 +1101,7 @@ void mutation_log::update_max_decree_no_lock(gpid gpid, decree d)
dassert(false, "replica has not been registered in the log before");
}
} else {
dassert(gpid == _private_gpid, "replica gpid does not match");
dcheck_eq(gpid, _private_gpid);
if (d > _private_log_info.max_decree) {
_private_log_info.max_decree = d;
}
@@ -1248,6 +1143,11 @@ bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state

if (state.meta.length() == 0 && start > _private_log_info.max_decree) {
// no memory data and no disk data
ddebug_f("gpid({}) get_learn_state returns false"
"learn_start_decree={}, max_decree_in_private_log={}",
gpid,
start,
_private_log_info.max_decree);
return false;
}

@@ -1910,6 +1810,9 @@ class log_file::file_streamer
}
fill_buffers();
}

// TODO(wutao1): use string_view instead of blob.
// WARNING: the resulting blob is not guaranteed to be reference counted.
// possible error_code:
// ERR_OK result would always size as expected
// ERR_HANDLE_EOF if there are not enough data in file. result would still be
@@ -2144,11 +2047,11 @@ log_file::~log_file() { close(); }

log_file::log_file(
const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read)
: _is_read(is_read)
{
_start_offset = start_offset;
_end_offset = start_offset;
_handle = handle;
_is_read = is_read;
_path = path;
_index = index;
_crc32 = 0;
@@ -2166,6 +2069,8 @@ log_file::log_file(

void log_file::close()
{
zauto_lock lock(_write_lock);

//_stream implicitly refer to _handle so it needs to be cleaned up first.
// TODO: We need better abstraction to avoid those manual stuffs..
_stream.reset(nullptr);
@@ -2180,6 +2085,7 @@ void log_file::close()
void log_file::flush() const
{
dassert(!_is_read, "log file must be of write mode");
zauto_lock lock(_write_lock);

if (_handle) {
error_code err = file::flush(_handle);
@@ -2260,6 +2166,11 @@ aio_task_ptr log_file::commit_log_block(log_block &block,
dassert(!_is_read, "log file must be of write mode");
dassert(block.size() > 0, "log_block can not be empty");

zauto_lock lock(_write_lock);
if (!_handle) {
return nullptr;
}

auto size = (long long)block.size();
int64_t local_offset = offset - start_offset();
auto hdr = reinterpret_cast<log_block_header *>(const_cast<char *>(block.front().data()));
@@ -2311,14 +2222,16 @@ aio_task_ptr log_file::commit_log_block(log_block &block,
return tsk;
}

void log_file::reset_stream()
void log_file::reset_stream(size_t offset /*default = 0*/)
{
if (_stream == nullptr) {
_stream.reset(new file_streamer(_handle, 0));
_stream.reset(new file_streamer(_handle, offset));
} else {
_stream->reset(0);
_stream->reset(offset);
}
if (offset == 0) {
_crc32 = 0;
}
_crc32 = 0;
}
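
Editorial note: reset_stream now takes a local file offset (defaulting to 0), and the running CRC is cleared only for a full restart from the file head. A caller can therefore, presumably, resume decoding at a known block boundary instead of re-reading the whole file. A hypothetical use, where resume_offset is a block boundary remembered from an earlier pass:

// Hypothetical usage; `resume_offset` is not a name from this diff.
log->reset_stream(resume_offset);
::dsn::blob bb;
error_code err = log->read_next_log_block(bb);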

decree log_file::previous_log_max_decree(const dsn::gpid &pid)