Skip to content

Commit

Permalink
fix-peers-change-failed-when-cluster-restart-in-joint-status
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds committed Jun 25, 2023
1 parent e32b78a commit d3039b5
Show file tree
Hide file tree
Showing 5 changed files with 466 additions and 9 deletions.
14 changes: 9 additions & 5 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include "braft/node_manager.h"
#include "braft/snapshot_executor.h"
#include "braft/errno.pb.h"
#include "braft/sync_point.h"
#include "butil/logging.h"


namespace braft {

Expand Down Expand Up @@ -1585,9 +1588,9 @@ void NodeImpl::pre_vote(std::unique_lock<raft_mutex_t>* lck, bool triggered) {
" configuration is possibly out of date";
return;
}
if (!_conf.contains(_server_id)) {
if (_conf.empty()) {
LOG(WARNING) << "node " << _group_id << ':' << _server_id
<< " can't do pre_vote as it is not in " << _conf.conf;
<< " can't do pre_vote as conf is emtpy";
return;
}

Expand Down Expand Up @@ -1644,9 +1647,9 @@ void NodeImpl::elect_self(std::unique_lock<raft_mutex_t>* lck,
bool old_leader_stepped_down) {
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " term " << _current_term << " start vote and grant vote self";
if (!_conf.contains(_server_id)) {
if (_conf.empty()) {
LOG(WARNING) << "node " << _group_id << ':' << _server_id
<< " can't do elect_self as it is not in " << _conf.conf;
<< " can't do elect_self as _conf is empty";
return;
}
// cancel follower election timer
Expand Down Expand Up @@ -2108,10 +2111,10 @@ int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
LogId last_log_id = _log_manager->last_log_id(true);
lck.lock();
// pre_vote not need ABA check after unlock&lock

int64_t votable_time = _follower_lease.votable_time_from_now();
bool grantable = (LogId(request->last_log_index(), request->last_log_term())
>= last_log_id);
BRAFT_VLOG<<"grantable "<< grantable;
if (grantable) {
granted = (votable_time == 0);
rejected_by_lease = (votable_time > 0);
Expand Down Expand Up @@ -3267,6 +3270,7 @@ void NodeImpl::ConfigurationCtx::next_stage() {
// implementation.
case STAGE_JOINT:
_stage = STAGE_STABLE;
TEST_SYNC_POINT_CALLBACK("NodeImpl::ConfigurationCtx:StableStage:BeforeApplyConfiguration", _node);
return _node->unsafe_apply_configuration(
Configuration(_new_peers), NULL, false);
case STAGE_STABLE:
Expand Down
203 changes: 203 additions & 0 deletions src/braft/sync_point.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "sync_point.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>

#include <fcntl.h>

#ifndef NDEBUG
namespace braft {

struct SyncPoint::Data {
Data() : enabled_(false) {}
// Enable proper deletion by subclasses
virtual ~Data() {}
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void(void *)>> callbacks_;
std::unordered_map<std::string, std::vector<std::string>> markers_;
std::unordered_map<std::string, std::thread::id> marked_thread_id_;

std::mutex mutex_;
std::condition_variable cv_;
// sync points that have been passed through
std::unordered_set<std::string> cleared_points_;
std::atomic<bool> enabled_;
int num_callbacks_running_ = 0;

void LoadDependency(const std::vector<SyncPointPair> &dependencies);
void LoadDependencyAndMarkers(const std::vector<SyncPointPair> &dependencies,
const std::vector<SyncPointPair> &markers);
bool PredecessorsAllCleared(const std::string &point);
void SetCallBack(const std::string &point,
const std::function<void(void *)> &callback) {
std::lock_guard<std::mutex> lock(mutex_);
callbacks_[point] = callback;
}

void ClearCallBack(const std::string &point);
void ClearAllCallBacks();
void EnableProcessing() { enabled_ = true; }
void DisableProcessing() { enabled_ = false; }
void ClearTrace() {
std::lock_guard<std::mutex> lock(mutex_);
cleared_points_.clear();
}
bool DisabledByMarker(const std::string &point, std::thread::id thread_id) {
auto marked_point_iter = marked_thread_id_.find(point);
return marked_point_iter != marked_thread_id_.end() &&
thread_id != marked_point_iter->second;
}
void Process(const std::string &point, void *cb_arg);
};

SyncPoint *SyncPoint::GetInstance() {
static SyncPoint sync_point;
return &sync_point;
}

SyncPoint::SyncPoint() : impl_(new Data) {}

SyncPoint::~SyncPoint() { delete impl_; }

void SyncPoint::LoadDependency(const std::vector<SyncPointPair> &dependencies) {
impl_->LoadDependency(dependencies);
}

void SyncPoint::LoadDependencyAndMarkers(
const std::vector<SyncPointPair> &dependencies,
const std::vector<SyncPointPair> &markers) {
impl_->LoadDependencyAndMarkers(dependencies, markers);
}

void SyncPoint::SetCallBack(const std::string &point,
const std::function<void(void *)> &callback) {
impl_->SetCallBack(point, callback);
}

void SyncPoint::ClearCallBack(const std::string &point) {
impl_->ClearCallBack(point);
}

void SyncPoint::ClearAllCallBacks() { impl_->ClearAllCallBacks(); }

void SyncPoint::EnableProcessing() { impl_->EnableProcessing(); }

void SyncPoint::DisableProcessing() { impl_->DisableProcessing(); }

void SyncPoint::ClearTrace() { impl_->ClearTrace(); }

void SyncPoint::Process(const std::string &point, void *cb_arg) {
impl_->Process(point, cb_arg);
}

void SyncPoint::Data::LoadDependency(
const std::vector<SyncPointPair> &dependencies) {
std::lock_guard<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
for (const auto &dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
cv_.notify_all();
}

void SyncPoint::Data::LoadDependencyAndMarkers(
const std::vector<SyncPointPair> &dependencies,
const std::vector<SyncPointPair> &markers) {
std::lock_guard<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
markers_.clear();
marked_thread_id_.clear();
for (const auto &dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
for (const auto &marker : markers) {
successors_[marker.predecessor].push_back(marker.successor);
predecessors_[marker.successor].push_back(marker.predecessor);
markers_[marker.predecessor].push_back(marker.successor);
}
cv_.notify_all();
}

bool SyncPoint::Data::PredecessorsAllCleared(const std::string &point) {
for (const auto &pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
return false;
}
}
return true;
}

void SyncPoint::Data::ClearCallBack(const std::string &point) {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.erase(point);
}

void SyncPoint::Data::ClearAllCallBacks() {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.clear();
}

void SyncPoint::Data::Process(const std::string &point, void *cb_arg) {
if (!enabled_) {
return;
}

std::unique_lock<std::mutex> lock(mutex_);
auto thread_id = std::this_thread::get_id();

auto marker_iter = markers_.find(point);
if (marker_iter != markers_.end()) {
for (auto &marked_point : marker_iter->second) {
marked_thread_id_.emplace(marked_point, thread_id);
}
}

if (DisabledByMarker(point, thread_id)) {
return;
}

while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
if (DisabledByMarker(point, thread_id)) {
return;
}
}

auto callback_pair = callbacks_.find(point);
if (callback_pair != callbacks_.end()) {
num_callbacks_running_++;
mutex_.unlock();
callback_pair->second(cb_arg);
mutex_.lock();
num_callbacks_running_--;
}
cleared_points_.insert(point);
cv_.notify_all();
}
} // namespace braft
#endif // NDEBUG
140 changes: 140 additions & 0 deletions src/braft/sync_point.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#include <assert.h>

#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#ifdef NDEBUG
#define TEST_SYNC_POINT(x)
#define TEST_IDX_SYNC_POINT(x, index)
#define TEST_SYNC_POINT_CALLBACK(x, y)
#define INIT_SYNC_POINT_SINGLETONS()
#else

namespace braft {

// This class provides facility to reproduce race conditions deterministically
// in unit tests.
// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
// Each sync point represents a position in the execution stream of a thread.
// In the unit test, 'Happens After' relationship among sync points could be
// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of
// threads execution.

class SyncPoint {
public:
static SyncPoint* GetInstance();

SyncPoint(const SyncPoint&) = delete;
SyncPoint& operator=(const SyncPoint&) = delete;
~SyncPoint();

struct SyncPointPair {
std::string predecessor;
std::string successor;
};

// call once at the beginning of a test to setup the dependency between
// sync points. Specifically, execution will not be allowed to proceed past
// each successor until execution has reached the corresponding predecessor,
// in any thread.
void LoadDependency(const std::vector<SyncPointPair>& dependencies);

// call once at the beginning of a test to setup the dependency between
// sync points and setup markers indicating the successor is only enabled
// when it is processed on the same thread as the predecessor.
// When adding a marker, it implicitly adds a dependency for the marker pair.
void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers);

// The argument to the callback is passed through from
// TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or
// TEST_IDX_SYNC_POINT was used.
void SetCallBack(const std::string& point,
const std::function<void(void*)>& callback);

// Clear callback function by point
void ClearCallBack(const std::string& point);

// Clear all call back functions.
void ClearAllCallBacks();

// enable sync point processing (disabled on startup)
void EnableProcessing();

// disable sync point processing
void DisableProcessing();

// remove the execution trace of all sync points
void ClearTrace();

// triggered by TEST_SYNC_POINT, blocking execution until all predecessors
// are executed.
// And/or call registered callback function, with argument `cb_arg`
void Process(const std::string& point, void* cb_arg = nullptr);

// template gets length of const string at compile time,
// avoiding strlen() at runtime
template <size_t kLen>
void Process(const char (&point)[kLen], void* cb_arg = nullptr) {
static_assert(kLen > 0, "Must not be empty");
assert(point[kLen - 1] == '\0');
Process(std::string(point, kLen - 1), cb_arg);
}

// TODO: it might be useful to provide a function that blocks until all
// sync points are cleared.

// We want this to be public so we can
// subclass the implementation
struct Data;

private:
// Singleton
SyncPoint();
Data* impl_;
};

// Sets up sync points to mock direct IO instead of actually issuing direct IO
// to the file system.
void SetupSyncPointsToMockDirectIO();
} // namespace braft

// Use TEST_SYNC_POINT to specify sync points inside code base.
// Sync points can have happens-after dependency on other sync points,
// configured at runtime via SyncPoint::LoadDependency. This could be
// utilized to re-produce race conditions between threads.
// See TransactionLogIteratorRace in db_test.cc for an example use case.
// TEST_SYNC_POINT is no op in release build.
#define TEST_SYNC_POINT(x) \
braft::SyncPoint::GetInstance()->Process(x)
#define TEST_IDX_SYNC_POINT(x, index) \
braft::SyncPoint::GetInstance()->Process(x + \
std::to_string(index))
#define TEST_SYNC_POINT_CALLBACK(x, y) \
braft::SyncPoint::GetInstance()->Process(x, y)
#define INIT_SYNC_POINT_SINGLETONS() \
(void)braft::SyncPoint::GetInstance();
#endif // NDEBUG

// Callback sync point for any read IO errors that should be ignored by
// the fault injection framework
// Disable in release mode
#ifdef NDEBUG
#define IGNORE_STATUS_IF_ERROR(_status_)
#else
#define IGNORE_STATUS_IF_ERROR(_status_) \
{ \
if (!_status_.ok()) { \
TEST_SYNC_POINT("FaultInjectionIgnoreError"); \
} \
}
#endif // NDEBUG
Loading

0 comments on commit d3039b5

Please sign in to comment.