Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor: move replay related codes to mutation_log_replay (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and HuangWei committed Oct 8, 2019
1 parent 4928fa5 commit 16d7f3f
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 95 deletions.
95 changes: 0 additions & 95 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -884,101 +884,6 @@ std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
return std::make_pair(_current_log_file, write_start_offset);
}

// TODO(wutao1): move it to mutation_log_replay.cpp
/*static*/ error_code mutation_log::replay(std::vector<std::string> &log_files,
replay_callback callback,
/*out*/ int64_t &end_offset)
{
std::map<int, log_file_ptr> logs;
for (auto &fpath : log_files) {
error_code err;
log_file_ptr log = log_file::open_read(fpath.c_str(), err);
if (log == nullptr) {
if (err == ERR_HANDLE_EOF || err == ERR_INCOMPLETE_DATA ||
err == ERR_INVALID_PARAMETERS) {
dinfo("skip file %s during log replay", fpath.c_str());
continue;
} else {
return err;
}
}

dassert(
logs.find(log->index()) == logs.end(), "invalid log_index, index = %d", log->index());
logs[log->index()] = log;
}

return replay(logs, callback, end_offset);
}

// TODO(wutao1): move it to mutation_log_replay.cpp
/*static*/ error_code mutation_log::replay(std::map<int, log_file_ptr> &logs,
replay_callback callback,
/*out*/ int64_t &end_offset)
{
int64_t g_start_offset = 0;
int64_t g_end_offset = 0;
error_code err = ERR_OK;
log_file_ptr last;

if (logs.size() > 0) {
g_start_offset = logs.begin()->second->start_offset();
g_end_offset = logs.rbegin()->second->end_offset();
}

error_s error = log_utils::check_log_files_continuity(logs);
if (!error.is_ok()) {
derror_f("check_log_files_continuity failed: {}", error);
return error.code();
}

end_offset = g_start_offset;

for (auto &kv : logs) {
log_file_ptr &log = kv.second;

if (log->start_offset() != end_offset) {
derror("offset mismatch in log file offset and global offset %" PRId64 " vs %" PRId64,
log->start_offset(),
end_offset);
return ERR_INVALID_DATA;
}

last = log;
err = mutation_log::replay(log, callback, end_offset);

log->close();

if (err == ERR_OK || err == ERR_HANDLE_EOF) {
// do nothing
} else if (err == ERR_INCOMPLETE_DATA) {
// If the file is not corrupted, it may also return the value of ERR_INCOMPLETE_DATA.
// In this case, the correctness is relying on the check of start_offset.
dwarn("delay handling error: %s", err.to_string());
} else {
// for other errors, we should break
break;
}
}

if (err == ERR_OK || err == ERR_HANDLE_EOF) {
// the log may still be written when used for learning
dassert(g_end_offset <= end_offset,
"make sure the global end offset is correct: %" PRId64 " vs %" PRId64,
g_end_offset,
end_offset);
err = ERR_OK;
} else if (err == ERR_INCOMPLETE_DATA) {
// ignore the last incomplate block
err = ERR_OK;
} else {
// bad error
derror("replay mutation log failed: %s", err.to_string());
}

return err;
}

decree mutation_log::max_decree(gpid gpid) const
{
zauto_lock l(_lock);
Expand Down
94 changes: 94 additions & 0 deletions src/dist/replication/lib/mutation_log_replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree.

#include "dist/replication/lib/mutation_log.h"
#include "dist/replication/lib/mutation_log_utils.h"
#include <dsn/utility/fail_point.h>
#include <dsn/utility/errors.h>
#include <dsn/dist/fmt_logging.h>
Expand Down Expand Up @@ -98,5 +99,98 @@ namespace replication {
return error_s::ok();
}

/*static*/ error_code mutation_log::replay(std::vector<std::string> &log_files,
replay_callback callback,
/*out*/ int64_t &end_offset)
{
std::map<int, log_file_ptr> logs;
for (auto &fpath : log_files) {
error_code err;
log_file_ptr log = log_file::open_read(fpath.c_str(), err);
if (log == nullptr) {
if (err == ERR_HANDLE_EOF || err == ERR_INCOMPLETE_DATA ||
err == ERR_INVALID_PARAMETERS) {
dinfo("skip file %s during log replay", fpath.c_str());
continue;
} else {
return err;
}
}

dassert(
logs.find(log->index()) == logs.end(), "invalid log_index, index = %d", log->index());
logs[log->index()] = log;
}

return replay(logs, callback, end_offset);
}

/*static*/ error_code mutation_log::replay(std::map<int, log_file_ptr> &logs,
replay_callback callback,
/*out*/ int64_t &end_offset)
{
int64_t g_start_offset = 0;
int64_t g_end_offset = 0;
error_code err = ERR_OK;
log_file_ptr last;

if (logs.size() > 0) {
g_start_offset = logs.begin()->second->start_offset();
g_end_offset = logs.rbegin()->second->end_offset();
}

error_s error = log_utils::check_log_files_continuity(logs);
if (!error.is_ok()) {
derror_f("check_log_files_continuity failed: {}", error);
return error.code();
}

end_offset = g_start_offset;

for (auto &kv : logs) {
log_file_ptr &log = kv.second;

if (log->start_offset() != end_offset) {
derror("offset mismatch in log file offset and global offset %" PRId64 " vs %" PRId64,
log->start_offset(),
end_offset);
return ERR_INVALID_DATA;
}

last = log;
err = mutation_log::replay(log, callback, end_offset);

log->close();

if (err == ERR_OK || err == ERR_HANDLE_EOF) {
// do nothing
} else if (err == ERR_INCOMPLETE_DATA) {
// If the file is not corrupted, it may also return the value of ERR_INCOMPLETE_DATA.
// In this case, the correctness is relying on the check of start_offset.
dwarn("delay handling error: %s", err.to_string());
} else {
// for other errors, we should break
break;
}
}

if (err == ERR_OK || err == ERR_HANDLE_EOF) {
// the log may still be written when used for learning
dassert(g_end_offset <= end_offset,
"make sure the global end offset is correct: %" PRId64 " vs %" PRId64,
g_end_offset,
end_offset);
err = ERR_OK;
} else if (err == ERR_INCOMPLETE_DATA) {
// ignore the last incomplate block
err = ERR_OK;
} else {
// bad error
derror("replay mutation log failed: %s", err.to_string());
}

return err;
}

} // namespace replication
} // namespace dsn

0 comments on commit 16d7f3f

Please sign in to comment.