Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(split): child replica learn parent prepare list and checkpoint (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored and qinzuoyan committed Sep 20, 2019
1 parent b8c5adf commit da71dbd
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 39 deletions.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MAKE_EVENT_CODE(LPC_CHECKPOINT_REPLICA, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CATCHUP_WITH_PRIVATE_LOGS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_DISK_STAT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BACKGROUND_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ASYNC_LEARN, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH)
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/prepare_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class prepare_list : public mutation_cache, private replica_base
decree last_committed_decree() const { return _last_committed_decree; }
void reset(decree init_decree);
void truncate(decree init_decree);
void set_committer(mutation_committer committer) { _committer = committer; }

//
// for two-phase commit
Expand Down
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ void replica::close()
dassert(_secondary_states.is_cleaned(), "secondary context is not cleared");
dassert(_potential_secondary_states.is_cleaned(),
"potential secondary context is not cleared");
dassert(_split_states.is_cleaned(), "partition split context is not cleared");
}

// for partition_status::PS_ERROR, context cleanup is done here as they may block
Expand Down
30 changes: 25 additions & 5 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,29 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

void parent_prepare_states(const std::string &dir);

void child_copy_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist);
// child copy parent prepare list and call child_learn_states
void child_copy_prepare_list(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist);

// child learn states(including checkpoint, private logs, in-memory mutations)
void child_learn_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_replay_private_log(std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree);

error_code child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// return true if parent status is valid
bool parent_check_states();
Expand All @@ -351,6 +369,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void parent_cleanup_split_context();
// child suicide when partition split failed
void child_handle_split_error(const std::string &error_msg);
// child handle error while async learn parent states
void child_handle_async_learn_error();

private:
friend class ::dsn::replication::replication_checker;
Expand Down
5 changes: 5 additions & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1290,9 +1290,14 @@ void cold_backup_context::file_upload_complete(const std::string &filename)

bool partition_split_context::cleanup(bool force)
{
CLEANUP_TASK(async_learn_task, force)

parent_gpid.set_app_id(0);
is_prepare_list_copied = false;
return true;
}

bool partition_split_context::is_cleaned() const { return async_learn_task == nullptr; }

} // namespace replication
} // namespace dsn
8 changes: 6 additions & 2 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -526,12 +526,16 @@ typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
class partition_split_context
{
public:
partition_split_context() {}
// TODO(heyuchen): force will be used in further pull request
partition_split_context() : is_prepare_list_copied(false) {}
bool cleanup(bool force);
bool is_cleaned() const;

public:
gpid parent_gpid;
bool is_prepare_list_copied;

// child replica async learn parent states
dsn::task_ptr async_learn_task;
};

//---------------inline impl----------------------------------------------------------------
Expand Down
172 changes: 164 additions & 8 deletions src/dist/replication/lib/replica_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/fail_point.h>

Expand Down Expand Up @@ -93,6 +94,7 @@ void replica::child_init_replica(gpid parent_gpid,

// init split states
_split_states.parent_gpid = parent_gpid;
_split_states.is_prepare_list_copied = false;

ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid);

Expand Down Expand Up @@ -188,7 +190,7 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti
last_committed_decree());

_stub->split_replica_exec(_child_gpid,
std::bind(&replica::child_copy_states,
std::bind(&replica::child_copy_prepare_list,
std::placeholders::_1,
parent_states,
mutation_list,
Expand All @@ -200,14 +202,158 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_copy_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist) // on child partition
void replica::child_copy_prepare_list(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_copy_states", [](dsn::string_view) {});
// TODO(heyuchen): impelment function in further pull request
FAIL_POINT_INJECT_F("replica_child_copy_prepare_list", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status, status is {}", enum_to_string(status()));
_stub->split_replica_error_handler(
_split_states.parent_gpid,
std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1));
child_handle_split_error("wrong child status when execute child_copy_prepare_list");
return;
}

// learning parent states is time-consuming, should execute in THREAD_POOL_REPLICATION_LONG
decree last_committed_decree = plist->last_committed_decree();
_split_states.async_learn_task = tasking::enqueue(LPC_PARTITION_SPLIT_ASYNC_LEARN,
tracker(),
std::bind(&replica::child_learn_states,
this,
lstate,
mutation_list,
plog_files,
total_file_size,
last_committed_decree));

ddebug_replica("start to copy parent prepare list, last_committed_decree={}, prepare list min "
"decree={}, max decree={}",
last_committed_decree,
plist->min_decree(),
plist->max_decree());

// copy parent prepare list
plist->set_committer(std::bind(&replica::execute_mutation, this, std::placeholders::_1));
delete _prepare_list;
_prepare_list = new prepare_list(this, *plist);
for (decree d = last_committed_decree + 1; d <= _prepare_list->max_decree(); ++d) {
mutation_ptr mu = _prepare_list->get_mutation_by_decree(d);
dassert_replica(mu != nullptr, "can not find mutation, dercee={}", d);
mu->data.header.pid = get_gpid();
_stub->_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr);
_private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr);
// set mutation has been logged in private log
if (!mu->is_logged()) {
mu->set_logged();
}
}
_split_states.is_prepare_list_copied = true;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica::child_learn_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_states", [](dsn::string_view) {});

if (status() != partition_status::PS_PARTITION_SPLIT) {
dwarn_replica("wrong status, status is {}", enum_to_string(status()));
child_handle_async_learn_error();
return;
}

ddebug_replica("start to learn states asynchronously, prepare_list last_committed_decree={}, "
"checkpoint decree range=({},{}], private log files count={}, in-memory "
"mutation count={}",
last_committed_decree,
lstate.from_decree_excluded,
lstate.to_decree_included,
plog_files.size(),
mutation_list.size());

// apply parent checkpoint
error_code err;
auto cleanup = defer([this, &err]() {
if (err != ERR_OK) {
child_handle_async_learn_error();
}
});

err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate);
if (err != ERR_OK) {
derror_replica("failed to apply checkpoint, error={}", err);
return;
}

// replay parent private log
err = child_replay_private_log(plog_files, total_file_size, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to replay private log, error={}", err);
return;
}

// learn parent in-memory mutations
err = child_learn_mutations(mutation_list, last_committed_decree);
if (err != ERR_OK) {
derror_replica("failed to learn mutations, error={}", err);
return;
}

// generate a checkpoint synchronously
err = _app->sync_checkpoint();
if (err != ERR_OK) {
derror_replica("failed to generate checkpoint synchrounously, error={}", err);
return;
}

err = _app->update_init_info_ballot_and_decree(this);
if (err != ERR_OK) {
derror_replica("update_init_info_ballot_and_decree failed, error={}", err);
return;
}

ddebug_replica("learn parent states asynchronously succeed");

tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
_split_states.async_learn_task = nullptr;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_replay_private_log(std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_replay_private_log",
[](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
error_code replica::child_learn_mutations(std::vector<mutation_ptr> mutation_list,
decree last_committed_decree) // on child partition
{
FAIL_POINT_INJECT_F("replica_child_learn_mutations", [](dsn::string_view) { return ERR_OK; });
// TODO(heyuchen): TBD
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::child_catch_up_states() // on child partition
{
FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {});
// TODO(heyuchen): TBD
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand All @@ -228,5 +374,15 @@ void replica::child_handle_split_error(const std::string &error_msg) // on child
}
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica::child_handle_async_learn_error() // on child partition
{
_stub->split_replica_error_handler(
_split_states.parent_gpid,
std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1));
child_handle_split_error("meet error when execute child_learn_states");
_split_states.async_learn_task = nullptr;
}

} // namespace replication
} // namespace dsn
5 changes: 3 additions & 2 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class mock_replication_app_base : public replication_app_base

error_code start(int, char **) override { return ERR_NOT_IMPLEMENTED; }
error_code stop(bool) override { return ERR_NOT_IMPLEMENTED; }
error_code sync_checkpoint() override { return ERR_NOT_IMPLEMENTED; }
error_code sync_checkpoint() override { return ERR_OK; }
error_code async_checkpoint(bool) override { return ERR_NOT_IMPLEMENTED; }
error_code prepare_get_checkpoint(blob &) override { return ERR_NOT_IMPLEMENTED; }
error_code get_checkpoint(int64_t, const blob &, learn_state &) override
Expand All @@ -53,7 +53,7 @@ class mock_replication_app_base : public replication_app_base
}
error_code storage_apply_checkpoint(chkpt_apply_mode, const learn_state &) override
{
return ERR_NOT_IMPLEMENTED;
return ERR_OK;
}
error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override
Expand Down Expand Up @@ -119,6 +119,7 @@ class mock_replica : public replica
void set_child_gpid(gpid pid) { _child_gpid = pid; }
void set_init_child_ballot(ballot b) { _child_init_ballot = b; }
void set_last_committed_decree(decree d) { _prepare_list->reset(d); }
prepare_list *get_plist() { return _prepare_list; }
};
typedef dsn::ref_ptr<mock_replica> mock_replica_ptr;

Expand Down
Loading

0 comments on commit da71dbd

Please sign in to comment.