KUDU-2463 pt 2: bump MVCC safe time on Raft no-op
Based on the same rationale as Part 1 of this patch series, this patch
updates MVCC's safe and clean time using the no-op timestamp provided by
the leader following a successful Raft election.

There isn't an obvious reference to the tablet (to get at the MVCC
module) from Raft consensus, but there is a ReplicaTransactionFactory
that the TabletReplica implements. I've extended this into a more
general ConsensusRoundHandler that can be used to create transactions
or to finish consensus-only rounds as needed.
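
For illustration, a rough sketch of the replica-side shape this refactor
implies is below. TabletReplica's real declaration has more bases and
members than shown, so treat the class body here as a hypothetical
outline rather than this patch's actual code.

// Hypothetical outline of the handler implementation on the replica side.
// The same class that used to act only as a transaction factory now also
// gets a callback when a consensus-only round (e.g. a leader no-op)
// finishes replication.
class TabletReplica : public consensus::ConsensusRoundHandler {
 public:
  // Unchanged role: start a transaction for a message replicated from
  // the leader.
  Status StartFollowerTransaction(
      const scoped_refptr<consensus::ConsensusRound>& round) override;

  // New role: react to a finished consensus-only round, e.g. by advancing
  // MVCC's safe time with the no-op's timestamp (sketched further below).
  void FinishConsensusOnlyRound(consensus::ConsensusRound* round) override;
};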

An invariant we are trying to uphold is that once MVCC's safe time is
adjusted, all further transactions registered with MVCC will have higher
timestamps than the safe time. With this in mind, it is critical that
the adjustment of safe time be serialized with respect to transactions.
This is the case today because safe time is only advanced by writes in
the prepare thread, on which transactions are started. To preserve this
invariant, Raft no-ops also adjust the safe time on the prepare thread.
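
A minimal sketch of how that serialization might look on the replica
side follows. The names prepare_pool_token_, mvcc_manager(), and
AdjustSafeTime(), and the SubmitFunc() call, are assumptions made for
the illustration; they stand in for however the replica actually
schedules work on its prepare pool and advances MVCC's safe time.

// Sketch only: the important property is that the safe time bump is
// submitted to the same serial prepare pool that starts transactions,
// so the two cannot interleave.
void TabletReplica::FinishConsensusOnlyRound(consensus::ConsensusRound* round) {
  consensus::ReplicateMsg* msg = round->replicate_msg();
  if (msg->op_type() == consensus::NO_OP && msg->has_noop_request()) {
    const Timestamp noop_ts(msg->timestamp());
    // Runs after any transaction already queued for Prepare() and before
    // any transaction started later, preserving the safe time invariant.
    WARN_NOT_OK(prepare_pool_token_->SubmitFunc([this, noop_ts]() {
                  tablet_->mvcc_manager()->AdjustSafeTime(noop_ts);
                }),
                "Unable to schedule MVCC safe time adjustment");
  }
}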

The following test changes are included:
- to ensure nothing terrible happens when there is a lot of election
  churn (and hence frequent safe time advancement from new no-op
  timestamps), I've tweaked exactly_once_writes-itest to churn elections
  more aggressively. Previously it attempted this with just a low
  timeout; I injected some latency to make it churn harder and looped
  the test 1000 times in both TSAN and debug mode.
- since MvccManager::StartTransaction() will hit a CHECK failure if it
  starts a transaction at a timestamp that was previously marked safe, I
  added a configurable sleep at the beginning of the function to widen
  the window during which safe time can be advanced, making that CHECK
  failure more likely (a sketch of this injection point follows the
  list). I configured this in raft_consensus_election-itest and looped
  it 1000 times in TSAN and debug mode. If no-ops _didn't_ use the
  prepare thread to advance safe time, the added delay would lead to
  CHECK failures.
- added a test that ensures a tablet will bump its MVCC timestamps on
  its own, driven only by its elections
- tweaked raft_consensus-itest to use more realistic timestamps, now
  that MVCC's clean and safe time gets updated with the leadership no-op
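
The second bullet's injection point could look roughly like the sketch
below. The flag name is a placeholder rather than the one the patch
actually adds, and only the injected prefix of StartTransaction() is
shown; the existing body is elided in a comment.

// Hypothetical test-only flag; the real patch adds an equivalent knob.
DEFINE_int32(inject_latency_ms_before_starting_txn, 0,
             "Test only: sleep this many milliseconds at the start of "
             "MvccManager::StartTransaction() to widen the race window "
             "with safe time advancement.");

void MvccManager::StartTransaction(Timestamp timestamp) {
  if (PREDICT_FALSE(FLAGS_inject_latency_ms_before_starting_txn > 0)) {
    SleepFor(MonoDelta::FromMilliseconds(
        FLAGS_inject_latency_ms_before_starting_txn));
  }
  // ... existing logic: CHECK that 'timestamp' has not already been
  // marked safe, then register the transaction as usual ...
}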

This patch alone doesn't fix KUDU-2463. Rather, a later patch will
prevent scans from occurring if the MVCC safe time hasn't been advanced,
at which point this patch will reduce the window of scan unavailability.

Change-Id: Icbf812e2cbeeee7c322fd980245cfe40c886a15a
Reviewed-on: http://gerrit.cloudera.org:8080/11427
Tested-by: Andrew Wong <awong@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mpercy@apache.org>
andrwng committed Oct 12, 2018
1 parent fdc215c commit bc817a4
Showing 11 changed files with 204 additions and 81 deletions.
6 changes: 4 additions & 2 deletions src/kudu/consensus/consensus-test-util.h
@@ -668,7 +668,7 @@ class TestDriver {
};

// A transaction factory for tests, usually this is implemented by TabletReplica.
class TestTransactionFactory : public ReplicaTransactionFactory {
class TestTransactionFactory : public ConsensusRoundHandler {
public:
explicit TestTransactionFactory(log::Log* log)
: consensus_(nullptr),
@@ -681,7 +681,7 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
consensus_ = consensus;
}

Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) OVERRIDE {
Status StartFollowerTransaction(const scoped_refptr<ConsensusRound>& round) override {
auto txn = new TestDriver(pool_.get(), log_, round);
txn->round_->SetConsensusReplicatedCallback(std::bind(
&TestDriver::ReplicationFinished,
Expand All @@ -690,6 +690,8 @@ class TestTransactionFactory : public ReplicaTransactionFactory {
return Status::OK();
}

void FinishConsensusOnlyRound(ConsensusRound* /*round*/) override {}

void ReplicateAsync(ConsensusRound* round) {
CHECK_OK(consensus_->Replicate(round));
}
13 changes: 7 additions & 6 deletions src/kudu/consensus/raft_consensus.cc
@@ -227,15 +227,15 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
scoped_refptr<TimeManager> time_manager,
ReplicaTransactionFactory* txn_factory,
ConsensusRoundHandler* round_handler,
const scoped_refptr<MetricEntity>& metric_entity,
Callback<void(const std::string& reason)> mark_dirty_clbk) {
DCHECK(metric_entity);

peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
log_ = DCHECK_NOTNULL(std::move(log));
time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
txn_factory_ = DCHECK_NOTNULL(txn_factory);
round_handler_ = DCHECK_NOTNULL(round_handler);
mark_dirty_clbk_ = std::move(mark_dirty_clbk);

term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, CurrentTerm());
@@ -328,7 +328,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
<< SecureShortDebugString(cmeta_->ActiveConfig());
for (ReplicateMsg* replicate : info.orphaned_replicates) {
ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
RETURN_NOT_OK(StartFollowerTransactionUnlocked(replicate_ptr));
}

// Set the initial committed opid for the PendingRounds only after
@@ -927,7 +927,7 @@ static bool IsConsensusOnlyOperation(OperationType op_type) {
return op_type == NO_OP || op_type == CHANGE_CONFIG_OP;
}

Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg) {
Status RaftConsensus::StartFollowerTransactionUnlocked(const ReplicateRefPtr& msg) {
DCHECK(lock_.is_locked());

if (IsConsensusOnlyOperation(msg->get()->op_type())) {
@@ -943,7 +943,7 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
<< SecureShortDebugString(msg->get()->id());
scoped_refptr<ConsensusRound> round(new ConsensusRound(this, msg));
ConsensusRound* round_ptr = round.get();
RETURN_NOT_OK(txn_factory_->StartReplicaTransaction(round));
RETURN_NOT_OK(round_handler_->StartFollowerTransaction(round));
return AddPendingOperationUnlocked(round_ptr);
}

@@ -1386,7 +1386,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
}

while (iter != deduped_req.messages.end()) {
prepare_status = StartReplicaTransactionUnlocked(*iter);
prepare_status = StartFollowerTransactionUnlocked(*iter);
if (PREDICT_FALSE(!prepare_status.ok())) {
break;
}
@@ -2636,6 +2636,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
}
VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
<< round->id();
round_handler_->FinishConsensusOnlyRound(round);
gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
commit_msg->set_op_type(round->replicate_msg()->op_type());
*commit_msg->mutable_commited_op_id() = round->id();
50 changes: 29 additions & 21 deletions src/kudu/consensus/raft_consensus.h
@@ -72,10 +72,10 @@ namespace consensus {

class ConsensusMetadataManager;
class ConsensusRound;
class ConsensusRoundHandler;
class PeerManager;
class PeerProxyFactory;
class PendingRounds;
class ReplicaTransactionFactory;
struct ConsensusBootstrapInfo;
struct ElectionResult;

@@ -148,7 +148,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
scoped_refptr<log::Log> log,
scoped_refptr<TimeManager> time_manager,
ReplicaTransactionFactory* txn_factory,
ConsensusRoundHandler* round_handler,
const scoped_refptr<MetricEntity>& metric_entity,
Callback<void(const std::string& reason)> mark_dirty_clbk);

@@ -518,7 +518,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,

// Begin a replica transaction. If the type of message in 'msg' is not a type
// that uses transactions, delegates to StartConsensusOnlyRoundUnlocked().
Status StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg);
Status StartFollowerTransactionUnlocked(const ReplicateRefPtr& msg);

// Returns true if this node is the only voter in the Raft configuration.
bool IsSingleVoterConfig() const;
@@ -803,9 +803,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
scoped_refptr<TimeManager> time_manager_;
gscoped_ptr<PeerProxyFactory> peer_proxy_factory_;

// When we receive a message from a remote peer telling us to start a transaction, we use
// this factory to start it.
ReplicaTransactionFactory* txn_factory_;
// When we receive a message from a remote peer telling us to start a
// transaction, or finish a round, we use this handler to handle it.
// This may update replica state (e.g. the tablet replica).
ConsensusRoundHandler* round_handler_;

std::unique_ptr<PeerManager> peer_manager_;

@@ -896,32 +897,39 @@ struct ConsensusBootstrapInfo {
DISALLOW_COPY_AND_ASSIGN(ConsensusBootstrapInfo);
};

// Factory for replica transactions.
// An implementation of this factory must be registered prior to consensus
// start, and is used to create transactions when the consensus implementation receives
// messages from the leader.
// Handler for consensus rounds.
// An implementation of this handler must be registered prior to consensus
// start, and is used to:
// - Create transactions when the consensus implementation receives messages
// from the leader.
// - Handle when the consensus implementation finishes a non-transaction round
//
// Replica transactions execute the following way:
// Follower transactions execute the following way:
//
// - When a ReplicateMsg is first received from the leader, the RaftConsensus
// instance creates the ConsensusRound and calls StartReplicaTransaction().
// This will trigger the Prepare(). At the same time replica consensus
// instance immediately stores the ReplicateMsg in the Log. Once the replicate
// instance creates the ConsensusRound and calls StartFollowerTransaction().
// This will trigger the Prepare(). At the same time, the follower's consensus
// instance immediately stores the ReplicateMsg in the Log. Once the
// message is stored in stable storage an ACK is sent to the leader (i.e. the
// replica RaftConsensus instance does not wait for Prepare() to finish).
//
// - When the CommitMsg for a replicate is first received from the leader
// the replica waits for the corresponding Prepare() to finish (if it has
// not completed yet) and then proceeds to trigger the Apply().
// - When the CommitMsg for a replicate is first received from the leader, the
// follower waits for the corresponding Prepare() to finish (if it has not
// completed yet) and then proceeds to trigger the Apply().
//
// - Once Apply() completes the ReplicaTransactionFactory is responsible for logging
// - Once Apply() completes the ConsensusRoundHandler is responsible for logging
// a CommitMsg to the log to ensure that the operation can be properly restored
// on a restart.
class ReplicaTransactionFactory {
class ConsensusRoundHandler {
public:
virtual Status StartReplicaTransaction(const scoped_refptr<ConsensusRound>& context) = 0;
virtual ~ConsensusRoundHandler() {}

virtual ~ReplicaTransactionFactory() {}
virtual Status StartFollowerTransaction(const scoped_refptr<ConsensusRound>& context) = 0;

// Consensus-only rounds complete when non-transaction ops finish
// replication. This can be used to trigger callbacks, akin to an Apply() for
// transaction ops.
virtual void FinishConsensusOnlyRound(ConsensusRound* round) = 0;
};

// Context for a consensus round on the LEADER side, typically created as an
16 changes: 14 additions & 2 deletions src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -63,6 +63,9 @@ using std::unique_ptr;
using std::vector;

namespace kudu {

using strings::Substitute;

namespace tserver {

static const int kConsensusRpcTimeoutForTests = 50;
@@ -314,13 +317,22 @@ TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithCrashyNo
TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithChurnyElections) {
vector<string> ts_flags, master_flags;

int raft_heartbeat_interval;
#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
// On TSAN/ASAN builds, we need to be a little bit less churny in order to make
// any progress at all.
ts_flags.push_back("--raft_heartbeat_interval_ms=5");
raft_heartbeat_interval = 100;
#else
ts_flags.emplace_back("--raft_heartbeat_interval_ms=2");
raft_heartbeat_interval = 50;
#endif
// Inject random latency of up to the Raft heartbeat interval to ensure there
// will be missed heartbeats, triggering actual elections.
ts_flags = {
Substitute("--raft_heartbeat_interval_ms=$0", raft_heartbeat_interval),
Substitute("--consensus_inject_latency_ms_in_notifications=$0", raft_heartbeat_interval),
"--raft_enable_pre_election=false",
"--leader_failure_max_missed_heartbeat_periods=1",
};

int num_batches = 200;
if (AllowSlowTests()) {
