diff --git a/src/core/tests/service_api_c.cpp b/src/core/tests/service_api_c.cpp index f23cd2754a..155fb769b4 100644 --- a/src/core/tests/service_api_c.cpp +++ b/src/core/tests/service_api_c.cpp @@ -235,8 +235,8 @@ TEST(core, dsn_file) ASSERT_NE(nullptr, tin); if (dsn::tools::get_current_tool()->name() != "simulator") { - // 1 for tin, 1 for disk_engine - ASSERT_EQ(2, tin->get_count()); + // at least 1 for tin, but if already read completed, then only 1 + ASSERT_LE(1, tin->get_count()); } tin->wait(); diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index 8e4fd847e1..737926e53f 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -408,7 +408,7 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required) // FIXME : the file could have been closed lf->flush(); - // update _private_max_commit_on_disk after writen into log file done + // update _private_max_commit_on_disk after written into log file done update_max_commit_on_disk(max_commit); } else { derror("write private log failed, err = %s", err.to_string()); diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index 331fa5936f..fd9bb33c3f 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -567,7 +567,7 @@ class log_file : public ref_counter static log_block *prepare_log_block(); // async write log entry into the file - // 'block' is the date to be writen + // 'block' is the date to be written // 'offset' is start offset of the entry in the global space // 'evt' is to indicate which thread pool to execute the callback // 'callback_host' is used to get tracer diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 85899ef414..8643d3cc9d 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -264,6 +264,7 @@ class replica : public serverlet, public ref_counter, public replica_ba ///////////////////////////////////////////////////////////////// // cold backup + void clear_backup_checkpoint(const std::string &policy_name); void generate_backup_checkpoint(cold_backup_context_ptr backup_context); void trigger_async_checkpoint_for_backup(cold_backup_context_ptr backup_context); void wait_async_checkpoint_for_backup(cold_backup_context_ptr backup_context); diff --git a/src/dist/replication/lib/replica_backup.cpp b/src/dist/replication/lib/replica_backup.cpp index 4171580ace..bb051112b9 100644 --- a/src/dist/replication/lib/replica_backup.cpp +++ b/src/dist/replication/lib/replica_backup.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include "dist/replication/common/block_service_manager.h" @@ -13,6 +14,7 @@ namespace dsn { namespace replication { +// backup_id == 0 means clear backup context and checkpoint dirs of the policy. void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response) { _checker.only_one_thread_access(); @@ -22,6 +24,11 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo cold_backup_context_ptr new_context( new cold_backup_context(this, request, _options->max_concurrent_uploading_file_count)); + ddebug("%s: received cold backup request, partition_status = %s%s", + new_context->name, + enum_to_string(status()), + backup_id == 0 ? ", this is a clear request" : ""); + if (status() == partition_status::type::PS_PRIMARY || status() == partition_status::type::PS_SECONDARY) { cold_backup_context_ptr backup_context = nullptr; @@ -29,6 +36,18 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo if (find != _cold_backup_contexts.end()) { backup_context = find->second; } else { + if (backup_id == 0) { + if (status() == partition_status::type::PS_PRIMARY) { + // send clear request to secondaries + send_backup_request_to_secondary(request); + } + // clear local checkpoint dirs in background thread + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() { + clear_backup_checkpoint(policy_name); + }); + return; + } + /// TODO: policy may change provider dist::block_service::block_filesystem *block_service = _stub->_block_service_manager.get_block_filesystem( @@ -55,17 +74,29 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo policy_name.c_str()); cold_backup_status backup_status = backup_context->status(); - if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) { - // clear obsoleted backup firstly - /// TODO: clear dir - ddebug("%s: clear obsoleted cold backup, old_backup_id = %" PRId64 + if (backup_id == 0 || backup_context->request.backup_id < backup_id || + backup_status == ColdBackupCanceled) { + // clear obsoleted backup context firstly + ddebug("%s: clear obsoleted cold backup context, old_backup_id = %" PRId64 ", old_backup_status = %s", new_context->name, backup_context->request.backup_id, cold_backup_status_to_string(backup_status)); backup_context->cancel(); _cold_backup_contexts.erase(policy_name); - on_cold_backup(request, response); + if (backup_id != 0) { + // go to another round + on_cold_backup(request, response); + } else { // backup_id == 0 + if (status() == partition_status::type::PS_PRIMARY) { + // send clear request to secondaries + send_backup_request_to_secondary(request); + } + // clear local checkpoint dirs in background thread + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() { + clear_backup_checkpoint(policy_name); + }); + } return; } @@ -144,6 +175,14 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo _cold_backup_contexts.erase(policy_name); } else if (backup_status == ColdBackupCompleted) { ddebug("%s: upload checkpoint completed, response ERR_OK", backup_context->name); + // send clear request to secondaries + backup_request new_request = request; + new_request.backup_id = 0; + send_backup_request_to_secondary(new_request); + // clear local checkpoint dirs in background thread + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, &_tracker, [this, policy_name]() { + clear_backup_checkpoint(policy_name); + }); response.err = ERR_OK; } else { dwarn( @@ -201,6 +240,39 @@ backup_get_tmp_dir_name(const std::string &policy_name, int64_t backup_id, int64 return std::string(buffer); } +// returns true if this checkpoint dir belongs to the policy +static bool is_policy_checkpoint(const std::string &chkpt_dirname, const std::string &policy_name) +{ + std::vector strs; + utils::split_args(chkpt_dirname.c_str(), strs, '.'); + // backup_tmp..* or backup..* + return strs.size() >= 2 && + (strs[0] == std::string("backup_tmp") || strs[0] == std::string("backup")) && + strs[1] == policy_name; +} + +// get all backup checkpoint dirs which belong to the policy +static bool get_policy_checkpoint_dirs(const std::string &dir, + const std::string &policy, + /*out*/ std::vector &chkpt_dirs) +{ + chkpt_dirs.clear(); + // list sub dirs + std::vector sub_dirs; + if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) { + derror("list sub dirs of dir %s failed", dir.c_str()); + return false; + } + + for (std::string &d : sub_dirs) { + std::string dirname = utils::filesystem::get_file_name(d); + if (is_policy_checkpoint(dirname, policy)) { + chkpt_dirs.emplace_back(std::move(dirname)); + } + } + return true; +} + // returns: // 0 : not related // 1 : related (belong to this policy but not belong to this backup_context) @@ -263,13 +335,13 @@ static bool filter_checkpoint(const std::string &dir, related_chkpt_dirs.clear(); // list sub dirs std::vector sub_dirs; - if (!dsn::utils::filesystem::get_subdirectories(dir, sub_dirs, false)) { + if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) { derror("%s: list sub dirs of dir %s failed", backup_context->name, dir.c_str()); return false; } for (std::string &d : sub_dirs) { - std::string dirname = ::dsn::utils::filesystem::get_file_name(d); + std::string dirname = utils::filesystem::get_file_name(d); int ret = is_related_or_valid_checkpoint(dirname, backup_context); if (ret == 1) { related_chkpt_dirs.emplace_back(std::move(dirname)); @@ -291,7 +363,7 @@ statistic_file_infos_under_dir(const std::string &dir, /*out*/ int64_t &total_size) { std::vector sub_files; - if (!dsn::utils::filesystem::get_subfiles(dir, sub_files, false)) { + if (!utils::filesystem::get_subfiles(dir, sub_files, false)) { derror("list sub files of dir %s failed", dir.c_str()); return false; } @@ -302,11 +374,11 @@ statistic_file_infos_under_dir(const std::string &dir, for (std::string &file : sub_files) { std::pair file_info; - if (!::dsn::utils::filesystem::file_size(file, file_info.second)) { + if (!utils::filesystem::file_size(file, file_info.second)) { derror("get file size of %s failed", file.c_str()); return false; } - file_info.first = ::dsn::utils::filesystem::get_file_name(file); + file_info.first = utils::filesystem::get_file_name(file); total_size += file_info.second; file_infos.emplace_back(std::move(file_info)); @@ -334,6 +406,29 @@ static bool backup_parse_dir_name(const char *name, } } +// clear all checkpoint dirs of the policy +void replica::clear_backup_checkpoint(const std::string &policy_name) +{ + ddebug_replica("clear all checkpoint dirs of policy({})", policy_name); + auto backup_dir = _app->backup_dir(); + if (!utils::filesystem::directory_exists(backup_dir)) { + return; + } + std::vector chkpt_dirs; + if (!get_policy_checkpoint_dirs(backup_dir, policy_name, chkpt_dirs)) { + dwarn_replica("get checkpoint dirs in backup dir({}) failed", backup_dir); + return; + } + for (const std::string &dirname : chkpt_dirs) { + std::string full_path = utils::filesystem::path_combine(backup_dir, dirname); + if (utils::filesystem::remove_path(full_path)) { + ddebug_replica("remove backup checkpoint dir({}) succeed", full_path); + } else { + dwarn_replica("remove backup checkpoint dir({}) failed", full_path); + } + } +} + // run in REPLICATION_LONG thread // Effection: // - may ignore_checkpoint() if in invalid status @@ -353,8 +448,8 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) // prepare back dir auto backup_dir = _app->backup_dir(); - if (!dsn::utils::filesystem::directory_exists(backup_dir) && - !dsn::utils::filesystem::create_directory(backup_dir)) { + if (!utils::filesystem::directory_exists(backup_dir) && + !utils::filesystem::create_directory(backup_dir)) { derror("%s: create backup dir %s failed", backup_context->name, backup_dir.c_str()); backup_context->fail_checkpoint("create backup dir failed"); return; @@ -372,7 +467,7 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) std::vector> file_infos; int64_t total_size = 0; std::string valid_chkpt_full_path = - ::dsn::utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname); + utils::filesystem::path_combine(backup_dir, valid_backup_chkpt_dirname); // parse checkpoint dirname std::string policy_name; int64_t backup_id = 0, decree = 0, timestamp = 0; @@ -423,11 +518,11 @@ void replica::generate_backup_checkpoint(cold_backup_context_ptr backup_context) // clear related but not valid checkpoint for (const std::string &dirname : related_backup_chkpt_dirname) { - std::string full_path = ::dsn::utils::filesystem::path_combine(backup_dir, dirname); + std::string full_path = utils::filesystem::path_combine(backup_dir, dirname); ddebug("%s: found obsolete backup checkpoint dir(%s), remove it", backup_context->name, full_path.c_str()); - if (!dsn::utils::filesystem::remove_path(full_path)) { + if (!utils::filesystem::remove_path(full_path)) { dwarn("%s: remove obsolete backup checkpoint dir(%s) failed", backup_context->name, full_path.c_str()); @@ -578,7 +673,7 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont // the real checkpoint decree may be larger than backup_context->checkpoint_decree, // so we need copy checkpoint to backup_checkpoint_tmp_dir_path, and then rename it. - std::string backup_checkpoint_tmp_dir_path = ::dsn::utils::filesystem::path_combine( + std::string backup_checkpoint_tmp_dir_path = utils::filesystem::path_combine( _app->backup_dir(), backup_get_tmp_dir_name(backup_context->request.policy.policy_name, backup_context->request.backup_id, @@ -592,7 +687,7 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont "local_create_backup_checkpoint 10s later", backup_context->name, err.to_string()); - dsn::utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); + utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); tasking::enqueue( LPC_BACKGROUND_COLD_BACKUP, &_tracker, @@ -605,20 +700,20 @@ void replica::local_create_backup_checkpoint(cold_backup_context_ptr backup_cont last_decree, backup_context->checkpoint_decree); backup_context->checkpoint_decree = last_decree; // update to real decree - std::string backup_checkpoint_dir_path = ::dsn::utils::filesystem::path_combine( + std::string backup_checkpoint_dir_path = utils::filesystem::path_combine( _app->backup_dir(), backup_get_dir_name(backup_context->request.policy.policy_name, backup_context->request.backup_id, backup_context->checkpoint_decree, backup_context->checkpoint_timestamp)); - if (!dsn::utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path, - backup_checkpoint_dir_path)) { + if (!utils::filesystem::rename_path(backup_checkpoint_tmp_dir_path, + backup_checkpoint_dir_path)) { derror("%s: rename checkpoint dir(%s) to dir(%s) failed", backup_context->name, backup_checkpoint_tmp_dir_path.c_str(), backup_checkpoint_dir_path.c_str()); - dsn::utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); - dsn::utils::filesystem::remove_path(backup_checkpoint_dir_path); + utils::filesystem::remove_path(backup_checkpoint_tmp_dir_path); + utils::filesystem::remove_path(backup_checkpoint_dir_path); backup_context->fail_checkpoint("rename checkpoint dir failed"); return; } diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index fc4bfa174d..09257a5a93 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -453,6 +453,8 @@ struct configuration_restore_request 8:bool skip_bad_partition; } +// if backup_id == 0, means clear all backup resources (including backup contexts and +// checkpoint dirs) of this policy. struct backup_request { 1:dsn.gpid pid; diff --git a/src/dist/replication/test/simple_kv/case-402.act b/src/dist/replication/test/simple_kv/case-402.act index bfa0a6d74d..85458a1b05 100644 --- a/src/dist/replication/test/simple_kv/case-402.act +++ b/src/dist/replication/test/simple_kv/case-402.act @@ -181,7 +181,7 @@ client:begin_write:id=169,key=k169,value=v169,timeout=0 inject:on_aio_call:node=r2,task_code=LPC_WRITE_REPLICATION_LOG_SHARED config:{4,r1,[r3]} -state:{{r1,pri,4,23},{r3,sec,4,20}} +state:{{r1,pri,4,20},{r3,sec,4,11}} set:disable_load_balance=0