Skip to content

Commit

Permalink
refactor: move primary's learning preparation of cache into another f…
Browse files Browse the repository at this point in the history
…unction (#368)
  • Loading branch information
Wu Tao committed Mar 3, 2021
1 parent f8aefb4 commit 7d6f56b
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 62 deletions.
21 changes: 14 additions & 7 deletions .github/workflows/pull_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,27 @@ jobs:
needs: lint
runs-on: self-hosted
container:
image: apachepegasus/ci-env
image: registry.cn-beijing.aliyuncs.com/apachepegasus/thirdparties-bin:ubuntu1804
env:
CCACHE_DIR: /tmp/ccache/pegasus
CCACHE_MAXSIZE: 10G
volumes:
# Place ccache compilation intermediate results in host memory, that's shared among containers.
- /tmp/ccache/pegasus:/tmp/ccache/pegasus
# Read docs at https://docs.docker.com/storage/tmpfs/ for more details of using tmpfs in docker.
options: --mount type=tmpfs,destination=/tmp/pegasus,tmpfs-size=10737418240 --cap-add=SYS_PTRACE
options: --mount type=tmpfs,destination=/tmp/pegasus --cap-add=SYS_PTRACE
defaults:
run:
shell: bash
working-directory: /root/rdsn
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 1
- name: Clone rdsn source
working-directory: /root
run: |
git clone --depth=1 https://hub.fastgit.org/XiaoMi/rdsn.git
- name: Unpack prebuilt third-parties
if: contains(github.event.pull_request.labels.*.name, 'thirdparty-modified') == false
run: unzip /root/pegasus-thirdparty-output.zip -d ./thirdparty
run: unzip /root/thirdparties-bin.zip -d ./thirdparty
- name: Rebuild third-parties
if: contains(github.event.pull_request.labels.*.name, 'thirdparty-modified')
working-directory: thirdparty
Expand All @@ -63,4 +68,6 @@ jobs:
- name: Compilation
run: ./run.sh build -c --skip_thirdparty
- name: Unit Testing
run: ./run.sh test --skip_thirdparty
run: |
export LD_LIBRARY_PATH=/root/rdsn/thirdparty/output/lib:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server
./run.sh test --skip_thirdparty
9 changes: 9 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void notify_learn_completion();
error_code apply_learned_state_from_private_log(learn_state &state);

// Prepares in-memory mutations for the replica's learning.
// Returns false if there's no delta data in cache (aka prepare-list).
bool prepare_cached_learn_state(const learn_request &request,
decree learn_start_decree,
decree local_committed_decree,
/*out*/ remote_learner_state &learner_state,
/*out*/ learn_response &response,
/*out*/ bool &delayed_replay_prepare_list);

// Gets the position where this round of the learning process should begin.
// This method is called on primary-side.
// TODO(wutao1): mark it const
Expand Down
125 changes: 70 additions & 55 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,62 +437,14 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
response.last_committed_decree = local_committed_decree;
response.err = ERR_OK;

// set prepare_start_decree when to-be-learn state is covered by prepare list,
// note min_decree can be NOT present in prepare list when list.count == 0
if (learn_start_decree > _prepare_list->min_decree() ||
(learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) {
if (learner_state.prepare_start_decree == invalid_decree) {
// start from (last_committed_decree + 1)
learner_state.prepare_start_decree = local_committed_decree + 1;

cleanup_preparing_mutations(false);

// the replayed prepare msg needs to be AFTER the learning response msg
// to reduce probability that preparing messages arrive remote early than
// learning response msg.
delayed_replay_prepare_list = true;

ddebug("%s: on_learn[%016" PRIx64
"]: learner = %s, set prepare_start_decree = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
local_committed_decree + 1);
}

response.prepare_start_decree = learner_state.prepare_start_decree;
} else {
learner_state.prepare_start_decree = invalid_decree;
}

// only learn mutation cache in range of [learn_start_decree, prepare_start_decree),
// in this case, the state on the PS should be contiguous (+ to-be-sent prepare list)
if (response.prepare_start_decree != invalid_decree) {
binary_writer writer;
int count = 0;
for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d);
mu->write_to(writer, nullptr);
count++;
}
response.type = learn_type::LT_CACHE;
response.state.meta = writer.get_buffer();
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, "
"learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", "
"learn_mutation_count = %d, learn_data_size = %d",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
response.prepare_start_decree,
count,
response.state.meta.length());
}

// learn delta state or checkpoint
// in this case, the state on the PS is still incomplete
else {
bool should_learn_cache = prepare_cached_learn_state(request,
learn_start_decree,
local_committed_decree,
learner_state,
response,
delayed_replay_prepare_list);
if (!should_learn_cache) {
if (learn_start_decree > _app->last_durable_decree()) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64
Expand Down Expand Up @@ -1004,6 +956,69 @@ void replica::on_learn_reply(error_code err, learn_request &&req, learn_response
}
}

bool replica::prepare_cached_learn_state(const learn_request &request,
decree learn_start_decree,
decree local_committed_decree,
/*out*/ remote_learner_state &learner_state,
/*out*/ learn_response &response,
/*out*/ bool &delayed_replay_prepare_list)
{
// set prepare_start_decree when to-be-learn state is covered by prepare list,
// note min_decree can be NOT present in prepare list when list.count == 0
if (learn_start_decree > _prepare_list->min_decree() ||
(learn_start_decree == _prepare_list->min_decree() && _prepare_list->count() > 0)) {
if (learner_state.prepare_start_decree == invalid_decree) {
// start from (last_committed_decree + 1)
learner_state.prepare_start_decree = local_committed_decree + 1;

cleanup_preparing_mutations(false);

// the replayed prepare msg needs to be AFTER the learning response msg
// to reduce probability that preparing messages arrive remote early than
// learning response msg.
delayed_replay_prepare_list = true;

ddebug("%s: on_learn[%016" PRIx64
"]: learner = %s, set prepare_start_decree = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
local_committed_decree + 1);
}

response.prepare_start_decree = learner_state.prepare_start_decree;
} else {
learner_state.prepare_start_decree = invalid_decree;
}

// only learn mutation cache in range of [learn_start_decree, prepare_start_decree),
// in this case, the state on the PS should be contiguous (+ to-be-sent prepare list)
if (response.prepare_start_decree != invalid_decree) {
binary_writer writer;
int count = 0;
for (decree d = learn_start_decree; d < response.prepare_start_decree; d++) {
auto mu = _prepare_list->get_mutation_by_decree(d);
dassert(mu != nullptr, "mutation must not be nullptr, decree = %" PRId64 "", d);
mu->write_to(writer, nullptr);
count++;
}
response.type = learn_type::LT_CACHE;
response.state.meta = writer.get_buffer();
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn mutation cache succeed, "
"learn_start_decree = %" PRId64 ", prepare_start_decree = %" PRId64 ", "
"learn_mutation_count = %d, learn_data_size = %d",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
response.prepare_start_decree,
count,
response.state.meta.length());
return true;
}
return false;
}

void replica::on_copy_remote_state_completed(error_code err,
size_t size,
uint64_t copy_start_time,
Expand Down

0 comments on commit 7d6f56b

Please sign in to comment.