Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-1092: Integration fixes for signature collation
Browse files Browse the repository at this point in the history
changed point of contact field to bytes
removed placeholder uuids from crud
moved message signing into session, changed method names to clarify
intent
  • Loading branch information
isabelsavannah committed Feb 12, 2019
1 parent 015aa96 commit d3a4c4d
Show file tree
Hide file tree
Showing 25 changed files with 328 additions and 511 deletions.
6 changes: 2 additions & 4 deletions crud/crud.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,10 @@ crud::send_response(const database_msg& request, const bzn::storage_result resul

bzn_envelope env;
env.set_database_response(response.SerializeAsString());
env.set_sender("placeholder for daemon's uuid"); // TODO
// TODO: crypto

if (session)
{
session->send_message(std::make_shared<std::string>(env.SerializeAsString()));
session->send_signed_message(std::make_shared<bzn_envelope>(env));
}
else
{
Expand All @@ -116,7 +114,7 @@ crud::send_response(const database_msg& request, const bzn::storage_result resul
{
try
{
this->node->send_message(response.header().point_of_contact(), std::make_shared<bzn_envelope>(env));
this->node->send_signed_message(response.header().point_of_contact(), std::make_shared<bzn_envelope>(env));
}
catch(const std::runtime_error& err)
{
Expand Down
588 changes: 198 additions & 390 deletions crud/test/crud_test.cpp

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions mocks/mock_node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class Mocknode_base : public node_base {
bool(const bzn_envelope::PayloadCase msg_type, bzn::protobuf_handler message_handler));
MOCK_METHOD1(start,
void(std::shared_ptr<bzn::pbft_base> pbft));
MOCK_METHOD2(send_message,
MOCK_METHOD2(send_signed_message,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg));
MOCK_METHOD2(send_message,
MOCK_METHOD2(send_signed_message,
void(const bzn::uuid_t& uuid, std::shared_ptr<bzn_envelope> msg));
MOCK_METHOD2(send_message_str,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg));
Expand Down
2 changes: 2 additions & 0 deletions mocks/mock_session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace bzn {
public:
MOCK_METHOD1(send_message,
void(std::shared_ptr<bzn::encoded_message> msg));
MOCK_METHOD1(send_signed_message,
void(std::shared_ptr<bzn_envelope> msg));
MOCK_METHOD0(get_session_id,
bzn::session_id());
MOCK_METHOD0(close,
Expand Down
63 changes: 28 additions & 35 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ node::do_accept()
, self->chaos
, std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2)
, self->options->get_ws_idle_timeout()
, [](){});
, [](){}
, self->crypto);

session->accept(std::move(ws));

Expand Down Expand Up @@ -146,48 +147,40 @@ node::priv_session_shutdown_handler(const ep_key_t& ep_key)
this->sessions.erase(ep_key);
}

void
node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg)
std::shared_ptr<bzn::session_base>
node::find_session(const boost::asio::ip::tcp::endpoint& ep)
{
std::shared_ptr<bzn::session_base> session;
std::lock_guard<std::mutex> lock(this->session_map_mutex);
auto key = this->key_from_ep(ep);

if (this->sessions.find(key) == this->sessions.end() || !(session = this->sessions.at(key).lock()) || !session->is_open())
{
std::lock_guard<std::mutex> lock(this->session_map_mutex);
auto key = this->key_from_ep(ep);

if (this->sessions.find(key) == this->sessions.end() || !(session = this->sessions.at(key).lock()) || !session->is_open())
{
session = std::make_shared<bzn::session>(
this->io_context
, ++this->session_id_counter
, ep
, this->chaos
, std::bind(&node::priv_protobuf_handler, shared_from_this(), std::placeholders::_1, std::placeholders::_2)
, this->options->get_ws_idle_timeout()
, std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key));
session->open(this->websocket);
sessions.insert_or_assign(key, session);
}
// else session was assigned by the condition
session = std::make_shared<bzn::session>(
this->io_context
, ++this->session_id_counter
, ep
, this->chaos
, std::bind(&node::priv_protobuf_handler, shared_from_this(), std::placeholders::_1, std::placeholders::_2)
, this->options->get_ws_idle_timeout()
, std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key)
, this->crypto);
session->open(this->websocket);
sessions.insert_or_assign(key, session);
}

session->send_message(msg);
return session;
}

void
node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg)
node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg)
{
if (msg->sender().empty())
{
msg->set_sender(this->options->get_uuid());
}

if (msg->signature().empty())
{
this->crypto->sign(*msg);
}
this->find_session(ep)->send_message(msg);
}

this->send_message_str(ep, std::make_shared<std::string>(msg->SerializeAsString()));
void
node::send_signed_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg)
{
this->find_session(ep)->send_signed_message(msg);
}

ep_key_t
Expand All @@ -197,13 +190,13 @@ node::key_from_ep(const boost::asio::ip::tcp::endpoint& ep)
}

void
node::send_message(const bzn::uuid_t &uuid, std::shared_ptr<bzn_envelope> msg)
node::send_signed_message(const bzn::uuid_t& uuid, std::shared_ptr<bzn_envelope> msg)
{
try
{
auto point_of_contact_address = this->pbft->get_peer_by_uuid(uuid);
boost::asio::ip::tcp::endpoint endpoint{bzn::make_endpoint(point_of_contact_address)};
this->send_message(endpoint, msg);
this->send_signed_message(endpoint, msg);
}
catch (const std::runtime_error& err)
{
Expand Down
6 changes: 3 additions & 3 deletions node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,24 @@ namespace bzn

void start(std::shared_ptr<bzn::pbft_base> pbft) override;

void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg) override;
void send_signed_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg) override;

void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg) override;

void send_message(const bzn::uuid_t &uuid, std::shared_ptr<bzn_envelope> msg) override;
void send_signed_message(const bzn::uuid_t& uuid, std::shared_ptr<bzn_envelope> msg) override;

private:
FRIEND_TEST(node, test_that_registered_message_handler_is_invoked);
FRIEND_TEST(node, test_that_wrongly_signed_messages_are_dropped);

void do_accept();


void priv_protobuf_handler(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> session);
void priv_session_shutdown_handler(const ep_key_t& ep_key);

std::shared_ptr<bzn::pbft_base> pbft;

std::shared_ptr<bzn::session_base> find_session(const boost::asio::ip::tcp::endpoint& ep);
std::shared_ptr<bzn::session_base> open_session(const boost::asio::ip::tcp::endpoint& ep);

ep_key_t key_from_ep(const boost::asio::ip::tcp::endpoint& ep);
Expand Down
4 changes: 2 additions & 2 deletions node/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace bzn
* @param ep host to send the message to
* @param msg message to send
*/
virtual void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg) = 0;
virtual void send_signed_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn_envelope> msg) = 0;

/**
* Convenience method to connect and send a message to a node. Will set sender and signature fields as appropriate.
Expand All @@ -57,7 +57,7 @@ namespace bzn
*/
virtual void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg) = 0;

virtual void send_message(const bzn::uuid_t &uuid, std::shared_ptr<bzn_envelope> msg) = 0;
virtual void send_signed_message(const bzn::uuid_t& uuid, std::shared_ptr<bzn_envelope> msg) = 0;
};

} // bzn
15 changes: 14 additions & 1 deletion node/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ session::session(
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout,
bzn::session_shutdown_handler shutdown_handler
bzn::session_shutdown_handler shutdown_handler,
std::shared_ptr<bzn::crypto_base> crypto
)
: session_id(session_id)
, ep(std::move(ep))
Expand All @@ -37,6 +38,7 @@ session::session(
, idle_timer(this->io_context->make_unique_steady_timer())
, ws_idle_timeout(std::move(ws_idle_timeout))
, write_buffer(nullptr, 0)
, crypto(std::move(crypto))
{
LOG(debug) << "creating session " << std::to_string(session_id);
}
Expand Down Expand Up @@ -230,6 +232,17 @@ session::do_write()
});
}

void
session::send_signed_message(std::shared_ptr<bzn_envelope> msg)
{
if (msg->signature().empty())
{
this->crypto->sign(*msg);
}

this->send_message(std::make_shared<std::string>(msg->SerializeAsString()));
}

void
session::send_message(std::shared_ptr<bzn::encoded_message> msg)
{
Expand Down
6 changes: 5 additions & 1 deletion node/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ namespace bzn
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout,
bzn::session_shutdown_handler shutdown_handler);
bzn::session_shutdown_handler shutdown_handler,
std::shared_ptr<bzn::crypto_base> crypto);

~session();

void send_message(std::shared_ptr<bzn::encoded_message> msg) override;
void send_signed_message(std::shared_ptr<bzn_envelope> msg) override;

void close() override;

Expand Down Expand Up @@ -84,6 +86,8 @@ namespace bzn
std::atomic<bool> activity = false;

boost::asio::mutable_buffers_1 write_buffer;

std::shared_ptr<bzn::crypto_base> crypto;
};

} // blz
1 change: 1 addition & 0 deletions node/session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace bzn
* @param msg message
*/
virtual void send_message(std::shared_ptr<bzn::encoded_message> msg) = 0;
virtual void send_signed_message(std::shared_ptr<bzn_envelope> msg) = 0;

/**
* Perform an orderly shutdown of the websocket.
Expand Down
4 changes: 2 additions & 2 deletions node/test/session_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class session_test2 : public Test

session_test2()
{
session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, [](){});
session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, [](){}, nullptr);
}

void yield()
Expand All @@ -93,7 +93,7 @@ namespace bzn

EXPECT_CALL(*mock_websocket_stream, async_read(_,_));

auto session = std::make_shared<bzn::session>(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){});
auto session = std::make_shared<bzn::session>(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr);
session->accept(mock_websocket_stream);
accept_handler(boost::system::error_code{});

Expand Down
9 changes: 5 additions & 4 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ void
pbft::forward_request_to_primary(const bzn_envelope& request_env)
{
LOG(info) << "Forwarding request to primary";
this->node->send_message(bzn::make_endpoint(this->get_primary()), std::make_shared<bzn_envelope>(request_env));
this->node
->send_signed_message(bzn::make_endpoint(this->get_primary()), std::make_shared<bzn_envelope>(request_env));

const bzn::hash_t req_hash = this->crypto->hash(request_env);

Expand Down Expand Up @@ -583,7 +584,7 @@ pbft::broadcast(const bzn_envelope& msg)

for (const auto& peer : this->current_peers())
{
this->node->send_message(make_endpoint(peer), msg_ptr);
this->node->send_signed_message(make_endpoint(peer), msg_ptr);
}
}

Expand Down Expand Up @@ -963,7 +964,7 @@ pbft::request_checkpoint_state(const checkpoint_t& cp)
// TODO: fix the race condition here where receiving node may not have had time to
// stabilize its checkpoint yet.
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(msg));
this->node->send_message(make_endpoint(selected), msg_ptr);
this->node->send_signed_message(make_endpoint(selected), msg_ptr);
}

const peer_address_t&
Expand Down Expand Up @@ -1840,7 +1841,7 @@ pbft::join_swarm()

LOG(info) << "Sending request to join swarm to node " << this->current_peers()[selected].uuid;
auto msg_ptr = std::make_shared<bzn_envelope>(this->wrap_message(join_msg));
this->node->send_message(make_endpoint(this->current_peers()[selected]), msg_ptr);
this->node->send_signed_message(make_endpoint(this->current_peers()[selected]), msg_ptr);

this->in_swarm = swarm_status::joining;
this->join_retry_timer->expires_from_now(JOIN_RETRY_INTERVAL);
Expand Down
8 changes: 4 additions & 4 deletions pbft/test/pbft_audit_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ namespace bzn::test
{
TEST_F(pbft_test, test_local_commit_sends_audit_messages)
{
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(false))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(false))))
.Times(AnyNumber());
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));

this->build_pbft();
Expand Down Expand Up @@ -47,7 +47,7 @@ namespace bzn::test

TEST_F(pbft_test, primary_sends_primary_status)
{
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));

this->build_pbft();
Expand All @@ -59,7 +59,7 @@ namespace bzn::test

TEST_F(pbft_test, nonprimary_does_not_send_primary_status)
{
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_audit, Eq(true))))
.Times(Exactly(0));

this->uuid = SECOND_NODE_UUID;
Expand Down
10 changes: 5 additions & 5 deletions pbft/test/pbft_catchup_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ namespace bzn
this->build_pbft();

// node shouldn't be sending any checkpoint messages right now
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_checkpoint, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_checkpoint, Eq(true))))
.Times((Exactly(0)));

auto nodes = TEST_PEER_LIST.begin();
Expand All @@ -121,7 +121,7 @@ namespace bzn

// one more checkpoint message and the node should request state from a random node
auto primary = this->pbft->get_primary();
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
.Times((Exactly(1)));

bzn::peer_address_t node(*nodes++);
Expand All @@ -140,7 +140,7 @@ namespace bzn
}

// since the node has this checkpoint it should NOT request state for it
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
.Times((Exactly(0)));
stabilize_checkpoint(100);
}
Expand Down Expand Up @@ -171,7 +171,7 @@ namespace bzn

// get the node to request state
auto primary = this->pbft->get_primary();
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
.Times((Exactly(1)));

auto nodes = TEST_PEER_LIST.begin();
Expand Down Expand Up @@ -204,7 +204,7 @@ namespace bzn

// get the node to request state
auto primary = this->pbft->get_primary();
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_get_state, Eq(true))))
.Times((Exactly(1)));

auto nodes = TEST_PEER_LIST.begin();
Expand Down
2 changes: 1 addition & 1 deletion pbft/test/pbft_checkpoint_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace bzn::test

TEST_F(pbft_checkpoint_test, test_checkpoint_messages_sent_on_execute)
{
EXPECT_CALL(*mock_node, send_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_checkpoint, Eq(true))))
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_checkpoint, Eq(true))))
.Times(Exactly(TEST_PEER_LIST.size()));

this->build_pbft();
Expand Down

0 comments on commit d3a4c4d

Please sign in to comment.