Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
kep-808 - a garbage collection callback is added to a session as it i…
Browse files Browse the repository at this point in the history
…s added to the session pool

kep-808 Made changes requested by Isabel
  • Loading branch information
paularchard authored and rnistuk committed Mar 4, 2019
1 parent 49e683e commit efa762b
Show file tree
Hide file tree
Showing 27 changed files with 1,251 additions and 152 deletions.
9 changes: 7 additions & 2 deletions mocks/mock_session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ namespace bzn {
void());
MOCK_CONST_METHOD0(is_open,
bool());
MOCK_METHOD1(open, void(std::shared_ptr<bzn::beast::websocket_base> ws_factory));
MOCK_METHOD1(accept, void(std::shared_ptr<bzn::beast::websocket_stream_base> ws));
MOCK_METHOD1(open,
void(std::shared_ptr<bzn::beast::websocket_base> ws_factory));
MOCK_METHOD1(accept,
void(std::shared_ptr<bzn::beast::websocket_stream_base> ws));
MOCK_METHOD1(add_shutdown_handler,
void(bzn::session_shutdown_handler handler));

};
} // namespace bzn
6 changes: 3 additions & 3 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ node::do_accept()
, self->chaos
, std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2)
, self->options->get_ws_idle_timeout()
, [](){}
, std::list<bzn::session_shutdown_handler>{[](){}}
, self->crypto);

session->accept(std::move(ws));

LOG(info) << "accepting new incomming connection with " << key;
LOG(info) << "accepting new incoming connection with " << key;
// Do not attempt to identify the incoming session; one ip address could be running multiple daemons
// and we can't identify them based on the outgoing ports they choose
}
Expand Down Expand Up @@ -163,7 +163,7 @@ node::find_session(const boost::asio::ip::tcp::endpoint& ep)
, this->chaos
, std::bind(&node::priv_protobuf_handler, shared_from_this(), std::placeholders::_1, std::placeholders::_2)
, this->options->get_ws_idle_timeout()
, std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key)
, std::list<bzn::session_shutdown_handler>{std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key)}
, this->crypto);
session->open(this->websocket);
sessions.insert_or_assign(key, session);
Expand Down
1 change: 0 additions & 1 deletion node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
namespace bzn
{
using ep_key_t = std::string;
using session_shutdown_handler = std::function<void()>;

class node final : public bzn::node_base, public std::enable_shared_from_this<node>
{
Expand Down
16 changes: 13 additions & 3 deletions node/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ session::session(
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout,
bzn::session_shutdown_handler shutdown_handler,
std::list<bzn::session_shutdown_handler> shutdown_handlers,
std::shared_ptr<bzn::crypto_base> crypto
)
: session_id(session_id)
, ep(std::move(ep))
, io_context(std::move(io_context))
, chaos(std::move(chaos))
, proto_handler(std::move(proto_handler))
, shutdown_handler(std::move(shutdown_handler))
, shutdown_handlers(std::move(shutdown_handlers))
, idle_timer(this->io_context->make_unique_steady_timer())
, ws_idle_timeout(std::move(ws_idle_timeout))
, write_buffer(nullptr, 0)
Expand Down Expand Up @@ -127,6 +127,12 @@ session::accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws)
);
}

void
session::add_shutdown_handler(const bzn::session_shutdown_handler handler)
{
this->shutdown_handlers.push_back(handler);
}

void
session::do_read()
{
Expand Down Expand Up @@ -281,7 +287,11 @@ session::close()

this->closing = true;
LOG(debug) << "closing session " << std::to_string(this->session_id);
this->io_context->post(this->shutdown_handler);

for(const auto& handler : this->shutdown_handlers)
{
this->io_context->post(handler);
}

if (this->websocket && this->websocket->is_open())
{
Expand Down
6 changes: 4 additions & 2 deletions node/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace bzn
std::shared_ptr<bzn::chaos_base> chaos,
bzn::protobuf_handler proto_handler,
std::chrono::milliseconds ws_idle_timeout,
bzn::session_shutdown_handler shutdown_handler,
std::list<bzn::session_shutdown_handler> shutdown_handlers,
std::shared_ptr<bzn::crypto_base> crypto);

~session();
Expand All @@ -57,6 +57,8 @@ namespace bzn
void open(std::shared_ptr<bzn::beast::websocket_base> ws_factory) override;
void accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws) override;

void add_shutdown_handler(const bzn::session_shutdown_handler handler) override;

private:
void do_read();
void do_write();
Expand All @@ -73,7 +75,7 @@ namespace bzn
std::list<std::shared_ptr<bzn::encoded_message>> write_queue;

bzn::protobuf_handler proto_handler;
bzn::session_shutdown_handler shutdown_handler;
std::list<bzn::session_shutdown_handler> shutdown_handlers;

std::unique_ptr<bzn::asio::steady_timer_base> idle_timer;
const std::chrono::milliseconds ws_idle_timeout;
Expand Down
7 changes: 7 additions & 0 deletions node/session_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace bzn
// forward declare...
class session_base;

using session_shutdown_handler = std::function<void()>;
using message_handler = std::function<void(const bzn::json_message& msg, std::shared_ptr<bzn::session_base> session)>;
using protobuf_handler = std::function<void(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> session)>;

Expand Down Expand Up @@ -71,6 +72,12 @@ namespace bzn
* Accept an incoming connection on some websocket
*/
virtual void accept(std::shared_ptr<bzn::beast::websocket_stream_base> ws) = 0;

/**
* Add additional shutdown handlers to the session
* @param handler
*/
virtual void add_shutdown_handler(bzn::session_shutdown_handler handler) = 0;
};

} // bzn
43 changes: 40 additions & 3 deletions node/test/session_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <gmock/gmock.h>
#include <proto/bluzelle.pb.h>

#include <list>

using namespace ::testing;

namespace
Expand Down Expand Up @@ -66,7 +68,7 @@ class session_test2 : public Test

session_test2()
{
session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, [](){}, nullptr);
session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, std::list<bzn::session_shutdown_handler>{[](){}}, nullptr);
}

void yield()
Expand All @@ -93,7 +95,7 @@ namespace bzn

EXPECT_CALL(*mock_websocket_stream, async_read(_,_));

auto session = std::make_shared<bzn::session>(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr);
auto session = std::make_shared<bzn::session>(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, std::list<bzn::session_shutdown_handler>{[](){}}, nullptr);
session->accept(mock_websocket_stream);
accept_handler(boost::system::error_code{});

Expand Down Expand Up @@ -146,7 +148,7 @@ namespace bzn
bzn::smart_mock_io mock;
mock.tcp_connect_works = false;

auto session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr);
auto session = std::make_shared<bzn::session>(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, std::list<bzn::session_shutdown_handler>{[](){}}, nullptr);
session->open(mock.websocket);

this->yield();
Expand All @@ -156,4 +158,39 @@ namespace bzn
mock.timer_callbacks.at(0)(boost::system::error_code{});
}

TEST_F(session_test2, additional_shutdown_handlers_can_be_added_to_session)
{

bzn::smart_mock_io mock;
mock.tcp_connect_works = false;

std::vector<uint8_t> handler_counters { 0,0,0 };
{
auto session = std::make_shared<bzn::session>(mock.io_context
, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT
, std::list<bzn::session_shutdown_handler>{[&handler_counters]() {
++handler_counters[0];
}}, nullptr);

session->add_shutdown_handler([&handler_counters](){++handler_counters[1];});
session->add_shutdown_handler([&handler_counters](){++handler_counters[2];});

session->open(mock.websocket);

this->yield();

// we are just testing that this doesn't cause a segfault
mock.timer_callbacks.at(0)(boost::system::error_code{});
mock.timer_callbacks.at(0)(boost::system::error_code{});

}
this->yield();

// each shutdown handler must be called exactly once.
for(const auto handler_counter : handler_counters)
{
EXPECT_EQ(handler_counter, 1);
}
}

} // bzn
2 changes: 1 addition & 1 deletion pbft/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ add_library(pbft STATIC
pbft_config_store.cpp
database_pbft_service.cpp
database_pbft_service.hpp
)
pbft_persistent_state.cpp)

target_link_libraries(pbft utils pbft_operations proto)
target_include_directories(pbft PRIVATE ${JSONCPP_INCLUDE_DIRS} ${PROTO_INCLUDE_DIR})
Expand Down
18 changes: 9 additions & 9 deletions pbft/database_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ database_pbft_service::process_awaiting_operations()
this->crud->handle_request(op_it->second->get_request().sender(), request, nullptr);
}

if (this->next_request_sequence == this->next_checkpoint)
{
if (this->crud->save_state())
{
this->last_checkpoint = this->next_request_sequence;
}
}

this->io_context->post(std::bind(this->execute_handler, (*op_it).second));
}

Expand All @@ -149,14 +157,6 @@ database_pbft_service::process_awaiting_operations()
throw std::runtime_error("Failed to remove pbft_request from database! (" + std::to_string(uint8_t(result)) + ")");
}

if (this->next_request_sequence == this->next_checkpoint)
{
if (this->crud->save_state())
{
this->last_checkpoint = this->next_request_sequence;
}
}

++this->next_request_sequence;

this->save_next_request_sequence();
Expand All @@ -166,7 +166,7 @@ database_pbft_service::process_awaiting_operations()
bzn::hash_t
database_pbft_service::service_state_hash(uint64_t /*sequence_number*/) const
{
// TODO: not sure how this works...
// TODO: not sure how this works... (KEP-1203)

return "";
}
Expand Down

0 comments on commit efa762b

Please sign in to comment.