Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-825 PBFT view change should propagate configuration change
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Jan 7, 2019
1 parent 201b5ba commit 5da3a9e
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 51 deletions.
178 changes: 171 additions & 7 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pbft::pbft(
, failure_detector(std::move(failure_detector))
, io_context(io_context)
, audit_heartbeat_timer(this->io_context->make_unique_steady_timer())
, new_config_timer(this->io_context->make_unique_steady_timer())
, crypto(std::move(crypto))
, operation_manager(std::move(operation_manager))
{
Expand Down Expand Up @@ -458,17 +459,25 @@ pbft::handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::
void
pbft::handle_join_response(const pbft_membership_msg& msg)
{
if (!this->in_swarm)
if (this->in_swarm == swarm_status::joining)
{
if (msg.result())
{
this->in_swarm = true;
this->in_swarm = swarm_status::waiting;
LOG(debug) << "Successfully joined the swarm, waiting for NEW_VIEW message...";
}
else
{
LOG(error) << "Request to join swarm rejected. Aborting...";
throw (std::runtime_error("Request to join swarm rejected."));
// since we're not persisting configuration to disk, we could be in one of two situations:
// either we've been rejected, or we restarted and the swarm still thinks we're a member.
// for now (since there's no blacklist yet) we'll assume the latter and try to continue.
// there is a potential for problems here if we don't have the latest configuration.
// TODO: change this to abort once we're persisting configuration
LOG(error) << "Request to join swarm rejected - did we exit and restart?";
this->in_swarm = swarm_status::joined;

// LOG(error) << "Request to join swarm rejected. Aborting...";
// throw (std::runtime_error("Request to join swarm rejected."));
}
}
else
Expand Down Expand Up @@ -660,6 +669,11 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
{
// get rid of all other previous configs, except for currently active one
this->configurations.remove_prior_to(config.get_hash());

this->new_config_timer->cancel();
this->new_config_timer->expires_from_now(NEW_CONFIG_INTERVAL);
this->new_config_timer->async_wait(
std::bind(&pbft::handle_new_config_timeout, shared_from_this(), std::placeholders::_1));
}

// send response to new node
Expand Down Expand Up @@ -710,6 +724,36 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
}
}

void
pbft::handle_new_config_timeout(const boost::system::error_code& ec)
{
if (ec == boost::asio::error::operation_aborted)
{
return;
}

if (ec)
{
LOG(error) << "handle_new_config_timeout error: " << ec.message();
return;
}

// send viewchange with new config
// TODO: should the current view be invalidated?
auto config = this->configurations.newest();
if (config)
{
this->view_is_valid = false;
auto view_change = pbft::make_viewchange(this->view + 1, this->latest_stable_checkpoint().first
, this->stable_checkpoint_proof, this->operation_manager->prepared_operations_since(this->latest_stable_checkpoint().first));
this->broadcast(this->wrap_message(view_change));
}
else
{
LOG(error) << "unable to send viewchange because we don't have a valid new configuration";
}
}

bool
pbft::is_primary() const
{
Expand Down Expand Up @@ -787,6 +831,7 @@ pbft::handle_failure()
std::lock_guard<std::mutex> lock(this->pbft_lock);
LOG (error) << "handle_failure - PBFT failure - invalidating current view and sending VIEWCHANGE to view: " << this->view + 1;
this->notify_audit_failure_detected();
this->new_config_timer->cancel();
this->view_is_valid = false;
auto ops = this->operation_manager->prepared_operations_since(this->latest_stable_checkpoint().first);
pbft_msg view_change{pbft::make_viewchange(this->view + 1, this->latest_stable_checkpoint().first, this->stable_checkpoint_proof, ops)};
Expand Down Expand Up @@ -1289,6 +1334,8 @@ pbft::make_newview(
pbft_msg newview;
newview.set_type(PBFT_MSG_NEWVIEW);
newview.set_view(new_view_index);
newview.set_config_hash(this->configurations.current()->get_hash());
newview.set_config(this->configurations.current()->to_string());

// V is the set of 2f+1 view change messages
for (const auto &sender_viewchange_envelope: viewchange_envelopes_from_senders)
Expand Down Expand Up @@ -1421,6 +1468,15 @@ pbft::handle_viewchange(const pbft_msg &msg, const bzn_envelope &original_msg)
viewchange_envelopes_from_senders[sender] = viewchange_envelope;
}

// figure out which config to use. As primary we need to switch to the new config now so
// we send newview message to any new nodes.
// note - this isn't optimal, we're parsing all these messages multiple times.
if (!adopt_config_from_viewchange(viewchange_envelopes_from_senders))
{
LOG (error) << "No valid configuration for new view, aborting...";
return;
}

auto res = this->build_newview(viewchange->first, viewchange_envelopes_from_senders);
this->next_issued_sequence_number = res.second;
this->broadcast(this->wrap_message(res.first));
Expand All @@ -1429,14 +1485,57 @@ pbft::handle_viewchange(const pbft_msg &msg, const bzn_envelope &original_msg)
void
pbft::handle_newview(const pbft_msg& msg, const bzn_envelope& original_msg)
{
if (!this->is_valid_newview_message(msg, original_msg))
// are we just now joining the swarm?
if (this->in_swarm == swarm_status::waiting)
{
if (!validate_config_in_newview(msg))
{
LOG(debug) << "newview received with unsubstantiated configuration";
return;
}

auto newconfig = std::make_shared<pbft_configuration>();
if (!newconfig->from_string(msg.config()))
{
LOG(debug) << "newview received with invalid configuration";
return;
}

auto hash = newconfig->get_hash();
this->configurations.add(newconfig);
this->configurations.enable(hash);

// we need to switch to this configuration now so we have the peer info to validate the message
this->move_to_new_configuration(hash);

if (!this->is_valid_newview_message(msg, original_msg))
{
LOG (debug) << "handle_newview - ignoring invalid NEWVIEW message while waiting to join swarm";
return;
}

// adopt checkpoint in newview message
pbft_msg viewchange;
viewchange.ParseFromString(msg.viewchange_messages(0).pbft());
this->save_checkpoint(viewchange);
this->in_swarm = swarm_status::joined;
}
else if (!this->is_valid_newview_message(msg, original_msg))
{
LOG (debug) << "handle_newview - ignoring invalid NEWVIEW message ";
return;
}

LOG(debug) << "handle_newview - recieved valid NEWVIEW message";

// validate requested configuration and switch to it
if (!this->is_configuration_acceptable_in_new_view(msg.config_hash()) ||
!this->move_to_new_configuration(msg.config_hash()))
{
LOG(debug) << "unable to switch to configuration in new view";
return;
}

this->view = msg.view();
this->view_is_valid = true;

Expand Down Expand Up @@ -1571,7 +1670,7 @@ pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config)
}

bool
pbft::is_configuration_acceptable_in_new_view(hash_t config_hash)
pbft::is_configuration_acceptable_in_new_view(const hash_t& config_hash)
{
return this->configurations.is_enabled(config_hash);
}
Expand All @@ -1592,8 +1691,11 @@ pbft::handle_config_message(const pbft_msg& msg, const std::shared_ptr<pbft_oper
}

bool
pbft::move_to_new_configuration(hash_t config_hash)
pbft::move_to_new_configuration(const hash_t& config_hash)
{
if (this->configurations.current()->get_hash() == config_hash)
return true;

if (this->configurations.is_enabled(config_hash))
{
this->configurations.set_current(config_hash);
Expand All @@ -1610,6 +1712,64 @@ pbft::proposed_config_is_acceptable(std::shared_ptr<pbft_configuration> /* confi
return true;
}

bool
pbft::adopt_config_from_viewchange(const std::map<uuid_t, bzn_envelope>& viewchange_envelopes)
{
std::map<bzn::hash_t, size_t> config_hashes;

for (const auto &viewchange_envelope : viewchange_envelopes)
{
pbft_msg viewchange_message;
viewchange_message.ParseFromString(viewchange_envelope.second.pbft());

// accumulate this viewchange's "vote" for the config to use
(config_hashes.insert(std::make_pair(viewchange_message.config_hash(), 0))).first->second++;
}

auto config_it = std::find_if(config_hashes.begin(), config_hashes.end(), [this](auto& config)
{
return config.second >= 2 * this->max_faulty_nodes() + 1;
});

if (config_it == config_hashes.end())
{
LOG (error) << "viewchange messages do not have a consensus on which configuration to use";
return false;
}

if (this->configurations.get(config_it->first) == nullptr || !this->configurations.is_enabled(config_it->first))
{
LOG (error) << "requested view configuration is not available";
return false;
}

auto new_config = config_it->first;
if (!this->is_configuration_acceptable_in_new_view(new_config) || !this->move_to_new_configuration(new_config))
{
LOG (error) << "Unable to switch to new view configuration";
return false;
}

return true;
}

bool
pbft::validate_config_in_newview(const pbft_msg& msg)
{
size_t match_count = 0;
for (int i = 0; i < msg.viewchange_messages_size(); i++)
{
pbft_msg viewchange;
if (viewchange.ParseFromString(msg.viewchange_messages(i).pbft()) &&
viewchange.config_hash() == msg.config_hash())
{
match_count++;
}
}

return match_count >= this->honest_majority_size(msg.viewchange_messages_size());
}

timestamp_t
pbft::now() const
{
Expand Down Expand Up @@ -1651,6 +1811,7 @@ pbft::make_viewchange(
viewchange.set_type(PBFT_MSG_VIEWCHANGE);
viewchange.set_view(new_view);
viewchange.set_sequence(base_sequence_number); // base_sequence_number = n = sequence # of last valid checkpoint
viewchange.set_config_hash(this->configurations.newest()->get_hash());

// C = a set of local 2*f + 1 valid checkpoint messages
for (const auto& msg : stable_checkpoint_proof)
Expand Down Expand Up @@ -1700,6 +1861,7 @@ pbft::join_swarm()
// are we already in the peers list?
if (this->is_peer(this->uuid))
{
this->in_swarm = swarm_status::joined;
return;
}

Expand All @@ -1724,6 +1886,8 @@ pbft::join_swarm()
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(join_msg));
this->node->send_message(make_endpoint(this->current_peers()[selected]), msg_ptr, false);

this->in_swarm = swarm_status::joining;

// TODO: set timer and retry with different peer if we don't get a response
#else
LOG(fatal) << "This node is not a member of the swarm, exiting.";
Expand Down
24 changes: 11 additions & 13 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
namespace
{
const std::chrono::milliseconds HEARTBEAT_INTERVAL{std::chrono::milliseconds(5000)};
const std::chrono::seconds NEW_CONFIG_INTERVAL{std::chrono::seconds(30)};
const std::string INITIAL_CHECKPOINT_HASH = "<null db state>";
const uint64_t CHECKPOINT_INTERVAL = 100; //TODO: KEP-574
const double HIGH_WATER_INTERVAL_IN_CHECKPOINTS = 2.0; //TODO: KEP-574
Expand Down Expand Up @@ -152,6 +153,7 @@ namespace bzn
void broadcast(const bzn_envelope& message);

void handle_audit_heartbeat_timeout(const boost::system::error_code& ec);
void handle_new_config_timeout(const boost::system::error_code& ec);

void notify_audit_failure_detected();

Expand All @@ -174,9 +176,11 @@ namespace bzn
const std::vector<bzn::peer_address_t>& current_peers() const;
const peer_address_t& get_peer_by_uuid(const std::string& uuid) const;
void broadcast_new_configuration(pbft_configuration::shared_const_ptr config);
bool is_configuration_acceptable_in_new_view(hash_t config_hash);
bool move_to_new_configuration(hash_t config_hash);
bool is_configuration_acceptable_in_new_view(const hash_t& config_hash);
bool move_to_new_configuration(const hash_t& config_hash);
bool proposed_config_is_acceptable(std::shared_ptr<pbft_configuration> config);
bool adopt_config_from_viewchange(const std::map<uuid_t, bzn_envelope>& viewchange_envelopes);
bool validate_config_in_newview(const pbft_msg& msg);

void maybe_record_request(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op);

Expand All @@ -186,7 +190,7 @@ namespace bzn

void join_swarm();
// VIEWCHANGE/NEWVIEW Helper methods
static pbft_msg make_viewchange(uint64_t new_view, uint64_t n, const std::unordered_map<bzn::uuid_t, std::string>& stable_checkpoint_proof, const std::map<uint64_t, std::shared_ptr<bzn::pbft_operation>>& prepared_operations);
pbft_msg make_viewchange(uint64_t new_view, uint64_t n, const std::unordered_map<bzn::uuid_t, std::string>& stable_checkpoint_proof, const std::map<uint64_t, std::shared_ptr<bzn::pbft_operation>>& prepared_operations);
pbft_msg make_newview(uint64_t new_view_index, const std::map<uuid_t,bzn_envelope>& viewchange_envelopes_from_senders, const std::map<uint64_t, bzn_envelope>& pre_prepare_messages) const;
std::pair<pbft_msg, uint64_t> build_newview(uint64_t new_view, const std::map<uuid_t,bzn_envelope>& viewchange_envelopes_from_senders) const;
std::map<bzn::checkpoint_t , std::set<bzn::uuid_t>> validate_and_extract_checkpoint_hashes(const pbft_msg &viewchange_message) const;
Expand Down Expand Up @@ -220,9 +224,12 @@ namespace bzn
const std::shared_ptr<bzn::asio::io_context_base> io_context;

std::unique_ptr<bzn::asio::steady_timer_base> audit_heartbeat_timer;
std::unique_ptr<bzn::asio::steady_timer_base> new_config_timer;

bool audit_enabled = true;
bool in_swarm = false;

enum class swarm_status {not_joined, joining, waiting, joined};
swarm_status in_swarm = swarm_status::not_joined;

checkpoint_t stable_checkpoint{0, INITIAL_CHECKPOINT_HASH};

Expand All @@ -247,15 +254,6 @@ namespace bzn

std::shared_ptr<pbft_operation_manager> operation_manager;

FRIEND_TEST(pbft_test, join_request_generates_new_config_preprepare);
FRIEND_TEST(pbft_test, valid_leave_request_test);
FRIEND_TEST(pbft_test, invalid_leave_request_test);
FRIEND_TEST(pbft_test, test_new_config_preprepare_handling);
FRIEND_TEST(pbft_test, test_new_config_prepare_handling);
FRIEND_TEST(pbft_test, test_new_config_commit_handling);
FRIEND_TEST(pbft_test, test_move_to_new_config);
FRIEND_TEST(pbft_test, full_test);

FRIEND_TEST(pbft_viewchange_test, pbft_with_invalid_view_drops_messages);
FRIEND_TEST(pbft_viewchange_test, test_make_signed_envelope);
FRIEND_TEST(pbft_viewchange_test, test_is_peer);
Expand Down
18 changes: 18 additions & 0 deletions pbft/pbft_config_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ using namespace bzn;
bool
pbft_config_store::add(pbft_configuration::shared_const_ptr config)
{
if (this->find_by_hash(config->get_hash()) != this->configs.end())
{
return false;
}

// TODO - should we be making a copy here instead?
// currently the added config could be changed externally after being added
return (this->configs.insert(std::make_pair(this->next_index++, std::make_pair(std::move(config), false)))).second;
Expand Down Expand Up @@ -112,3 +117,16 @@ pbft_config_store::current() const
auto it = this->configs.find(this->current_index);
return it != this->configs.end() ? it->second.first : nullptr;
}

pbft_configuration::shared_const_ptr
pbft_config_store::newest() const
{
auto config = std::find_if(this->configs.rbegin(), this->configs.rend(),
[](auto& c)
{
// is config enabled?
return c.second.second;
});

return (config == this->configs.rend()) ? nullptr : config->second.first;
}
3 changes: 3 additions & 0 deletions pbft/pbft_config_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ namespace bzn
// returns whether the configuration with the given hash is enabled
bool is_enabled(const hash_t& hash) const;

// returns the most recent enabled configuration
pbft_configuration::shared_const_ptr newest() const;

private:
using index_t = uint64_t;
using config_map = std::map<index_t, std::pair<pbft_configuration::shared_const_ptr, bool>>;
Expand Down

0 comments on commit 5da3a9e

Please sign in to comment.