Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into shared-io-bug.patch
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangWei committed Dec 6, 2019
2 parents f5daa99 + f73980b commit ba952c2
Show file tree
Hide file tree
Showing 12 changed files with 417 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,28 @@ void replica_duplicator_manager::sync_duplication(const duplication_entry &ent)
}
}

decree replica_duplicator_manager::min_confirmed_decree() const
{
zauto_lock l(_lock);

decree min_decree = invalid_decree;
if (_replica->status() == partition_status::PS_PRIMARY) {
for (auto &kv : _duplications) {
const duplication_progress &p = kv.second->progress();
if (min_decree == invalid_decree) {
min_decree = p.confirmed_decree;
} else {
min_decree = std::min(min_decree, p.confirmed_decree);
}
}
} else if (_primary_confirmed_decree > 0) {
// if the replica is not primary, use the latest known (from primary)
// confirmed_decree instead.
min_decree = _primary_confirmed_decree;
}
return min_decree;
}

// Remove the duplications that are not in the `new_dup_map`.
// NOTE: this function may be blocked when destroying replica_duplicator.
void replica_duplicator_manager::remove_non_existed_duplications(
Expand All @@ -80,5 +102,23 @@ void replica_duplicator_manager::remove_non_existed_duplications(
}
}

void replica_duplicator_manager::update_confirmed_decree_if_secondary(decree confirmed)
{
// this function always runs in the same single thread with config-sync
if (_replica->status() != partition_status::PS_SECONDARY) {
return;
}

zauto_lock l(_lock);
remove_all_duplications();
if (confirmed >= 0) {
// confirmed decree never decreases
if (_primary_confirmed_decree < confirmed) {
_primary_confirmed_decree = confirmed;
}
}
_replica->update_init_info_duplicating(confirmed >= 0);
}

} // namespace replication
} // namespace dsn
13 changes: 13 additions & 0 deletions src/dist/replication/lib/duplication/replica_duplicator_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ class replica_duplicator_manager : public replica_base
/// collect updated duplication confirm points from this replica.
std::vector<duplication_confirm_entry> get_duplication_confirms_to_update() const;

/// mutations <= min_confirmed_decree are assumed to be cleanable.
/// If there's no duplication, invalid_decree is returned, mean that all logs are cleanable.
/// THREAD_POOL_REPLICATION
/// \see replica::on_checkpoint_timer()
decree min_confirmed_decree() const;

/// Updates the latest known confirmed decree on this replica if it's secondary.
/// THREAD_POOL_REPLICATION
/// \see replica_check.cpp
void update_confirmed_decree_if_secondary(decree confirmed);

private:
void sync_duplication(const duplication_entry &ent);

Expand All @@ -66,6 +77,8 @@ class replica_duplicator_manager : public replica_base

std::map<dupid_t, replica_duplicator_u_ptr> _duplications;

decree _primary_confirmed_decree{invalid_decree};

// avoid thread conflict between replica::on_checkpoint_timer and
// duplication_sync_timer.
mutable zlock _lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,39 @@ class replica_duplicator_manager_test : public duplication_test_base
ASSERT_EQ(d._duplications.size(), 1);
}

void test_set_confirmed_decree_non_primary()
{
auto r = stub->add_primary_replica(2, 1);
auto &d = r->get_replica_duplicator_manager();

duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
ent.remote = "dsn://slave-cluster";
ent.progress[r->get_gpid().get_partition_index()] = 100;
d.sync_duplication(ent);
ASSERT_EQ(d._duplications.size(), 1);
ASSERT_EQ(d._primary_confirmed_decree, invalid_decree);

// replica failover
r->as_secondary();

d.update_confirmed_decree_if_secondary(99);
ASSERT_EQ(d._duplications.size(), 0);
ASSERT_EQ(d._primary_confirmed_decree, 99);

// receives group check
d.update_confirmed_decree_if_secondary(101);
ASSERT_EQ(d._duplications.size(), 0);
ASSERT_EQ(d._primary_confirmed_decree, 101);

// confirmed decree never decreases
d.update_confirmed_decree_if_secondary(0);
ASSERT_EQ(d._primary_confirmed_decree, 101);
d.update_confirmed_decree_if_secondary(1);
ASSERT_EQ(d._primary_confirmed_decree, 101);
}

void test_get_duplication_confirms()
{
auto r = stub->add_primary_replica(2, 1);
Expand Down Expand Up @@ -68,17 +101,71 @@ class replica_duplicator_manager_test : public duplication_test_base
auto result = r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
ASSERT_EQ(result.size(), update_dup_num);
}

void test_min_confirmed_decree()
{
struct test_case
{
std::vector<int64_t> confirmed_decree;
int64_t min_confirmed_decree;
};

auto r = stub->add_non_primary_replica(2, 1);
auto assert_test = [r, this](test_case tt) {
for (int id = 1; id <= tt.confirmed_decree.size(); id++) {
duplication_entry ent;
ent.dupid = id;
ent.status = duplication_status::DS_PAUSE;
ent.progress[r->get_gpid().get_partition_index()] = 0;

auto dup = make_unique<replica_duplicator>(ent, r);
dup->update_progress(dup->progress()
.set_last_decree(tt.confirmed_decree[id - 1])
.set_confirmed_decree(tt.confirmed_decree[id - 1]));
add_dup(r, std::move(dup));
}

ASSERT_EQ(r->get_replica_duplicator_manager().min_confirmed_decree(),
tt.min_confirmed_decree);
r->get_replica_duplicator_manager()._duplications.clear();
};

{
// non-primary
test_case tt{{1, 2, 3}, invalid_decree};
assert_test(tt);
}

{ // primary
r->as_primary();
test_case tt{{1, 2, 3}, 1};
assert_test(tt);

tt = {{1000}, 1000};
assert_test(tt);

tt = {{}, invalid_decree};
assert_test(tt);
}
}
};

TEST_F(replica_duplicator_manager_test, get_duplication_confirms)
{
test_get_duplication_confirms();
}

TEST_F(replica_duplicator_manager_test, set_confirmed_decree_non_primary)
{
test_set_confirmed_decree_non_primary();
}

TEST_F(replica_duplicator_manager_test, remove_non_existed_duplications)
{
test_remove_non_existed_duplications();
}

TEST_F(replica_duplicator_manager_test, min_confirmed_decree) { test_min_confirmed_decree(); }

} // namespace replication
} // namespace dsn
123 changes: 123 additions & 0 deletions src/dist/replication/lib/duplication/test/replica_duplicator_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <dsn/utility/filesystem.h>

#include "dist/replication/lib/mutation_log_utils.h"
#include "dist/replication/lib/duplication/load_from_private_log.h"
#include "dist/replication/lib/duplication/duplication_pipeline.h"
#include "duplication_test_base.h"

namespace dsn {
namespace apps {

// for loading PUT mutations from log file.
DEFINE_TASK_CODE_RPC(RPC_RRDB_RRDB_PUT, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT);

} // namespace apps
} // namespace dsn

namespace dsn {
namespace replication {

class replica_duplicator_test : public duplication_test_base
{
public:
replica_duplicator_test() { _replica->init_private_log(_log_dir); }

void test_new_duplicator()
{
dupid_t dupid = 1;
std::string remote = "remote_address";
duplication_status::type status = duplication_status::DS_PAUSE;
int64_t confirmed_decree = 100;

duplication_entry dup_ent;
dup_ent.dupid = dupid;
dup_ent.remote = remote;
dup_ent.status = status;
dup_ent.progress[_replica->get_gpid().get_partition_index()] = confirmed_decree;

auto duplicator = make_unique<replica_duplicator>(dup_ent, _replica.get());
ASSERT_EQ(duplicator->id(), dupid);
ASSERT_EQ(duplicator->remote_cluster_name(), remote);
ASSERT_EQ(duplicator->_status, status);
ASSERT_EQ(duplicator->progress().confirmed_decree, confirmed_decree);
ASSERT_EQ(duplicator->progress().last_decree, confirmed_decree);

auto &expected_env = *duplicator;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash);
}

void test_pause_start_duplication()
{
mutation_log_ptr mlog = new mutation_log_private(
_replica->dir(), 4, _replica->get_gpid(), _replica.get(), 1024, 512, 10000);
EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);

{
_replica->init_private_log(mlog);
auto duplicator = create_test_duplicator();

duplicator->update_status_if_needed(duplication_status::DS_START);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);
auto expected_env = duplicator->_ship->_mutation_duplicator->_env;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
ASSERT_EQ(duplicator->get_gpid().thread_hash(), expected_env.__conf.thread_hash);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);
duplicator->update_status_if_needed(duplication_status::DS_START);
ASSERT_EQ(duplicator->_status, duplication_status::DS_START);

duplicator->update_status_if_needed(duplication_status::DS_PAUSE);
ASSERT_TRUE(duplicator->paused());
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);
ASSERT_EQ(duplicator->_load_private.get(), nullptr);
ASSERT_EQ(duplicator->_load.get(), nullptr);
ASSERT_EQ(duplicator->_ship.get(), nullptr);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);

// corner cases: next_status is INIT
duplicator->update_status_if_needed(duplication_status::DS_INIT);
ASSERT_EQ(duplicator->_status, duplication_status::DS_PAUSE);

duplicator->wait_all();
}
}
};

TEST_F(replica_duplicator_test, new_duplicator) { test_new_duplicator(); }

TEST_F(replica_duplicator_test, pause_start_duplication) { test_pause_start_duplication(); }

TEST_F(replica_duplicator_test, duplication_progress)
{
auto duplicator = create_test_duplicator();
ASSERT_EQ(duplicator->progress().last_decree, 0); // start duplication from empty plog
ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);

duplicator->update_progress(duplicator->progress().set_last_decree(10));
ASSERT_EQ(duplicator->progress().last_decree, 10);
ASSERT_EQ(duplicator->progress().confirmed_decree, invalid_decree);

duplicator->update_progress(duplicator->progress().set_confirmed_decree(10));
ASSERT_EQ(duplicator->progress().confirmed_decree, 10);
ASSERT_EQ(duplicator->progress().last_decree, 10);

ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(1)),
error_s::make(ERR_INVALID_STATE, "never decrease confirmed_decree: new(1) old(10)"));

ASSERT_EQ(duplicator->update_progress(duplicator->progress().set_confirmed_decree(12)),
error_s::make(ERR_INVALID_STATE,
"last_decree(10) should always larger than confirmed_decree(12)"));
}

} // namespace replication
} // namespace dsn
4 changes: 4 additions & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
bool verbose_commit_log() const;
dsn::task_tracker *tracker() { return &_tracker; }

/// \see replica_duplicate.cpp
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
bool is_duplicating() const;
void update_init_info_duplicating(bool duplicating);

void update_last_checkpoint_generate_time();

Expand All @@ -195,6 +198,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
error_code initialize_on_new();
error_code initialize_on_load();
error_code init_app_and_prepare_list(bool create_new);
decree get_replay_start_decree();

/////////////////////////////////////////////////////////////////
// 2pc
Expand Down
22 changes: 15 additions & 7 deletions src/dist/replication/lib/replica_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"

#include "dist/replication/lib/duplication/replica_duplicator_manager.h"

#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>

namespace dsn {
Expand Down Expand Up @@ -91,6 +95,7 @@ void replica::broadcast_group_check()
request->node = addr;
_primary_states.get_replica_config(it->second, request->config);
request->last_committed_decree = last_committed_decree();
request->__set_confirmed_decree(_duplication_mgr->min_confirmed_decree());

if (request->config.status == partition_status::PS_POTENTIAL_SECONDARY) {
auto it = _primary_states.learners.find(addr);
Expand Down Expand Up @@ -133,13 +138,13 @@ void replica::on_group_check(const group_check_request &request,
{
_checker.only_one_thread_access();

ddebug("%s: process group check, primary = %s, ballot = %" PRId64
", status = %s, last_committed_decree = %" PRId64,
name(),
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree);
ddebug_replica("process group check, primary = {}, ballot = {}, status = {}, "
"last_committed_decree = {}, confirmed_decree = {}",
request.config.primary.to_string(),
request.config.ballot,
enum_to_string(request.config.status),
request.last_committed_decree,
request.__isset.confirmed_decree ? request.confirmed_decree : invalid_decree);

if (request.config.ballot < get_ballot()) {
response.err = ERR_VERSION_OUTDATED;
Expand All @@ -154,6 +159,9 @@ void replica::on_group_check(const group_check_request &request,
} else if (is_same_ballot_status_change_allowed(status(), request.config.status)) {
update_local_configuration(request.config, true);
}
if (request.__isset.confirmed_decree) {
_duplication_mgr->update_confirmed_decree_if_secondary(request.confirmed_decree);
}

switch (status()) {
case partition_status::PS_INACTIVE:
Expand Down
Loading

0 comments on commit ba952c2

Please sign in to comment.