Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-747: Peer can add itself to peer list (PBFT Edition)
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Nov 30, 2018
1 parent cf2e8db commit 06d6308
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 72 deletions.
96 changes: 88 additions & 8 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ pbft::pbft(
std::shared_ptr<bzn::node_base> node
, std::shared_ptr<bzn::asio::io_context_base> io_context
, const bzn::peers_list_t& peers
, bzn::uuid_t uuid
, std::shared_ptr<bzn::options_base> options
, std::shared_ptr<pbft_service_base> service
, std::shared_ptr<pbft_failure_detector_base> failure_detector
, std::shared_ptr<bzn::crypto_base> crypto
)
: node(std::move(node))
, uuid(std::move(uuid))
, uuid(options->get_uuid())
, options(options)
, service(std::move(service))
, failure_detector(std::move(failure_detector))
, io_context(io_context)
Expand Down Expand Up @@ -113,6 +114,8 @@ pbft::start()
}
}
);

this->join_swarm();
});
}

Expand Down Expand Up @@ -171,14 +174,17 @@ pbft::handle_membership_message(const bzn_envelope& msg, std::shared_ptr<bzn::se
{
case PBFT_MMSG_JOIN:
case PBFT_MMSG_LEAVE:
this->handle_join_or_leave(inner_msg);
this->handle_join_or_leave(inner_msg, session);
break;
case PBFT_MMSG_GET_STATE:
this->handle_get_state(inner_msg, std::move(session));
break;
case PBFT_MMSG_SET_STATE:
this->handle_set_state(inner_msg);
break;
case PBFT_MMSG_JOIN_RESPONSE:
this->handle_join_response(inner_msg);
break;
default:
LOG(error) << "Invalid membership message received "
<< inner_msg.DebugString().substr(0, MAX_MESSAGE_SIZE);
Expand Down Expand Up @@ -392,7 +398,7 @@ pbft::handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg)
}

void
pbft::handle_join_or_leave(const pbft_membership_msg& msg)
pbft::handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session)
{
if (!this->is_primary())
{
Expand All @@ -414,8 +420,15 @@ pbft::handle_join_or_leave(const pbft_membership_msg& msg)
// see if we can add this peer
if (!config->add_peer(peer))
{
// TODO - respond with negative result?
LOG(debug) << "Can't add new peer due to conflict";

if (session && session->is_open())
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
response.set_result(false);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);
}
return;
}
}
Expand All @@ -430,14 +443,36 @@ pbft::handle_join_or_leave(const pbft_membership_msg& msg)
}

this->configurations.add(config);
this->broadcast_new_configuration(config);
this->broadcast_new_configuration(config, session);
}
else
{
LOG(debug) << "Malformed join/leave message";
}
}

void
pbft::handle_join_response(const pbft_membership_msg& msg)
{
if (!this->in_swarm)
{
if (msg.result())
{
this->in_swarm = true;
LOG(debug) << "Successfully joined the swarm, waiting for NEW_VIEW message...";
}
else
{
LOG(error) << "Request to join swarm rejected. Aborting...";
throw (std::runtime_error("Request to join swarm rejected."));
}
}
else
{
LOG(error) << "Received JOIN response when not waiting to join swarm";
}
}

void
pbft::handle_get_state(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session) const
{
Expand Down Expand Up @@ -601,6 +636,23 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
// get rid of all other previous configs, except for currently active one
this->configurations.remove_prior_to(config.get_hash());
}

// send response to new node
auto session = op->session();
if (session && session->is_open())
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
response.set_result(true);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);

// TODO: start timer for sending viewchange KEP-825

}
else
{
LOG(debug) << "Unable to send join response, session is not valid";
}
}

LOG(debug) << "Operation " << op->get_sequence() << " is committed-local";
Expand Down Expand Up @@ -1106,14 +1158,14 @@ pbft::get_peer_by_uuid(const std::string& uuid) const
}

void
pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config)
pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config, std::shared_ptr<bzn::session_base> session)
{
auto cfg_msg = new pbft_config_msg;
cfg_msg->set_configuration(config->to_string());
bzn_envelope req;
req.set_pbft_internal_request(cfg_msg->SerializeAsString());

auto op = this->setup_request_operation(req, this->crypto->hash(req));
auto op = this->setup_request_operation(req, this->crypto->hash(req), session);
this->do_preprepare(op);
}

Expand Down Expand Up @@ -1203,3 +1255,31 @@ pbft::honest_majority_size(size_t swarm_size)
{
return pbft::faulty_nodes_bound(swarm_size) * 2 + 1;
}

void
pbft::join_swarm()
{
// are we already in the peers list?
// TODO - replace this with call to is_peer()
for (auto const& peer : this->current_peers())
{
if (peer.uuid == this->uuid)
{
this->in_swarm = true;
return;
}
}

auto info = new pbft_peer_info;
info->set_host(this->options->get_listener().address().to_string());
info->set_port(this->options->get_listener().port());
info->set_http_port(options->get_http_port());
info->set_uuid(this->uuid);

pbft_membership_msg join_msg;
join_msg.set_type(PBFT_MMSG_JOIN);
join_msg.set_allocated_peer_info(info);

LOG(info) << "Sending request to join swarm";
this->broadcast(this->wrap_message(join_msg));
}
20 changes: 9 additions & 11 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <proto/audit.pb.h>
#include <mutex>
#include <gtest/gtest_prod.h>
#include <options/options_base.hpp>

namespace
{
Expand All @@ -48,7 +49,7 @@ namespace bzn
std::shared_ptr<bzn::node_base> node
, std::shared_ptr<bzn::asio::io_context_base> io_context
, const bzn::peers_list_t& peers
, bzn::uuid_t uuid
, std::shared_ptr<bzn::options_base> options
, std::shared_ptr<pbft_service_base> service
, std::shared_ptr<pbft_failure_detector_base> failure_detector
, std::shared_ptr<bzn::crypto_base> crypto
Expand Down Expand Up @@ -115,7 +116,8 @@ namespace bzn
void handle_prepare(const pbft_msg& msg, const bzn_envelope& original_msg);
void handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg);
void handle_checkpoint(const pbft_msg& msg, const bzn_envelope& original_msg);
void handle_join_or_leave(const pbft_membership_msg& msg);
void handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session);
void handle_join_response(const pbft_membership_msg& msg);
void handle_get_state(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session) const;
void handle_set_state(const pbft_membership_msg& msg);
void handle_config_message(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op);
Expand Down Expand Up @@ -163,7 +165,7 @@ namespace bzn
std::shared_ptr<const std::vector<bzn::peer_address_t>> current_peers_ptr() const;
const std::vector<bzn::peer_address_t>& current_peers() const;
const peer_address_t& get_peer_by_uuid(const std::string& uuid) const;
void broadcast_new_configuration(pbft_configuration::shared_const_ptr config);
void broadcast_new_configuration(pbft_configuration::shared_const_ptr config, std::shared_ptr<bzn::session_base> session);
bool is_configuration_acceptable_in_new_view(hash_t config_hash);
bool move_to_new_configuration(hash_t config_hash);
bool proposed_config_is_acceptable(std::shared_ptr<pbft_configuration> config);
Expand All @@ -174,6 +176,7 @@ namespace bzn
bool already_seen_request(const bzn_envelope& msg, const request_hash_t& hash) const;
void saw_request(const bzn_envelope& msg, const request_hash_t& hash);

void join_swarm();

// Using 1 as first value here to distinguish from default value of 0 in protobuf
uint64_t view = 1;
Expand All @@ -185,6 +188,7 @@ namespace bzn
std::shared_ptr<bzn::node_base> node;

const bzn::uuid_t uuid;
std::shared_ptr<bzn::options_base> options;
std::shared_ptr<pbft_service_base> service;

std::shared_ptr<pbft_failure_detector_base> failure_detector;
Expand All @@ -200,6 +204,7 @@ namespace bzn
std::unique_ptr<bzn::asio::steady_timer_base> audit_heartbeat_timer;

bool audit_enabled = true;
bool in_swarm = false;

checkpoint_t stable_checkpoint{0, INITIAL_CHECKPOINT_HASH};
std::unordered_map<uuid_t, std::string> stable_checkpoint_proof;
Expand All @@ -210,15 +215,8 @@ namespace bzn

std::multimap<timestamp_t, std::pair<bzn::uuid_t, request_hash_t>> recent_requests;

FRIEND_TEST(pbft_test, join_request_generates_new_config_preprepare);
FRIEND_TEST(pbft_test, valid_leave_request_test);
FRIEND_TEST(pbft_test, invalid_leave_request_test);
FRIEND_TEST(pbft_test, test_new_config_preprepare_handling);
FRIEND_TEST(pbft_test, test_new_config_prepare_handling);
FRIEND_TEST(pbft_test, test_new_config_commit_handling);
FRIEND_TEST(pbft_test, test_move_to_new_config);

friend class pbft_proto_test;
friend class pbft_join_leave_test;

std::shared_ptr<crypto_base> crypto;

Expand Down

0 comments on commit 06d6308

Please sign in to comment.