Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-653: Support in node and session for protobuf messages without json
Browse files Browse the repository at this point in the history
wrapper. pbft changed to use this support. fixed hack in session test.
  • Loading branch information
isabelsavannah committed Sep 28, 2018
1 parent 1b5ecfb commit 30e6c04
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 138 deletions.
4 changes: 4 additions & 0 deletions mocks/mock_node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ class Mocknode_base : public node_base {
public:
MOCK_METHOD2(register_for_message,
bool(const std::string& msg_type, bzn::message_handler message_handler));
MOCK_METHOD2(register_for_message,
bool(const bzn_msg_type msg_type, bzn::protobuf_handler message_handler));
MOCK_METHOD0(start,
void());
MOCK_METHOD2(send_message,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::message> msg));
MOCK_METHOD2(send_message_str,
void(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<std::string> msg));
};

} // namespace bzn
4 changes: 2 additions & 2 deletions mocks/mock_session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
namespace bzn {
class Mocksession_base : public session_base {
public:
MOCK_METHOD1(start,
void(bzn::message_handler handler));
MOCK_METHOD2(start,
void(bzn::message_handler handler, bzn::protobuf_handler proto_handler));
MOCK_METHOD2(send_message,
void(std::shared_ptr<bzn::message> msg, bool end_session));
MOCK_METHOD2(send_message,
Expand Down
2 changes: 1 addition & 1 deletion node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ add_library(node STATIC
session.cpp
../mocks/mock_session_base.hpp)

target_link_libraries(node)
target_link_libraries(node proto)
add_dependencies(node proto googletest jsoncpp) # for FRIEND_TEST

target_include_directories(node PRIVATE ${JSONCPP_INCLUDE_DIRS} ${PROTO_INCLUDE_DIR})
Expand Down
113 changes: 83 additions & 30 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,30 @@ node::register_for_message(const std::string& msg_type, bzn::message_handler msg
}


bool
node::register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_handler)
{
std::lock_guard<std::mutex> lock(this->message_map_mutex);

// never allow!
if (!msg_handler)
{
return false;
}

if (this->protobuf_map.find(type) != this->protobuf_map.end())
{
LOG(debug) << bzn_msg_type_Name(type) << " message type already registered";

return false;
}

this->protobuf_map[type] = std::move(msg_handler);

return true;
}


void
node::do_accept()
{
Expand All @@ -86,8 +110,9 @@ node::do_accept()
auto ws = self->websocket->make_unique_websocket_stream(
self->acceptor_socket->get_tcp_socket());

std::make_shared<bzn::session>(self->io_context, ++self->session_id_counter, std::move(ws), self->ws_idle_timeout)->start(
std::bind(&node::priv_msg_handler, self, std::placeholders::_1, std::placeholders::_2));
std::make_shared<bzn::session>(self->io_context, ++self->session_id_counter, std::move(ws), self->ws_idle_timeout)
->start(std::bind(&node::priv_msg_handler, self, std::placeholders::_1, std::placeholders::_2)
, std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2));
}

self->do_accept();
Expand All @@ -114,41 +139,69 @@ node::priv_msg_handler(const Json::Value& msg, std::shared_ptr<bzn::session_base
session->close();
}

void
node::priv_protobuf_handler(const wrapped_bzn_msg& msg, std::shared_ptr<bzn::session_base> session)
{
std::lock_guard<std::mutex> lock(this->message_map_mutex);

if (auto it = this->protobuf_map.find(msg.type()); it != this->protobuf_map.end())
{
// Should also do signature verification here
it->second(msg, std::move(session));
}
else
{
LOG(debug) << "no handler for message type " << bzn_msg_type_Name(msg.type());
}

}

void
node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::message> msg)
node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<std::string> msg)
{
std::shared_ptr<bzn::asio::tcp_socket_base> socket = this->io_context->make_unique_tcp_socket();

socket->async_connect(ep,
[self = shared_from_this(), socket, ep, msg](const boost::system::error_code& ec)
{
if (ec)
[self = shared_from_this(), socket, ep, msg](const boost::system::error_code& ec)
{
LOG(error) << "failed to connect to: " << ep.address().to_string() << ":" << ep.port() << " - " << ec.message();

return;
}

// we've completed the handshake...
std::shared_ptr<bzn::beast::websocket_stream_base> ws = self->websocket->make_unique_websocket_stream(socket->get_tcp_socket());

ws->async_handshake(ep.address().to_string(), "/",
[self, ws, msg](const boost::system::error_code& ec)
if (ec)
{
if (ec)
{
LOG(error) << "handshake failed: " << ec.message();

return;
}

auto session = std::make_shared<bzn::session>(self->io_context, ++self->session_id_counter, ws, self->ws_idle_timeout);

session->start(std::bind(&node::priv_msg_handler, self, std::placeholders::_1, std::placeholders::_2));
LOG(error) << "failed to connect to: " << ep.address().to_string() << ":" << ep.port() << " - " << ec.message();

return;
}

// we've completed the handshake...
std::shared_ptr<bzn::beast::websocket_stream_base> ws = self->websocket->make_unique_websocket_stream(socket->get_tcp_socket());

ws->async_handshake(ep.address().to_string(), "/",
[self, ws, msg](const boost::system::error_code& ec)
{
if (ec)
{
LOG(error) << "handshake failed: " << ec.message();

return;
}

auto session = std::make_shared<bzn::session>(self->io_context, ++self->session_id_counter, ws, self->ws_idle_timeout);

session->start(
std::bind(
&node::priv_msg_handler
, self
, std::placeholders::_1
, std::placeholders::_2
), bzn::protobuf_handler());

// send the message requested...
session->send_message(msg, false);
});
});
}

// send the message requested...
session->send_message(msg, false);
});
});
void
node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::message> msg)
{
this->send_message_str(ep, std::make_shared<std::string>(msg->toStyledString()));
}
6 changes: 6 additions & 0 deletions node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,21 @@ namespace bzn

bool register_for_message(const std::string& msg_type, bzn::message_handler msg_handler) override;

bool register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_handler) override;

void start() override;

void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::message> msg) override;

void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<std::string> msg) override;

private:
FRIEND_TEST(node, test_that_registered_message_handler_is_invoked);

void do_accept();

void priv_msg_handler(const bzn::message& msg, std::shared_ptr<bzn::session_base> session);
void priv_protobuf_handler(const wrapped_bzn_msg& msg, std::shared_ptr<bzn::session_base> session);

std::unique_ptr<bzn::asio::tcp_acceptor_base> tcp_acceptor;
std::shared_ptr<bzn::asio::io_context_base> io_context;
Expand All @@ -51,6 +56,7 @@ namespace bzn
const std::chrono::milliseconds ws_idle_timeout;

std::unordered_map<std::string, bzn::message_handler> message_map;
std::unordered_map<bzn_msg_type, bzn::protobuf_handler> protobuf_map;
std::mutex message_map_mutex;

std::once_flag start_once;
Expand Down
16 changes: 16 additions & 0 deletions node/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <boost/asio.hpp>
#include <node/session_base.hpp>
#include <json/json.h>
#include <proto/bluzelle.pb.h>


namespace bzn
Expand All @@ -34,6 +35,14 @@ namespace bzn
*/
virtual bool register_for_message(const std::string& msg_type, bzn::message_handler msg_handler) = 0;

/**
* Register for a callback to be execute when a certain message type arrives
* @param msg_type message type (crud, raft etc.)
* @param msg_handler callback
* @return true if registration succeeded
*/
virtual bool register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_handler) = 0;

/**
* Start server's listener etc.
*/
Expand All @@ -45,6 +54,13 @@ namespace bzn
* @param msg message to send
*/
virtual void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::message> msg) = 0;

/**
* Convenience method to connect and send a message to a node
* @param ep host to send the message to
* @param msg message to send
*/
virtual void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<std::string> msg) = 0;
};

} // bzn
25 changes: 13 additions & 12 deletions node/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ session::session(std::shared_ptr<bzn::asio::io_context_base> io_context, const b


void
session::start(bzn::message_handler handler)
session::start(bzn::message_handler handler, bzn::protobuf_handler proto_handler)
{
this->handler = std::move(handler);
this->proto_handler = std::move(proto_handler);

// If we haven't completed a handshake then we are accepting one...
if (!this->websocket->is_open())
Expand Down Expand Up @@ -90,20 +91,20 @@ session::do_read()
Json::Value msg;
Json::Reader reader;

if (!reader.parse(ss.str(), msg))
wrapped_bzn_msg proto_msg;

if (reader.parse(ss.str(), msg))
{
self->handler(msg, self);
}
else if(ss.seekg(0); proto_msg.ParseFromIstream(&ss))
{
self->proto_handler(proto_msg, self);
}
else
{
LOG(error) << "Failed to parse: " << reader.getFormattedErrorMessages();

// Only a unit test should change this flag since we can't intercept the buffer and modify it.
if (!self->ignore_json_errors)
{
self->close();
return;
}
}

// call subscriber...
self->handler(msg, self);
}));
}

Expand Down
7 changes: 2 additions & 5 deletions node/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace bzn
public:
session(std::shared_ptr<bzn::asio::io_context_base> io_context, bzn::session_id session_id, std::shared_ptr<bzn::beast::websocket_stream_base> websocket, const std::chrono::milliseconds& ws_idle_timeout);

void start(bzn::message_handler handler) override;
void start(bzn::message_handler handler, bzn::protobuf_handler proto_handler) override;

void send_message(std::shared_ptr<bzn::message> msg, bool end_session) override;

Expand All @@ -45,8 +45,6 @@ namespace bzn
bzn::session_id get_session_id() override { return this->session_id; }

private:
FRIEND_TEST(node_session, test_that_when_message_arrives_registered_callback_is_executed);

void do_read();

void start_idle_timeout();
Expand All @@ -59,8 +57,7 @@ namespace bzn

const std::chrono::milliseconds ws_idle_timeout;
bzn::message_handler handler;

const bool ignore_json_errors = false;
bzn::protobuf_handler proto_handler;

std::mutex write_lock;
};
Expand Down
7 changes: 6 additions & 1 deletion node/session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <include/bluzelle.hpp>
#include <proto/bluzelle.pb.h>


namespace bzn
Expand All @@ -23,6 +24,7 @@ namespace bzn
class session_base;

using message_handler = std::function<void(const bzn::message& msg, std::shared_ptr<bzn::session_base> session)>;
using protobuf_handler = std::function<void(const wrapped_bzn_msg& msg, std::shared_ptr<bzn::session_base> session)>;

class session_base
{
Expand All @@ -33,7 +35,10 @@ namespace bzn
* Start accepting new connections
* @param handler callback to execute when connection is established
*/
virtual void start(bzn::message_handler handler) = 0;
virtual void start(
std::function<void(const message&, std::shared_ptr<session_base>)> handler
, bzn::protobuf_handler proto_handler
) = 0;

/**
* Send a message to the connected node
Expand Down

0 comments on commit 30e6c04

Please sign in to comment.