Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelsavannah committed Jan 30, 2019
1 parent 841fb79 commit 3ec3544
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 130 deletions.
4 changes: 2 additions & 2 deletions mocks/mock_session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace bzn {
void());
MOCK_CONST_METHOD0(is_open,
bool());
MOCK_METHOD1(open_connection, void(std::shared_ptr<bzn::beast::websocket_base> ws_factory));
MOCK_METHOD1(accept_connection, void(std::shared_ptr<bzn::beast::websocket_stream_base> ws));
MOCK_METHOD1(open, void(std::shared_ptr<bzn::beast::websocket_base> ws_factory));
MOCK_METHOD1(accept, void(std::shared_ptr<bzn::beast::websocket_stream_base> ws));
};
} // namespace bzn
58 changes: 34 additions & 24 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,6 @@ node::node(std::shared_ptr<bzn::asio::io_context_base> io_context, std::shared_p
void
node::start()
{
this->weak_priv_protobuf_handler =
[weak_self = weak_from_this()](auto msg, auto session)
{
auto strong_self = weak_self.lock();
if (strong_self)
{
strong_self->priv_protobuf_handler(msg, session);
}
else
{
LOG(warning) << "ignoring incoming message because node is gone";
}
};
std::call_once(this->start_once, &node::do_accept, this);
}

Expand Down Expand Up @@ -98,8 +85,16 @@ node::do_accept()
std::shared_ptr<bzn::beast::websocket_stream_base> ws = self->websocket->make_unique_websocket_stream(
self->acceptor_socket->get_tcp_socket());

auto session = std::make_shared<bzn::session>(self->io_context, ++self->session_id_counter, ep, self->chaos, self->weak_priv_protobuf_handler, self->options->get_ws_idle_timeout());
session->accept_connection(std::move(ws));
auto session = std::make_shared<bzn::session>(
self->io_context
, ++self->session_id_counter
, ep
, self->chaos
, std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2)
, self->options->get_ws_idle_timeout()
, [](){});

session->accept(std::move(ws));

LOG(info) << "accepting new incomming connection with " << key;
// Do not attempt to identify the incoming session; one ip address could be running multiple daemons
Expand Down Expand Up @@ -132,26 +127,41 @@ node::priv_protobuf_handler(const bzn_envelope& msg, std::shared_ptr<bzn::sessio

}

void
node::priv_session_death_handler(const ep_key_t& ep_key)
{
std::shared_ptr<bzn::session_base> session;
std::lock_guard<std::mutex> lock(this->session_map_mutex);
if (this->sessions.find(ep_key) != this->sessions.end() && (session = this->sessions.at(ep_key).lock()) && session->is_open())
{
// the session may have already been replaced, and we don't want to remove the new one if so
return;
}
this->sessions.erase(ep_key);
}

void
node::send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn::encoded_message> msg) {
std::shared_ptr<bzn::session_base> session;

{
std::lock_guard<std::mutex> lock(this->session_map_mutex);
auto key = this->key_from_ep(ep);

if (this->sessions.find(key) == this->sessions.end() || !this->sessions.at(key)->is_open()) {
auto session = std::make_shared<bzn::session>(
if (this->sessions.find(key) == this->sessions.end() || !(session = this->sessions.at(key).lock()) || !session->is_open())
{
session = std::make_shared<bzn::session>(
this->io_context
, ++this->session_id_counter
, ep
, this->chaos
, this->weak_priv_protobuf_handler
, this->options->get_ws_idle_timeout());
session->open_connection(this->websocket);
, std::bind(&node::priv_protobuf_handler, shared_from_this(), std::placeholders::_1, std::placeholders::_2)
, this->options->get_ws_idle_timeout()
, std::bind(&node::priv_session_death_handler, shared_from_this(), key));
session->open(this->websocket);
sessions.insert_or_assign(key, session);
}

session = this->sessions.at(key);
// else session was assigned by the condition
}

session->send_message(msg);
Expand All @@ -173,8 +183,8 @@ node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr<bzn
this->send_message_str(ep, std::make_shared<std::string>(msg->SerializeAsString()));
}

std::string
node::key_from_ep(const boost::asio::ip::tcp::endpoint &ep)
ep_key_t
node::key_from_ep(const boost::asio::ip::tcp::endpoint& ep)
{
return ep.address().to_string() + ":" + std::to_string(ep.port());
}
11 changes: 7 additions & 4 deletions node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

namespace bzn
{
using ep_key_t = std::string;
using session_death_handler = std::function<void()>;

class node final : public bzn::node_base, public std::enable_shared_from_this<node>
{
public:
Expand All @@ -48,13 +51,15 @@ namespace bzn

void do_accept();


void priv_protobuf_handler(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> session);
void priv_session_death_handler(const ep_key_t& ep_key);

std::shared_ptr<bzn::session_base> open_session(const boost::asio::ip::tcp::endpoint& ep);

std::string key_from_ep(const boost::asio::ip::tcp::endpoint& ep);
ep_key_t key_from_ep(const boost::asio::ip::tcp::endpoint& ep);

std::unordered_map<std::string, std::shared_ptr<bzn::session_base>> sessions;
std::unordered_map<ep_key_t, std::weak_ptr<bzn::session_base>> sessions;
std::mutex session_map_mutex;

std::unique_ptr<bzn::asio::tcp_acceptor_base> tcp_acceptor;
Expand All @@ -72,8 +77,6 @@ namespace bzn

std::shared_ptr<bzn::crypto_base> crypto;
std::shared_ptr<bzn::options_base> options;

std::function<void(const bzn_envelope&, std::shared_ptr<bzn::session_base>)> weak_priv_protobuf_handler;
};

} // bzn
152 changes: 78 additions & 74 deletions node/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#include <node/session.hpp>
#include <node/node.hpp>
#include <sstream>


Expand All @@ -24,13 +25,15 @@ session::session(
boost::asio::ip::tcp::endpoint ep,
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout
std::chrono::milliseconds ws_idle_timeout,
bzn::session_death_handler death_handler
)
: session_id(session_id)
, ep(std::move(ep))
, io_context(std::move(io_context))
, chaos(std::move(chaos))
, proto_handler(std::move(proto_handler))
, death_handler(std::move(death_handler))
, idle_timer(this->io_context->make_unique_steady_timer())
, ws_idle_timeout(std::move(ws_idle_timeout))
, write_buffer(nullptr, 0)
Expand All @@ -45,80 +48,80 @@ session::start_idle_timeout()

this->idle_timer->expires_from_now(this->ws_idle_timeout);
this->idle_timer->async_wait(
[self = shared_from_this()](auto /*ec*/)
[self = shared_from_this()](auto /*ec*/)
{
if (!self->activity)
{
if (!self->activity)
{
LOG(info) << "Closing session " << std::to_string(self->session_id) << " due to inactivity";
self->close();
return;
}
LOG(info) << "Closing session " << std::to_string(self->session_id) << " due to inactivity";
self->close();
return;
}

self->start_idle_timeout();
});
self->start_idle_timeout();
});
}

void
session::open_connection(std::shared_ptr<bzn::beast::websocket_base> ws_factory)
session::open(std::shared_ptr<bzn::beast::websocket_base> ws_factory)
{
this->start_idle_timeout();

std::shared_ptr<bzn::asio::tcp_socket_base> socket = this->io_context->make_unique_tcp_socket();
socket->async_connect(this->ep,
[self = shared_from_this(), socket, ws_factory](const boost::system::error_code& ec)
{
self->activity = true;
[self = shared_from_this(), socket, ws_factory](const boost::system::error_code& ec)
{
self->activity = true;

if (ec)
{
LOG(error) << "failed to connect to: " << self->ep.address().to_string() << ":" << self->ep.port() << " - " << ec.message();
if (ec)
{
LOG(error) << "failed to connect to: " << self->ep.address().to_string() << ":" << self->ep.port() << " - " << ec.message();

return;
}
return;
}

// we've completed the handshake...
// we've completed the handshake...

std::lock_guard<std::mutex> lock(self->socket_lock);
self->websocket = ws_factory->make_unique_websocket_stream(socket->get_tcp_socket());
self->websocket->async_handshake(self->ep.address().to_string(), "/",
[self, ws_factory](const boost::system::error_code& ec)
{
self->activity = true;
std::lock_guard<std::mutex> lock(self->socket_lock);
self->websocket = ws_factory->make_unique_websocket_stream(socket->get_tcp_socket());
self->websocket->async_handshake(self->ep.address().to_string(), "/",
[self, ws_factory](const boost::system::error_code& ec)
{
self->activity = true;

if (ec)
{
LOG(error) << "handshake failed: " << ec.message();
if (ec)
{
LOG(error) << "handshake failed: " << ec.message();

return;
}
return;
}

self->do_read();
self->do_write();
});
});
self->do_read();
self->do_write();
});
});
}

void
session::accept_connection(std::shared_ptr<bzn::beast::websocket_stream_base> ws)
session::accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws)
{
this->start_idle_timeout();

std::lock_guard<std::mutex> lock(this->socket_lock);
this->websocket = std::move(ws);
this->websocket->async_accept(
[self = shared_from_this()](boost::system::error_code ec)
{
self->activity = true;

if (ec)
{
LOG(error) << "websocket accept failed: " << ec.message();
return;
}
[self = shared_from_this()](boost::system::error_code ec)
{
self->activity = true;

self->do_read();
self->do_write();
if (ec)
{
LOG(error) << "websocket accept failed: " << ec.message();
return;
}

self->do_read();
self->do_write();
}
);
}

Expand All @@ -136,39 +139,39 @@ session::do_read()
this->reading = true;

this->websocket->async_read(
*buffer, [self = shared_from_this(), buffer](boost::system::error_code ec, auto /*bytes_transferred*/)
{
self->activity = true;
*buffer, [self = shared_from_this(), buffer](boost::system::error_code ec, auto /*bytes_transferred*/)
{
self->activity = true;

if(ec)
if(ec)
{
// don't log close of websocket...
if (ec != boost::beast::websocket::error::closed && ec != boost::asio::error::eof)
{
// don't log close of websocket...
if (ec != boost::beast::websocket::error::closed && ec != boost::asio::error::eof)
{
LOG(error) << "websocket read failed: " << ec.message();
}
self->close();
return;
LOG(error) << "websocket read failed: " << ec.message();
}
self->close();
return;
}

// get the message...
std::stringstream ss;
ss << boost::beast::buffers(buffer->data());

bzn_envelope proto_msg;
// get the message...
std::stringstream ss;
ss << boost::beast::buffers(buffer->data());

if (proto_msg.ParseFromIstream(&ss))
{
self->io_context->post(std::bind(self->proto_handler, proto_msg, self));
}
else
{
LOG(error) << "Failed to parse incoming message";
}
bzn_envelope proto_msg;

self->reading = false;
self->do_read();
if (proto_msg.ParseFromIstream(&ss))
{
self->io_context->post(std::bind(self->proto_handler, proto_msg, self));
}
else
{
LOG(error) << "Failed to parse incoming message";
}

self->reading = false;
self->do_read();
}
);
}

Expand Down Expand Up @@ -254,7 +257,7 @@ session::send_message(std::shared_ptr<bzn::encoded_message> msg)
void
session::close()
{
// TODO: re-open socket later if we still have messages to send?
// TODO: re-open socket later if we still have messages to send? (KEP-1037)
LOG(info) << "closing session";

std::lock_guard<std::mutex> lock(this->socket_lock);
Expand All @@ -265,6 +268,7 @@ session::close()

this->closing = true;
LOG(debug) << "closing session " << std::to_string(this->session_id);
this->io_context->post(this->death_handler);

if (this->websocket->is_open())
{
Expand Down

0 comments on commit 3ec3544

Please sign in to comment.