Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-937: status request should keep session open
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelsavannah authored and amastracci committed Dec 21, 2018
1 parent 9df19c0 commit 201b5ba
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 84 deletions.
8 changes: 4 additions & 4 deletions mocks/mock_node_base.hpp
Expand Up @@ -32,10 +32,10 @@ class Mocknode_base : public node_base {
void());
MOCK_METHOD2(send_message_json,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::json_message> msg));
MOCK_METHOD2(send_message,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg));
MOCK_METHOD2(send_message_str,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg));
MOCK_METHOD3(send_message,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg, bool close_session));
MOCK_METHOD3(send_message_str,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg, bool close_session));
};

} // namespace bzn
16 changes: 8 additions & 8 deletions node/node.cpp
Expand Up @@ -165,12 +165,12 @@ node::priv_protobuf_handler(const bzn_envelope& msg, std::shared_ptr<bzn::sessio
}

void
node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg)
node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg, bool close_session)
{
if (this->chaos->is_message_delayed())
{
const boost::asio::ip::tcp::endpoint ep_copy = ep;
this->chaos->reschedule_message(std::bind(&node::send_message_str, shared_from_this(), std::move(ep_copy), std::move(msg)));
this->chaos->reschedule_message(std::bind(&node::send_message_str, shared_from_this(), std::move(ep_copy), std::move(msg), close_session));
return;
}

Expand All @@ -182,7 +182,7 @@ node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr
std::shared_ptr<bzn::asio::tcp_socket_base> socket = this->io_context->make_unique_tcp_socket();

socket->async_connect(ep,
[self = shared_from_this(), socket, ep, msg](const boost::system::error_code& ec)
[self = shared_from_this(), socket, ep, msg, close_session](const boost::system::error_code& ec)
{
if (ec)
{
Expand All @@ -195,7 +195,7 @@ node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr
std::shared_ptr<bzn::beast::websocket_stream_base> ws = self->websocket->make_unique_websocket_stream(socket->get_tcp_socket());

ws->async_handshake(ep.address().to_string(), "/",
[self, ws, msg](const boost::system::error_code& ec)
[self, ws, msg, close_session](const boost::system::error_code& ec)
{
if (ec)
{
Expand All @@ -209,19 +209,19 @@ node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr
std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2));

// send the message requested...
session->send_message(msg, true);
session->send_message(msg, close_session);
});
});
}

void
node::send_message_json(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::json_message> msg)
{
this->send_message_str(ep, std::make_shared<bzn::encoded_message>(msg->toStyledString()));
this->send_message_str(ep, std::make_shared<bzn::encoded_message>(msg->toStyledString()), true);
}

void
node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg)
node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg, bool close_session)
{
if(msg->sender().empty())
{
Expand All @@ -233,5 +233,5 @@ node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn
this->crypto->sign(*msg);
}

this->send_message_str(ep, std::make_shared<std::string>(msg->SerializeAsString()));
this->send_message_str(ep, std::make_shared<std::string>(msg->SerializeAsString()), close_session);
}
4 changes: 2 additions & 2 deletions node/node.hpp
Expand Up @@ -42,9 +42,9 @@ namespace bzn

void send_message_json(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::json_message> msg) override;

void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg) override;
void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg, bool close_session) override;

void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg) override;
void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg, bool close_session) override;

private:
FRIEND_TEST(node, test_that_registered_message_handler_is_invoked);
Expand Down
6 changes: 4 additions & 2 deletions node/node_base.hpp
Expand Up @@ -60,15 +60,17 @@ namespace bzn
* signature fields, if not already set.
* @param ep host to send the message to
* @param msg message to send
* @param close_session don't expect a response on this session
*/
virtual void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg) = 0;
virtual void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg, bool close_session) = 0;

/**
* Convenience method to connect and send a message to a node. Will set sender and signature fields as appropriate.
* @param ep host to send the message to
* @param msg message to send
* @param close_session don't expect a response on this session
*/
virtual void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg) = 0;
virtual void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg, bool close_session) = 0;
};

} // bzn
8 changes: 4 additions & 4 deletions pbft/pbft.cpp
Expand Up @@ -329,7 +329,7 @@ void
pbft::forward_request_to_primary(const bzn_envelope& request_env)
{
LOG(info) << "Forwarding request to primary";
this->node->send_message(bzn::make_endpoint(this->get_primary()), std::make_shared<bzn_envelope>(request_env));
this->node->send_message(bzn::make_endpoint(this->get_primary()), std::make_shared<bzn_envelope>(request_env), true);

const bzn::hash_t req_hash = this->crypto->hash(request_env);

Expand Down Expand Up @@ -561,7 +561,7 @@ pbft::broadcast(const bzn_envelope& msg)

for (const auto& peer : this->current_peers())
{
this->node->send_message(make_endpoint(peer), msg_ptr);
this->node->send_message(make_endpoint(peer), msg_ptr, true);
}
}

Expand Down Expand Up @@ -908,7 +908,7 @@ pbft::request_checkpoint_state(const checkpoint_t& cp)
// TODO: fix the race condition here where receiving node may not have had time to
// stabilize its checkpoint yet.
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(msg));
this->node->send_message(make_endpoint(selected), msg_ptr);
this->node->send_message(make_endpoint(selected), msg_ptr, false);
}

const peer_address_t&
Expand Down Expand Up @@ -1722,7 +1722,7 @@ pbft::join_swarm()

LOG(info) << "Sending request to join swarm to node " << this->current_peers()[selected].uuid;
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(join_msg));
this->node->send_message(make_endpoint(this->current_peers()[selected]), msg_ptr);
this->node->send_message(make_endpoint(this->current_peers()[selected]), msg_ptr, false);

// TODO: set timer and retry with different peer if we don't get a response
#else
Expand Down
8 changes: 4 additions & 4 deletions pbft/test/pbft_audit_test.cpp
Expand Up @@ -17,9 +17,9 @@ namespace bzn::test
{
TEST_F(pbft_test, test_local_commit_sends_audit_messages)
{
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(false))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(false)), _))
.Times(AnyNumber());
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));

this->build_pbft();
Expand Down Expand Up @@ -47,7 +47,7 @@ namespace bzn::test

TEST_F(pbft_test, primary_sends_primary_status)
{
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));

this->build_pbft();
Expand All @@ -59,7 +59,7 @@ namespace bzn::test

TEST_F(pbft_test, nonprimary_does_not_send_primary_status)
{
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_audit, Eq(true)), _))
.Times(Exactly(0));

this->uuid = SECOND_NODE_UUID;
Expand Down
10 changes: 5 additions & 5 deletions pbft/test/pbft_catchup_test.cpp
Expand Up @@ -108,7 +108,7 @@ namespace bzn
this->build_pbft();

// node shouldn't be sending any checkpoint messages right now
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_checkpoint, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_checkpoint, Eq(true)), _))
.Times((Exactly(0)));

auto nodes = TEST_PEER_LIST.begin();
Expand All @@ -121,7 +121,7 @@ namespace bzn

// one more checkpoint message and the node should request state from a random node
auto primary = this->pbft->get_primary();
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true)), _))
.Times((Exactly(1)));

bzn::peer_address_t node(*nodes++);
Expand All @@ -140,7 +140,7 @@ namespace bzn
}

// since the node has this checkpoint it should NOT request state for it
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true)), _))
.Times((Exactly(0)));
stabilize_checkpoint(100);
}
Expand Down Expand Up @@ -171,7 +171,7 @@ namespace bzn

// get the node to request state
auto primary = this->pbft->get_primary();
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true)), _))
.Times((Exactly(1)));

auto nodes = TEST_PEER_LIST.begin();
Expand Down Expand Up @@ -204,7 +204,7 @@ namespace bzn

// get the node to request state
auto primary = this->pbft->get_primary();
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_get_state, Eq(true)), _))
.Times((Exactly(1)));

auto nodes = TEST_PEER_LIST.begin();
Expand Down
2 changes: 1 addition & 1 deletion pbft/test/pbft_checkpoint_tests.cpp
Expand Up @@ -47,7 +47,7 @@ namespace bzn::test

TEST_F(pbft_checkpoint_test, test_checkpoint_messages_sent_on_execute)
{
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_checkpoint, Eq(true))))
EXPECT_CALL(*mock_node, send_message(_, ResultOf(is_checkpoint, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));

this->build_pbft();
Expand Down
26 changes: 13 additions & 13 deletions pbft/test/pbft_join_leave_test.cpp
Expand Up @@ -111,7 +111,7 @@ namespace bzn
{
EXPECT_CALL(*(mock_node),
send_message(bzn::make_endpoint(p),
AllOf(message_has_correct_req_hash(expect_hash), message_has_correct_pbft_type(PBFT_MSG_PREPARE))))
AllOf(message_has_correct_req_hash(expect_hash), message_has_correct_pbft_type(PBFT_MSG_PREPARE)), _))
.Times(Exactly(1));
}

Expand Down Expand Up @@ -198,7 +198,7 @@ namespace bzn
EXPECT_CALL(*(this->mock_node),
send_message(bzn::make_endpoint(p),
AllOf(message_has_req_with_correct_type(bzn_envelope::kPbftInternalRequest),
message_has_correct_pbft_type(PBFT_MSG_PREPREPARE))))
message_has_correct_pbft_type(PBFT_MSG_PREPREPARE)), _))
.Times(Exactly(1));
}

Expand Down Expand Up @@ -227,7 +227,7 @@ namespace bzn
EXPECT_CALL(*(this->mock_node),
send_message(bzn::make_endpoint(p),
AllOf(message_has_req_with_correct_type(bzn_envelope::kPbftInternalRequest),
message_has_correct_pbft_type(PBFT_MSG_PREPREPARE))))
message_has_correct_pbft_type(PBFT_MSG_PREPREPARE)), _))
.Times(Exactly(1));
}

Expand Down Expand Up @@ -301,7 +301,7 @@ namespace bzn
EXPECT_CALL(*(mock_node),
send_message(bzn::make_endpoint(p),
AllOf(message_has_correct_req_hash(msg.request_hash()),
message_has_correct_pbft_type(PBFT_MSG_COMMIT))))
message_has_correct_pbft_type(PBFT_MSG_COMMIT)), _))
.Times(Exactly(1));
}

Expand Down Expand Up @@ -340,7 +340,7 @@ namespace bzn
EXPECT_CALL(*(mock_node),
send_message(bzn::make_endpoint(p),
AllOf(message_has_correct_req_hash(msg.request_hash()),
message_has_correct_pbft_type(PBFT_MSG_COMMIT))))
message_has_correct_pbft_type(PBFT_MSG_COMMIT)), _))
.Times(Exactly(1));
}

Expand Down Expand Up @@ -394,9 +394,9 @@ namespace bzn
TEST_F(pbft_join_leave_test, DISABLED_node_not_in_swarm_asks_to_join)
{
this->uuid = "somenode";
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_join, Eq(true))))
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_join, Eq(true)), _))
.Times(Exactly(1))
.WillOnce(Invoke([&](auto, auto)
.WillOnce(Invoke([&](auto, auto, bool /*close_session*/)
{
pbft_membership_msg response;
response.set_type(PBFT_MMSG_JOIN_RESPONSE);
Expand All @@ -409,7 +409,7 @@ namespace bzn

TEST_F(pbft_join_leave_test, node_in_swarm_doesnt_ask_to_join)
{
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_join, Eq(true))))
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_join, Eq(true)), _))
.Times(Exactly(0));
this->build_pbft();
}
Expand All @@ -422,17 +422,17 @@ namespace bzn
for (auto const &p : TEST_PEER_LIST)
{
EXPECT_CALL(*(this->mock_node),
send_message(bzn::make_endpoint(p), ResultOf(test::is_preprepare, Eq(true))))
send_message(bzn::make_endpoint(p), ResultOf(test::is_preprepare, Eq(true)), _))
.Times(Exactly(1))
.WillOnce(Invoke([&](auto, auto &envelope)
.WillOnce(Invoke([&](auto, auto &envelope, bool /*close_session*/)
{
pbft_msg msg;
EXPECT_TRUE(msg.ParseFromString(envelope->pbft()));

if (p.uuid == TEST_NODE_UUID)
{
EXPECT_CALL(*(mock_node),
send_message(_, ResultOf(test::is_prepare, Eq(true))))
send_message(_, ResultOf(test::is_prepare, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));

// reflect the pre-prepare back
Expand All @@ -455,9 +455,9 @@ namespace bzn
for (auto const &p : TEST_PEER_LIST)
{
EXPECT_CALL(*(this->mock_node),
send_message(bzn::make_endpoint(p), ResultOf(test::is_commit, Eq(true))))
send_message(bzn::make_endpoint(p), ResultOf(test::is_commit, Eq(true)), _))
.Times(Exactly(1))
.WillOnce(Invoke([&](auto, auto &wmsg)
.WillOnce(Invoke([&](auto, auto &wmsg, bool /*close_session*/)
{
pbft_msg msg;
EXPECT_TRUE(msg.ParseFromString(wmsg->pbft()));
Expand Down
4 changes: 2 additions & 2 deletions pbft/test/pbft_newview_test.cpp
Expand Up @@ -81,8 +81,8 @@ namespace bzn
this->run_transaction_through_primary_times(2, current_sequence);

bzn_envelope viewchange_envelope;
EXPECT_CALL(*mock_node, send_message(_, ResultOf(test::is_viewchange, Eq(true))))
.WillRepeatedly(Invoke([&](const auto & /*endpoint*/, const auto& viewchange_env) {viewchange_envelope = *viewchange_env;}));
EXPECT_CALL(*mock_node, send_message(_, ResultOf(test::is_viewchange, Eq(true)), _))
.WillRepeatedly(Invoke([&](const auto & /*endpoint*/, const auto& viewchange_env, bool /*close_session*/) {viewchange_envelope = *viewchange_env;}));
this->pbft->handle_failure();

pbft_msg viewchange;
Expand Down
10 changes: 5 additions & 5 deletions pbft/test/pbft_proto_test.cpp
Expand Up @@ -31,9 +31,9 @@ namespace bzn
{
// after request is sent, SUT will send out pre-prepares to all nodes
auto operation = std::shared_ptr<pbft_operation>();
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_preprepare, Eq(true))))
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_preprepare, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()))
.WillRepeatedly(Invoke([&](auto, auto wmsg)
.WillRepeatedly(Invoke([&](auto, auto wmsg, bool /*close_session*/)
{
pbft_msg msg;
if (msg.ParseFromString(wmsg->pbft()))
Expand Down Expand Up @@ -68,7 +68,7 @@ namespace bzn
pbft_proto_test::send_preprepare(uint64_t sequence, const bzn_envelope& request)
{
// after preprepare is sent, SUT will send out prepares to all nodes
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_prepare, Eq(true))))
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_prepare, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));

auto peer = *(TEST_PEER_LIST.begin());
Expand All @@ -88,7 +88,7 @@ namespace bzn
pbft_proto_test::send_prepares(uint64_t sequence, const bzn::hash_t& request_hash)
{
// after prepares are sent, SUT will send out commits to all nodes
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_commit, Eq(true))))
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_commit, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));

for (const auto& peer : TEST_PEER_LIST)
Expand Down Expand Up @@ -139,7 +139,7 @@ namespace bzn
}));

// after enough commits are sent, SUT will send out checkpoint message to all nodes
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_checkpoint, Eq(true))))
EXPECT_CALL(*this->mock_node, send_message(_, ResultOf(test::is_checkpoint, Eq(true)), _))
.Times(Exactly(TEST_PEER_LIST.size()));
}

Expand Down

0 comments on commit 201b5ba

Please sign in to comment.