Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-966: Keep old configurations, refactored configuration store
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Jan 18, 2019
1 parent add1423 commit c35d2d6
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 269 deletions.
147 changes: 71 additions & 76 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,7 @@ pbft::handle_membership_message(const bzn_envelope& msg, std::shared_ptr<bzn::se
{
case PBFT_MMSG_JOIN:
case PBFT_MMSG_LEAVE:
this->sessions_waiting_on_forwarded_requests[hash] = session;
if (!this->is_primary())
{
this->forward_request_to_primary(msg);
return;
}
this->handle_join_or_leave(inner_msg, session, hash);
this->handle_join_or_leave(msg, inner_msg, session, hash);
break;
case PBFT_MMSG_GET_STATE:
this->handle_get_state(inner_msg, std::move(session));
Expand Down Expand Up @@ -409,7 +403,7 @@ pbft::handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg)
}

void
pbft::handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session
pbft::handle_join_or_leave(const bzn_envelope& env, const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session
, const std::string& msg_hash)
{
if (msg.has_peer_info())
Expand All @@ -419,36 +413,65 @@ pbft::handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::
bzn::peer_address_t peer(peer_info.host(), static_cast<uint16_t>(peer_info.port()),
static_cast<uint16_t>(peer_info.http_port()), peer_info.name(), peer_info.uuid());

auto config = std::make_shared<pbft_configuration>(*(this->configurations.current()));
// test for re-join of existing swarm member
if (msg.type() == PBFT_MMSG_JOIN && this->is_peer(peer_info.uuid()))
{
// send response
if (session && session->is_open())
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
auto env = this->wrap_message(response);
session->send_message(std::make_shared<std::string>(env.SerializeAsString()), true);
}

return;
}

if (this->new_config_in_flight)
{
if (session)
{
session->close();
}

return;
}

this->sessions_waiting_on_forwarded_requests[msg_hash] = session;
if (!this->is_primary())
{
this->forward_request_to_primary(env);
return;
}

auto config = std::make_shared<pbft_configuration>(*(this->configurations.get(this->configurations.newest_committed())));
if (msg.type() == PBFT_MMSG_JOIN)
{
// see if we can add this peer
if (!config->add_peer(peer))
{
LOG(debug) << "Can't add new peer due to conflict";

if (session && session->is_open())
if (session)
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
response.set_result(false);
auto env = this->wrap_message(response);
session->send_message(std::make_shared<std::string>(env.SerializeAsString()), true);
session->close();
}

return;
}
}
else if (msg.type() == PBFT_MMSG_LEAVE)
{
if (!config->remove_peer(peer))
{
// TODO - respond with negative result?
LOG(debug) << "Couldn't remove requested peer";
return;
}
}

this->configurations.add(config);
this->new_config_in_flight = true;
this->broadcast_new_configuration(config, msg.type() == PBFT_MMSG_JOIN ? msg_hash : "");
}
else
Expand All @@ -458,28 +481,12 @@ pbft::handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::
}

void
pbft::handle_join_response(const pbft_membership_msg& msg)
pbft::handle_join_response(const pbft_membership_msg& /*msg*/)
{
if (this->in_swarm == swarm_status::joining)
{
if (msg.result())
{
this->in_swarm = swarm_status::waiting;
LOG(debug) << "Successfully joined the swarm, waiting for NEW_VIEW message...";
}
else
{
// since we're not persisting configuration to disk, we could be in one of two situations:
// either we've been rejected, or we restarted and the swarm still thinks we're a member.
// for now (since there's no blacklist yet) we'll assume the latter and try to continue.
// there is a potential for problems here if we don't have the latest configuration.
// TODO: change this to abort once we're persisting configuration
LOG(error) << "Request to join swarm rejected - did we exit and restart?";
this->in_swarm = swarm_status::joined;

// LOG(error) << "Request to join swarm rejected. Aborting...";
// throw (std::runtime_error("Request to join swarm rejected."));
}
this->in_swarm = swarm_status::waiting;
LOG(debug) << "Successfully joined the swarm, waiting for NEW_VIEW message...";
}
else
{
Expand Down Expand Up @@ -631,7 +638,7 @@ pbft::do_prepared(const std::shared_ptr<pbft_operation>& op)
pbft_configuration config;
if (config.from_string(op->get_config_request().configuration()))
{
this->configurations.enable(config.get_hash());
this->configurations.set_prepared(config.get_hash());
}
}

Expand Down Expand Up @@ -665,11 +672,13 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
// commit new configuration if applicable
if (op->has_config_request())
{
// now this config is committed we can accept join requests again
this->new_config_in_flight = false;

pbft_configuration config;
if (config.from_string(op->get_config_request().configuration()))
{
// get rid of all other previous configs, except for currently active one
this->configurations.remove_prior_to(config.get_hash());
this->configurations.set_committed(config.get_hash());

this->new_config_timer->cancel();
this->new_config_timer->expires_from_now(NEW_CONFIG_INTERVAL);
Expand All @@ -685,12 +694,9 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
response.set_result(true);
auto env = this->wrap_message(response);
session_it->second->send_message(std::make_shared<std::string>(env.SerializeAsString()), true);

// TODO: start timer for sending viewchange KEP-825

}

this->sessions_waiting_on_forwarded_requests.erase(session_it);
Expand Down Expand Up @@ -744,16 +750,7 @@ pbft::handle_new_config_timeout(const boost::system::error_code& ec)
return;
}

// send viewchange with new config
auto config = this->configurations.newest();
if (config)
{
this->initiate_viewchange();
}
else
{
LOG(error) << "unable to send viewchange because we don't have a valid new configuration";
}
this->initiate_viewchange();
}

bool
Expand Down Expand Up @@ -968,12 +965,9 @@ const peer_address_t&
pbft::select_peer_for_checkpoint(const checkpoint_t& cp)
{
// choose one of the peers who vouch for this checkpoint at random
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dist(0, this->unstable_checkpoint_proofs[cp].size() - 1);
uint32_t selected = this->generate_random_number(0, this->unstable_checkpoint_proofs[cp].size() - 1);

auto it = this->unstable_checkpoint_proofs[cp].begin();
uint32_t selected = dist(gen);
for (size_t i = 0; i < selected; i++)
{
it++;
Expand Down Expand Up @@ -1346,10 +1340,8 @@ pbft::make_newview(
pbft_msg newview;
newview.set_type(PBFT_MSG_NEWVIEW);
newview.set_view(new_view_index);

// new view always uses our latest prepared configuration
newview.set_config_hash(this->configurations.newest()->get_hash());
newview.set_config(this->configurations.newest()->to_string());
newview.set_config_hash(this->configurations.newest_prepared());
newview.set_config(this->configurations.get(this->configurations.newest_prepared())->to_string());

// V is the set of 2f+1 view change messages
for (const auto &sender_viewchange_envelope: viewchange_envelopes_from_senders)
Expand Down Expand Up @@ -1481,7 +1473,7 @@ pbft::handle_viewchange(const pbft_msg &msg, const bzn_envelope &original_msg)

auto res = this->build_newview(viewchange->first, viewchange_envelopes_from_senders);
this->next_issued_sequence_number = res.second;
this->move_to_new_configuration(this->configurations.newest()->get_hash());
this->move_to_new_configuration(res.first.config_hash(), this->view + 1);
this->broadcast(this->wrap_message(res.first));
}

Expand All @@ -1500,12 +1492,11 @@ pbft::handle_newview(const pbft_msg& msg, const bzn_envelope& original_msg)

auto hash = newconfig->get_hash();
this->configurations.add(newconfig);
this->configurations.enable(hash);

// we need to switch to this configuration now so we have the peer info to validate the message
// TODO: since the config tells us how to validate the NEW_VIEW, but the NEW_VIEW contains the config, we
// can't really trust this. We need to get the config from an external source.
this->move_to_new_configuration(hash);
this->move_to_new_configuration(hash, msg.view());

if (!this->is_valid_newview_message(msg, original_msg))
{
Expand All @@ -1529,14 +1520,15 @@ pbft::handle_newview(const pbft_msg& msg, const bzn_envelope& original_msg)

// validate requested configuration and switch to it
if (!this->is_configuration_acceptable_in_new_view(msg.config_hash()) ||
!this->move_to_new_configuration(msg.config_hash()))
!this->move_to_new_configuration(msg.config_hash(), msg.view()))
{
LOG(debug) << "unable to switch to configuration in new view";
return;
}

this->view = msg.view();
this->view_is_valid = true;
this->new_config_in_flight = false;

// after moving to the new view processes the preprepares
for (size_t i{0}; i < static_cast<size_t>(msg.pre_prepare_messages_size()); ++i)
Expand Down Expand Up @@ -1617,8 +1609,7 @@ pbft::initialize_configuration(const bzn::peers_list_t& peers)
}

this->configurations.add(config);
this->configurations.enable(config->get_hash());
this->configurations.set_current(config->get_hash());
this->configurations.set_current(config->get_hash(), this->view);

return config_good;
}
Expand Down Expand Up @@ -1672,7 +1663,7 @@ pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config, c
bool
pbft::is_configuration_acceptable_in_new_view(const hash_t& config_hash)
{
return this->configurations.get(config_hash) != nullptr;
return this->configurations.is_acceptable(config_hash);
}

void
Expand All @@ -1691,14 +1682,15 @@ pbft::handle_config_message(const pbft_msg& msg, const std::shared_ptr<pbft_oper
}

bool
pbft::move_to_new_configuration(const hash_t& config_hash)
pbft::move_to_new_configuration(const hash_t& config_hash, uint64_t view)
{
if (this->configurations.current()->get_hash() == config_hash)
{
return true;
}

assert(this->configurations.get(config_hash) != nullptr);
this->configurations.set_current(config_hash);
return true;
// TODO: garbage collect old configurations (KEP-1006)
return this->configurations.set_current(config_hash, view);
}

bool
Expand Down Expand Up @@ -1748,7 +1740,6 @@ pbft::make_viewchange(
viewchange.set_type(PBFT_MSG_VIEWCHANGE);
viewchange.set_view(new_view);
viewchange.set_sequence(base_sequence_number); // base_sequence_number = n = sequence # of last valid checkpoint
viewchange.set_config_hash(this->configurations.newest()->get_hash());

// C = a set of local 2*f + 1 valid checkpoint messages
for (const auto& msg : stable_checkpoint_proof)
Expand Down Expand Up @@ -1812,11 +1803,7 @@ pbft::join_swarm()
join_msg.set_type(PBFT_MMSG_JOIN);
join_msg.set_allocated_peer_info(info);

// choose one of the peers at random
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dist(0, this->current_peers().size() - 1);
uint32_t selected = dist(gen);
uint32_t selected = this->generate_random_number(0, this->current_peers().size() - 1);

LOG(info) << "Sending request to join swarm to node " << this->current_peers()[selected].uuid;
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(join_msg));
Expand All @@ -1826,3 +1813,11 @@ pbft::join_swarm()

// TODO: set timer and retry with different peer if we don't get a response - KEP-980
}

uint32_t pbft::generate_random_number(uint32_t min, uint32_t max)
{
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dist(min, max);
return dist(gen);
}
7 changes: 5 additions & 2 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ namespace bzn
*/
static size_t honest_member_size(size_t swarm_size);

static uint32_t generate_random_number(uint32_t min, uint32_t max);

private:
bool preliminary_filter_msg(const pbft_msg& msg);

Expand All @@ -125,7 +127,7 @@ namespace bzn
void handle_prepare(const pbft_msg& msg, const bzn_envelope& original_msg);
void handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg);
void handle_checkpoint(const pbft_msg& msg, const bzn_envelope& original_msg);
void handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session, const std::string& msg_hash);
void handle_join_or_leave(const bzn_envelope& env, const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session, const std::string& msg_hash);
void handle_join_response(const pbft_membership_msg& msg);
void handle_get_state(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session) const;
void handle_set_state(const pbft_membership_msg& msg);
Expand Down Expand Up @@ -177,7 +179,7 @@ namespace bzn
const peer_address_t& get_peer_by_uuid(const std::string& uuid) const;
void broadcast_new_configuration(pbft_configuration::shared_const_ptr config, const std::string& join_request_hash);
bool is_configuration_acceptable_in_new_view(const hash_t& config_hash);
bool move_to_new_configuration(const hash_t& config_hash);
bool move_to_new_configuration(const hash_t& config_hash, uint64_t view);
bool proposed_config_is_acceptable(std::shared_ptr<pbft_configuration> config);

void maybe_record_request(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op);
Expand Down Expand Up @@ -240,6 +242,7 @@ namespace bzn
std::map<checkpoint_t, std::unordered_map<uuid_t, std::string>> unstable_checkpoint_proofs;

pbft_config_store configurations;
bool new_config_in_flight = false;

std::multimap<timestamp_t, std::pair<bzn::uuid_t, request_hash_t>> recent_requests;

Expand Down

0 comments on commit c35d2d6

Please sign in to comment.