Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

replica-server: remove checkpoint dirs when cold backup completed #216

Merged
merged 16 commits into from
Jan 14, 2019
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

/////////////////////////////////////////////////////////////////
// cold backup
// clear all backup checkpoint dirs which belong to the policy named `policy_name`
void clear_backup_checkpoint(const std::string &policy_name);
void generate_backup_checkpoint(cold_backup_context_ptr backup_context);
void trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
void wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context);
Expand Down
105 changes: 100 additions & 5 deletions src/dist/replication/lib/replica_backup.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <boost/lexical_cast.hpp>

#include <dsn/utility/filesystem.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>

#include "dist/replication/common/block_service_manager.h"
Expand All @@ -13,6 +14,7 @@
namespace dsn {
namespace replication {

// backup_id == 0 means clear backup context and checkpoint dirs of the policy.
void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response)
{
_checker.only_one_thread_access();
Expand All @@ -22,13 +24,30 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
cold_backup_context_ptr new_context(
new cold_backup_context(this, request, _options->max_concurrent_uploading_file_count));

ddebug("%s: received cold backup request, partition_status = %s%s",
new_context->name,
enum_to_string(status()),
backup_id == 0 ? ", this is a clear request" : "");

if (status() == partition_status::type::PS_PRIMARY ||
status() == partition_status::type::PS_SECONDARY) {
cold_backup_context_ptr backup_context = nullptr;
auto find = _cold_backup_contexts.find(policy_name);
if (find != _cold_backup_contexts.end()) {
backup_context = find->second;
} else {
if (backup_id == 0) {
if (status() == partition_status::type::PS_PRIMARY) {
Copy link
Contributor

@neverchanje neverchanje Jan 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉这个 check 应该放在 send_backup_request_to_secondary 里面,这样就不用每次调用都写一遍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

也就多写一遍,而且外面确实就需要检查一下,调用这个函数的角色就应当是primary

// send clear request to secondaries
send_backup_request_to_secondary(request);
}
// clear local checkpoint dirs in background thread
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() {
clear_backup_checkpoint(policy_name);
});
return;
}

/// TODO: policy may change provider
dist::block_service::block_filesystem *block_service =
_stub->_block_service_manager.get_block_filesystem(
Expand All @@ -55,17 +74,29 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
policy_name.c_str());
cold_backup_status backup_status = backup_context->status();

if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) {
// clear obsoleted backup firstly
/// TODO: clear dir
ddebug("%s: clear obsoleted cold backup, old_backup_id = %" PRId64
if (backup_id == 0 || backup_context->request.backup_id < backup_id ||
backup_status == ColdBackupCanceled) {
// clear the obsolete backup context first
ddebug("%s: clear obsoleted cold backup context, old_backup_id = %" PRId64
", old_backup_status = %s",
new_context->name,
backup_context->request.backup_id,
cold_backup_status_to_string(backup_status));
backup_context->cancel();
_cold_backup_contexts.erase(policy_name);
on_cold_backup(request, response);
if (backup_id != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

现在的backup_id是开始备份的时间戳吧?所以肯定不是0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,是时间戳

// go to another round
on_cold_backup(request, response);
} else { // backup_id == 0
if (status() == partition_status::type::PS_PRIMARY) {
// send clear request to secondaries
send_backup_request_to_secondary(request);
}
// clear local checkpoint dirs in background thread
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() {
clear_backup_checkpoint(policy_name);
});
}
return;
}

Expand Down Expand Up @@ -144,6 +175,14 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo
_cold_backup_contexts.erase(policy_name);
} else if (backup_status == ColdBackupCompleted) {
ddebug("%s: upload checkpoint completed, response ERR_OK", backup_context->name);
// send clear request to secondaries
backup_request new_request = request;
new_request.backup_id = 0;
send_backup_request_to_secondary(new_request);
// clear local checkpoint dirs in background thread
tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() {
clear_backup_checkpoint(policy_name);
});
response.err = ERR_OK;
} else {
dwarn(
Expand Down Expand Up @@ -201,6 +240,39 @@ backup_get_tmp_dir_name(const std::string &policy_name, int64_t backup_id, int64
return std::string(buffer);
}

// returns true if this checkpoint dir belongs to the policy
static bool is_policy_checkpoint(const std::string &chkpt_dirname, const std::string &policy_name)
{
std::vector<std::string> strs;
::dsn::utils::split_args(chkpt_dirname.c_str(), strs, '.');
// backup_tmp.<policy_name>.* or backup.<policy_name>.*
return strs.size() >= 2 &&
(strs[0] == std::string("backup_tmp") || strs[0] == std::string("backup")) &&
strs[1] == policy_name;
}

// get all backup checkpoint dirs which belong to the policy
static bool get_policy_checkpoint_dirs(const std::string &dir,
const std::string &policy,
/*out*/ std::vector<std::string> &chkpt_dirs)
{
chkpt_dirs.clear();
// list sub dirs
std::vector<std::string> sub_dirs;
if (!dsn::utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
derror("list sub dirs of dir %s failed", dir.c_str());
return false;
}

for (std::string &d : sub_dirs) {
std::string dirname = ::dsn::utils::filesystem::get_file_name(d);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

去掉 ::dsn

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (is_policy_checkpoint(dirname, policy)) {
chkpt_dirs.emplace_back(std::move(dirname));
}
}
return true;
}

// returns:
// 0 : not related
// 1 : related (belong to this policy but not belong to this backup_context)
Expand Down Expand Up @@ -334,6 +406,29 @@ static bool backup_parse_dir_name(const char *name,
}
}

// Remove every checkpoint dir of the policy from this replica's backup dir.
//
// Does nothing if the backup dir does not exist. If the checkpoint dirs cannot be
// listed, a warning is logged and nothing is removed. A failure to remove one dir is
// logged as a warning and does not stop the removal of the remaining dirs.
void replica::clear_backup_checkpoint(const std::string &policy_name)
{
    ddebug_replica("clear all checkpoint dirs of policy({})", policy_name);

    const auto backup_root = _app->backup_dir();
    if (!dsn::utils::filesystem::directory_exists(backup_root)) {
        // no backup dir, so there is nothing to clear
        return;
    }

    std::vector<std::string> policy_dirs;
    const bool listed = get_policy_checkpoint_dirs(backup_root, policy_name, policy_dirs);
    if (!listed) {
        dwarn_replica("get checkpoint dirs in backup dir({}) failed", backup_root);
        return;
    }

    for (const std::string &name : policy_dirs) {
        const std::string target = dsn::utils::filesystem::path_combine(backup_root, name);
        const bool removed = dsn::utils::filesystem::remove_path(target);
        if (!removed) {
            dwarn_replica("remove backup checkpoint dir({}) failed", target);
            continue;
        }
        ddebug_replica("remove backup checkpoint dir({}) succeed", target);
    }
}

// run in REPLICATION_LONG thread
// Effect:
// - may ignore_checkpoint() if in invalid status
Expand Down