Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-747: changes from review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Dec 5, 2018
1 parent b4a0efd commit c5bfe01
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 46 deletions.
98 changes: 55 additions & 43 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ pbft::handle_membership_message(const bzn_envelope& msg, std::shared_ptr<bzn::se
{
case PBFT_MMSG_JOIN:
case PBFT_MMSG_LEAVE:
if (!this->is_primary())
{
this->forward_request_to_primary(msg, session);
return;
}
this->handle_join_or_leave(inner_msg, session);
break;
case PBFT_MMSG_GET_STATE:
Expand Down Expand Up @@ -271,37 +276,7 @@ pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptr<sess
{
if (!this->is_primary())
{
LOG(info) << "Forwarding request to primary";
this->node->send_message(bzn::make_endpoint(this->get_primary()), std::make_shared<bzn_envelope>(request_env));

const bzn::hash_t req_hash = this->crypto->hash(request_env);

const auto existing_operation = std::find_if(this->operations.begin(), this->operations.end(),
// This search is inefficient for in-memory operations, but the db lookup that will replace it is not
[&](const auto& pair)
{
return std::get<2>(pair.first) == req_hash;
});

// If we already have an operation for this request_hash, attach the session to it
if (existing_operation != this->operations.end())
{
const std::shared_ptr<bzn::pbft_operation>& op = existing_operation->second;
LOG(debug) << "We already had an operation for that request hash; attaching session to it";

if (!op->has_session())
{
op->set_session(session);
}
}
else
{
LOG(debug) << "Saving session because we don't have an operation for this hash: " << bzn::bytes_to_debug_string(req_hash);
this->sessions_waiting_on_forwarded_requests[req_hash] = session;
}

this->failure_detector->request_seen(req_hash);

this->forward_request_to_primary(request_env, session);
return;
}

Expand All @@ -326,6 +301,41 @@ pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptr<sess
this->do_preprepare(op);
}

void
pbft::forward_request_to_primary(const bzn_envelope& request_env, const std::shared_ptr<session_base>& session)
{
LOG(info) << "Forwarding request to primary";
this->node->send_message(bzn::make_endpoint(this->get_primary()), std::make_shared<bzn_envelope>(request_env));

const bzn::hash_t req_hash = this->crypto->hash(request_env);

const auto existing_operation = std::find_if(this->operations.begin(), this->operations.end(),
// This search is inefficient for in-memory operations, but the db lookup that will replace it is not
[&](const auto& pair)
{
return std::get<2>(pair.first) == req_hash;
});

// If we already have an operation for this request_hash, attach the session to it
if (existing_operation != this->operations.end())
{
const std::shared_ptr<bzn::pbft_operation>& op = existing_operation->second;
LOG(debug) << "We already had an operation for that request hash; attaching session to it";

if (!op->has_session())
{
op->set_session(session);
}
}
else
{
LOG(debug) << "Saving session because we don't have an operation for this hash: " << bzn::bytes_to_debug_string(req_hash);
this->sessions_waiting_on_forwarded_requests[req_hash] = session;
}

this->failure_detector->request_seen(req_hash);
}

void
pbft::maybe_record_request(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op)
{
Expand Down Expand Up @@ -400,13 +410,6 @@ pbft::handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg)
void
pbft::handle_join_or_leave(const pbft_membership_msg& msg, std::shared_ptr<bzn::session_base> session)
{
if (!this->is_primary())
{
LOG(error) << "Ignoring client request because I am not the primary";
// TODO - KEP-327
return;
}

if (msg.has_peer_info())
{
// build a peer_address_t from the message
Expand Down Expand Up @@ -615,6 +618,9 @@ pbft::do_prepared(const std::shared_ptr<pbft_operation>& op)
void
pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
{
LOG(debug) << "Operation " << op->get_sequence() << " is committed-local";
op->advance_operation_stage(pbft_operation_stage::execute);

// commit new configuration if applicable
if (op->has_config_request())
{
Expand Down Expand Up @@ -644,9 +650,6 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
}
}

LOG(debug) << "Operation " << op->get_sequence() << " is committed-local";
op->advance_operation_stage(pbft_operation_stage::execute);

if (this->audit_enabled)
{
audit_message msg;
Expand Down Expand Up @@ -1270,6 +1273,15 @@ pbft::join_swarm()
join_msg.set_type(PBFT_MMSG_JOIN);
join_msg.set_allocated_peer_info(info);

LOG(info) << "Sending request to join swarm";
this->broadcast(this->wrap_message(join_msg));
// choose one of the peers at random
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<uint32_t> dist(0, this->current_peers().size() - 1);
uint32_t selected = dist(gen);

LOG(info) << "Sending request to join swarm to node " << this->current_peers()[selected].uuid;
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(join_msg));
this->node->send_message(make_endpoint(this->current_peers()[selected]), msg_ptr);

// TODO: set timer and retry with different peer if we don't get a response
}
1 change: 1 addition & 0 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ namespace bzn
std::shared_ptr<pbft_operation> setup_request_operation(const bzn_envelope& msg
, const bzn::hash_t& request_hash
, const std::shared_ptr<session_base>& session = nullptr);
void forward_request_to_primary(const bzn_envelope& request_env, const std::shared_ptr<session_base>& session);

void broadcast(const bzn_envelope& message);

Expand Down
5 changes: 2 additions & 3 deletions pbft/test/pbft_join_leave_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,14 @@ namespace bzn
{
this->uuid = "somenode";
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_join, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()))
.Times(Exactly(1))
.WillOnce(Invoke([&](auto, auto)
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
response.set_result(true);
this->handle_membership_message(test::wrap_pbft_membership_msg(response));
}))
.WillRepeatedly(Return());
}));

this->build_pbft();
}
Expand Down

0 comments on commit c5bfe01

Please sign in to comment.