Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
kep-808 - a garbage collection callback is added to a session as it i…
Browse files Browse the repository at this point in the history
…s added to the session pool
  • Loading branch information
rnistuk authored and rnistuk committed Mar 1, 2019
1 parent 49e683e commit 24929e5
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 16 deletions.
9 changes: 7 additions & 2 deletions mocks/mock_session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ namespace bzn {
void());
MOCK_CONST_METHOD0(is_open,
bool());
MOCK_METHOD1(open, void(std::shared_ptr<bzn::beast::websocket_base> ws_factory));
MOCK_METHOD1(accept, void(std::shared_ptr<bzn::beast::websocket_stream_base> ws));
MOCK_METHOD1(open,
void(std::shared_ptr<bzn::beast::websocket_base> ws_factory));
MOCK_METHOD1(accept,
void(std::shared_ptr<bzn::beast::websocket_stream_base> ws));
MOCK_METHOD1(add_shutdown_handler,
void(bzn::session_shutdown_handler handler));

};
} // namespace bzn
6 changes: 3 additions & 3 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ node::do_accept()
, self->chaos
, std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2)
, self->options->get_ws_idle_timeout()
, [](){}
, std::list<bzn::session_shutdown_handler>{[](){}}
, self->crypto);

session->accept(std::move(ws));

LOG(info) << "accepting new incomming connection with " << key;
LOG(info) << "accepting new incoming connection with " << key;
// Do not attempt to identify the incoming session; one ip address could be running multiple daemons
// and we can't identify them based on the outgoing ports they choose
}
Expand Down Expand Up @@ -163,7 +163,7 @@ node::find_session(const boost::asio::ip::tcp::endpoint& ep)
, this->chaos
, std::bind(&node::priv_protobuf_handler, shared_from_this(), std::placeholders::_1, std::placeholders::_2)
, this->options->get_ws_idle_timeout()
, std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key)
, std::list<bzn::session_shutdown_handler>{std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key)}
, this->crypto);
session->open(this->websocket);
sessions.insert_or_assign(key, session);
Expand Down
1 change: 0 additions & 1 deletion node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
namespace bzn
{
using ep_key_t = std::string;
using session_shutdown_handler = std::function<void()>;

class node final : public bzn::node_base, public std::enable_shared_from_this<node>
{
Expand Down
16 changes: 13 additions & 3 deletions node/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ session::session(
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout,
bzn::session_shutdown_handler shutdown_handler,
std::list<bzn::session_shutdown_handler> shutdown_handlers,
std::shared_ptr<bzn::crypto_base> crypto
)
: session_id(session_id)
, ep(std::move(ep))
, io_context(std::move(io_context))
, chaos(std::move(chaos))
, proto_handler(std::move(proto_handler))
, shutdown_handler(std::move(shutdown_handler))
, shutdown_handlers(std::move(shutdown_handlers))
, idle_timer(this->io_context->make_unique_steady_timer())
, ws_idle_timeout(std::move(ws_idle_timeout))
, write_buffer(nullptr, 0)
Expand Down Expand Up @@ -127,6 +127,12 @@ session::accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws)
);
}

void
session::add_shutdown_handler(const bzn::session_shutdown_handler handler)
{
this->shutdown_handlers.push_back(handler);
}

void
session::do_read()
{
Expand Down Expand Up @@ -281,7 +287,11 @@ session::close()

this->closing = true;
LOG(debug) << "closing session " << std::to_string(this->session_id);
this->io_context->post(this->shutdown_handler);

for(const auto& handler : this->shutdown_handlers)
{
this->io_context->post(handler);
}

if (this->websocket && this->websocket->is_open())
{
Expand Down
6 changes: 4 additions & 2 deletions node/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace bzn
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout,
bzn::session_shutdown_handler shutdown_handler,
std::list<bzn::session_shutdown_handler> shutdown_handlers,
std::shared_ptr<bzn::crypto_base> crypto);

~session();
Expand All @@ -57,6 +57,8 @@ namespace bzn
void open(std::shared_ptr<bzn::beast::websocket_base> ws_factory) override;
void accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws) override;

void add_shutdown_handler(const bzn::session_shutdown_handler handler) override;

private:
void do_read();
void do_write();
Expand All @@ -73,7 +75,7 @@ namespace bzn
std::list<std::shared_ptr<bzn::encoded_message>> write_queue;

bzn::protobuf_handler proto_handler;
bzn::session_shutdown_handler shutdown_handler;
std::list<bzn::session_shutdown_handler> shutdown_handlers;

std::unique_ptr<bzn::asio::steady_timer_base> idle_timer;
const std::chrono::milliseconds ws_idle_timeout;
Expand Down
7 changes: 7 additions & 0 deletions node/session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace bzn
// forward declare...
class session_base;

using session_shutdown_handler = std::function<void()>;
using message_handler = std::function<void(const bzn::json_message& msg, std::shared_ptr<bzn::session_base> session)>;
using protobuf_handler = std::function<void(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> session)>;

Expand Down Expand Up @@ -71,6 +72,12 @@ namespace bzn
* Accept an incoming connection on some websocket
*/
virtual void accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws) = 0;

/**
* Add additional shutdown handlers to the session
* @param handler
*/
virtual void add_shutdown_handler(bzn::session_shutdown_handler handler) = 0;
};

} // bzn
43 changes: 40 additions & 3 deletions node/test/session_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <gmock/gmock.h>
#include <proto/bluzelle.pb.h>

#include <list>

using namespace ::testing;

namespace
Expand Down Expand Up @@ -66,7 +68,7 @@ class session_test2 : public Test

session_test2()
{
session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, [](){}, nullptr);
session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, std::list<bzn::session_shutdown_handler>{[](){}}, nullptr);
}

void yield()
Expand All @@ -93,7 +95,7 @@ namespace bzn

EXPECT_CALL(*mock_websocket_stream, async_read(_,_));

auto session = std::make_shared<bzn::session>(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr);
auto session = std::make_shared<bzn::session>(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, std::list<bzn::session_shutdown_handler>{[](){}}, nullptr);
session->accept(mock_websocket_stream);
accept_handler(boost::system::error_code{});

Expand Down Expand Up @@ -146,7 +148,7 @@ namespace bzn
bzn::smart_mock_io mock;
mock.tcp_connect_works = false;

auto session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr);
auto session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, std::list<bzn::session_shutdown_handler>{[](){}}, nullptr);
session->open(mock.websocket);

this->yield();
Expand All @@ -156,4 +158,39 @@ namespace bzn
mock.timer_callbacks.at(0)(boost::system::error_code{});
}

TEST_F(session_test2, additional_shutdown_handlers_can_be_added_to_session)
{

bzn::smart_mock_io mock;
mock.tcp_connect_works = false;

std::vector<uint8_t> handler_counters { 0,0,0 };
{
auto session = std::make_shared<bzn::session>(mock.io_context
, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT
, std::list<bzn::session_shutdown_handler>{[&handler_counters]() {
++handler_counters[0];
}}, nullptr);

session->add_shutdown_handler([&handler_counters](){++handler_counters[1];});
session->add_shutdown_handler([&handler_counters](){++handler_counters[2];});

session->open(mock.websocket);

this->yield();

// we are just testing that this doesn't cause a segfault
mock.timer_callbacks.at(0)(boost::system::error_code{});
mock.timer_callbacks.at(0)(boost::system::error_code{});

}
this->yield();

// each shutdown handler must be called exactly once.
for(const auto handler_counter : handler_counters)
{
EXPECT_EQ(handler_counter, 1);
}
}

} // bzn
22 changes: 20 additions & 2 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptr<sess
{
if (this->sessions_waiting_on_forwarded_requests.find(hash) == this->sessions_waiting_on_forwarded_requests.end())
{
this->sessions_waiting_on_forwarded_requests[hash] = session;
this->add_session_to_pool(hash, session);
}
}

Expand Down Expand Up @@ -444,7 +444,8 @@ pbft::handle_join_or_leave(const bzn_envelope& env, const pbft_membership_msg& m
return;
}

this->sessions_waiting_on_forwarded_requests[msg_hash] = session;
this->add_session_to_pool(msg_hash, session);

if (!this->is_primary())
{
this->forward_request_to_primary(env);
Expand Down Expand Up @@ -1899,3 +1900,20 @@ uint32_t pbft::generate_random_number(uint32_t min, uint32_t max)
std::uniform_int_distribution<uint32_t> dist(min, max);
return dist(gen);
}

void pbft::add_session_to_pool(const std::string& msg_hash, std::shared_ptr<bzn::session_base> session)
{
if (session)
{
this->sessions_waiting_on_forwarded_requests[msg_hash] = session;
session->add_shutdown_handler([msg_hash, this]()
{
std::lock_guard<std::mutex> lock(this->pbft_lock);
auto it = this->sessions_waiting_on_forwarded_requests.find(msg_hash);
if (it != this->sessions_waiting_on_forwarded_requests.end() && !it->second->is_open())
{
this->sessions_waiting_on_forwarded_requests.erase(it);
}
});
}
}
4 changes: 4 additions & 0 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ namespace bzn
{
// fwd declare test as it's not in the same namespace...
class pbft_test_database_response_is_forwarded_to_session_Test;
class pbft_test_add_session_to_pool_can_add_a_session_and_shutdown_handler_removes_session_from_pool_Test;
}

using request_hash_t = std::string;
Expand Down Expand Up @@ -216,6 +217,8 @@ namespace bzn
bool is_peer(const bzn::uuid_t& peer) const;
bool get_sequences_and_request_hashes_from_proofs( const pbft_msg& viewchange_msg, std::set<std::pair<uint64_t, std::string>>& sequence_request_pairs) const;

void add_session_to_pool(const std::string& msg_hash, std::shared_ptr<bzn::session_base> session);

// Using 1 as first value here to distinguish from default value of 0 in protobuf
uint64_t view = 1;
uint64_t next_issued_sequence_number = 1;
Expand Down Expand Up @@ -296,6 +299,7 @@ namespace bzn
FRIEND_TEST(pbft_newview_test, get_sequences_and_request_hashes_from_proofs);
FRIEND_TEST(pbft_newview_test, test_last_sequence_in_newview_prepared_proofs);
FRIEND_TEST(bzn::test::pbft_test, database_response_is_forwarded_to_session);
FRIEND_TEST(bzn::test::pbft_test, add_session_to_pool_can_add_a_session_and_shutdown_handler_removes_session_from_pool);

friend class pbft_proto_test;
friend class pbft_join_leave_test;
Expand Down
28 changes: 28 additions & 0 deletions pbft/test/pbft_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,34 @@ namespace bzn::test
this->database_response_handler(this->request_msg, mock_session);
}

TEST_F(pbft_test, add_session_to_pool_can_add_a_session_and_shutdown_handler_removes_session_from_pool)
{
this->build_pbft();

EXPECT_EQ(size_t(0), this->pbft->sessions_waiting_on_forwarded_requests.size());

bzn::session_shutdown_handler shutdown_handler{0};

EXPECT_CALL(*mock_session, add_shutdown_handler(_))
.Times(Exactly(1))
.WillRepeatedly(Invoke([&](auto handler) {
shutdown_handler = handler;
}));

EXPECT_CALL(*mock_session, is_open())
.WillOnce(Return(false));

pbft->handle_database_message(this->request_msg, this->mock_session);

EXPECT_EQ(size_t(1), this->pbft->sessions_waiting_on_forwarded_requests.size());

EXPECT_TRUE(shutdown_handler != nullptr);

shutdown_handler();

EXPECT_EQ(size_t(0), this->pbft->sessions_waiting_on_forwarded_requests.size());
}


TEST_F(pbft_test, client_request_executed_results_in_message_response)
{
Expand Down

0 comments on commit 24929e5

Please sign in to comment.