Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
Merge branch 'devel' into task/iscroggin/KEP-544
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelsavannah committed Sep 12, 2018
2 parents ef33c74 + 86c7fd2 commit 23ef988
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 45 deletions.
2 changes: 2 additions & 0 deletions mocks/mock_pbft_service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace bzn {
void(uint64_t sequence_number));
MOCK_METHOD1(register_execute_handler,
void(std::function<void(const pbft_request&, uint64_t)> handler));
MOCK_METHOD1(apply_operation,
void(const std::shared_ptr<pbft_operation>&));
};

} // namespace bzn
28 changes: 21 additions & 7 deletions pbft/dummy_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,27 @@ dummy_pbft_service::dummy_pbft_service(std::shared_ptr<bzn::asio::io_context_bas
}

void
dummy_pbft_service::apply_operation(const pbft_request& request, uint64_t sequence_number)
dummy_pbft_service::apply_operation(const std::shared_ptr<pbft_operation>& op)
{
std::lock_guard<std::mutex> lock(this->lock);

this->waiting_requests[sequence_number] = request;
this->waiting_operations[op->sequence] = std::move(op);

while (this->waiting_requests.count(this->next_request_sequence) > 0)
while (this->waiting_operations.count(this->next_request_sequence) > 0)
{
auto req = waiting_requests[this->next_request_sequence];
auto op = waiting_operations[this->next_request_sequence];

LOG(info) << "Executing request " << req.ShortDebugString() << ", sequence " << this->next_request_sequence
LOG(info) << "Executing request " << op->debug_string() << ", sequence " << this->next_request_sequence
<< "\n";

if (op->session())
{
this->send_execute_response(op);
}

boost::asio::post(std::bind(this->execute_handler, req, this->next_request_sequence));
this->io_context->post(std::bind(this->execute_handler, op->request, this->next_request_sequence));

this->waiting_requests.erase(this->next_request_sequence);
this->waiting_operations.erase(this->next_request_sequence);
this->next_request_sequence++;
}
}
Expand Down Expand Up @@ -74,3 +78,13 @@ dummy_pbft_service::service_state_hash(uint64_t sequence_number) const
{
return "I don't actually have a database [" + std::to_string(sequence_number) + "]";
}

void
dummy_pbft_service::send_execute_response(const std::shared_ptr<pbft_operation>& op)
{
database_response resp;
resp.mutable_resp()->set_value("dummy database execution of " + op->debug_string());
LOG(debug) << "Sending request result " << resp.ShortDebugString();
op->session()->send_datagram(std::make_shared<std::string>(resp.SerializeAsString()));
}

8 changes: 6 additions & 2 deletions pbft/dummy_pbft_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace bzn
{
public:
dummy_pbft_service(std::shared_ptr<bzn::asio::io_context_base> io_context);
void apply_operation(const pbft_request& request, uint64_t sequence_number) override;
void apply_operation(const std::shared_ptr<pbft_operation>& op) override;
void query(const pbft_request& request, uint64_t sequence_number) const override;
void consolidate_log(uint64_t sequence_number) override;
void register_execute_handler(execute_handler_t handler) override;
Expand All @@ -40,9 +40,13 @@ namespace bzn
private:
execute_handler_t execute_handler;
std::shared_ptr<bzn::asio::io_context_base> io_context;
void send_execute_response(const std::shared_ptr<pbft_operation>& op);

uint64_t next_request_sequence = 1;
std::unordered_map<uint64_t, pbft_request> waiting_requests;
std::shared_ptr<pbft_failure_detector_base> failure_detector;

std::unordered_map<uint64_t, std::shared_ptr<pbft_operation>> waiting_operations;

std::mutex lock;
};

Expand Down
66 changes: 59 additions & 7 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <algorithm>
#include <numeric>
#include <iterator>
#include <crud/crud_base.hpp>

namespace
{
Expand Down Expand Up @@ -79,7 +80,10 @@ pbft::start()
[this]()
{
this->node->register_for_message("pbft",
std::bind(&pbft::unwrap_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2));
std::bind(&pbft::handle_pbft_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2));

this->node->register_for_message("database",
std::bind(&pbft::handle_database_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2));

this->audit_heartbeat_timer->expires_from_now(heartbeat_interval);
this->audit_heartbeat_timer->async_wait(
Expand Down Expand Up @@ -143,7 +147,7 @@ pbft::handle_audit_heartbeat_timeout(const boost::system::error_code& ec)
}

void
pbft::unwrap_message(const bzn::message& json, std::shared_ptr<bzn::session_base> /*session*/)
pbft::handle_pbft_message(const bzn::message& json, std::shared_ptr<bzn::session_base> /*session*/)
{
pbft_msg msg;
if (!msg.ParseFromString(boost::beast::detail::base64_decode(json["pbft-data"].asString())))
Expand Down Expand Up @@ -175,7 +179,7 @@ pbft::handle_message(const pbft_msg& msg)
switch (msg.type())
{
case PBFT_MSG_REQUEST :
this->handle_request(msg);
this->handle_request(msg.request());
break;
case PBFT_MSG_PREPREPARE :
this->handle_preprepare(msg);
Expand Down Expand Up @@ -226,7 +230,7 @@ pbft::preliminary_filter_msg(const pbft_msg& msg)
}

void
pbft::handle_request(const pbft_msg& msg)
pbft::handle_request(const pbft_request& msg, const std::shared_ptr<session_base>& session)
{
if (!this->is_primary())
{
Expand All @@ -240,7 +244,12 @@ pbft::handle_request(const pbft_msg& msg)
//TODO: keep track of what requests we've seen based on timestamp and only send preprepares once - KEP-329

const uint64_t request_seq = this->next_issued_sequence_number++;
auto op = this->find_operation(this->view, request_seq, msg.request());
auto op = this->find_operation(this->view, request_seq, msg);

if (session)
{
op->set_session(session);
}

this->do_preprepare(op);
}
Expand Down Expand Up @@ -376,8 +385,6 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
LOG(debug) << "Operation " << op->debug_string() << " is committed-local";
op->end_commit_phase();

this->io_context->post(std::bind(&pbft_service_base::apply_operation, this->service, op->request, op->sequence));

if (this->audit_enabled)
{
audit_message msg;
Expand All @@ -387,6 +394,9 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)

this->broadcast(this->wrap_message(msg));
}

// Get a new shared pointer to the operation so that we can give pbft_service ownership on it
this->io_context->post(std::bind(&pbft_service_base::apply_operation, this->service, this->find_operation(op)));
}

size_t
Expand Down Expand Up @@ -414,6 +424,12 @@ pbft::find_operation(const pbft_msg& msg)
return this->find_operation(msg.view(), msg.sequence(), msg.request());
}

std::shared_ptr<pbft_operation>
pbft::find_operation(const std::shared_ptr<pbft_operation>& op)
{
return this->find_operation(op->view, op->sequence, op->request);
}

std::shared_ptr<pbft_operation>
pbft::find_operation(uint64_t view, uint64_t sequence, const pbft_request& request)
{
Expand Down Expand Up @@ -632,3 +648,39 @@ pbft::max_faulty_nodes() const
{
return this->peer_index.size()/3;
}

void
pbft::handle_database_message(const bzn::message& json, std::shared_ptr<bzn::session_base> session)
{
bzn_msg msg;
database_response response;

LOG(debug) << "got database message: " << json.toStyledString();

if (!json.isMember("msg"))
{
LOG(error) << "Invalid message: " << json.toStyledString().substr(0,MAX_MESSAGE_SIZE) << "...";
response.mutable_resp()->set_error(bzn::MSG_INVALID_CRUD_COMMAND);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);
return;
}

if (!msg.ParseFromString(boost::beast::detail::base64_decode(json["msg"].asString())))
{
LOG(error) << "Failed to decode message: " << json.toStyledString().substr(0,MAX_MESSAGE_SIZE) << "...";
response.mutable_resp()->set_error(bzn::MSG_INVALID_CRUD_COMMAND);
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), true);
return;
}

*response.mutable_header() = msg.db().header();

pbft_request req;
req.set_operation(json.toStyledString());
req.set_timestamp(0); //TODO: KEP-611

this->handle_request(req, session);

LOG(debug) << "Sending request ack: " << response.ShortDebugString();
session->send_message(std::make_shared<std::string>(response.SerializeAsString()), false);
}
7 changes: 5 additions & 2 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ namespace bzn
private:
std::shared_ptr<pbft_operation> find_operation(uint64_t view, uint64_t sequence, const pbft_request& request);
std::shared_ptr<pbft_operation> find_operation(const pbft_msg& msg);
std::shared_ptr<pbft_operation> find_operation(const std::shared_ptr<pbft_operation>& op);

bzn::hash_t request_hash(const pbft_request& req);

bool preliminary_filter_msg(const pbft_msg& msg);

void handle_request(const pbft_msg& msg);
void handle_request(const pbft_request& msg, const std::shared_ptr<session_base>& session = nullptr);
void handle_preprepare(const pbft_msg& msg);
void handle_prepare(const pbft_msg& msg);
void handle_commit(const pbft_msg& msg);
Expand All @@ -86,7 +87,7 @@ namespace bzn
void do_prepared(const std::shared_ptr<pbft_operation>& op);
void do_committed(const std::shared_ptr<pbft_operation>& op);

void unwrap_message(const bzn::message& json, std::shared_ptr<bzn::session_base> session);
void handle_pbft_message(const bzn::message& json, std::shared_ptr<bzn::session_base> session);
bzn::message wrap_message(const pbft_msg& message, const std::string& debug_info = "");
bzn::message wrap_message(const audit_message& message, const std::string& debug_info = "");

Expand All @@ -101,6 +102,8 @@ namespace bzn
void checkpoint_reached_locally(uint64_t sequence);
void maybe_stabilize_checkpoint(const checkpoint_t& cp);

void handle_database_message(const bzn::message& json, std::shared_ptr<bzn::session_base> session);

inline size_t quorum_size() const;
inline size_t max_faulty_nodes() const;

Expand Down
11 changes: 11 additions & 0 deletions pbft/pbft_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,14 @@ pbft_operation::request_hash(const pbft_request& req)
return req.ShortDebugString();
}

void
pbft_operation::set_session(std::shared_ptr<bzn::session_base> session)
{
this->listener_session = std::move(session);
}

const std::shared_ptr<bzn::session_base>&
pbft_operation::session()
{
return this->listener_session;
}
7 changes: 7 additions & 0 deletions pbft/pbft_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "bootstrap/bootstrap_peers_base.hpp"
#include <cstdint>
#include <string>
#include <node/session_base.hpp>

namespace bzn
{
Expand All @@ -42,6 +43,8 @@ namespace bzn

pbft_operation(uint64_t view, uint64_t sequence, pbft_request msg, std::shared_ptr<const std::vector<peer_address_t>> peers);

void set_session(std::shared_ptr<bzn::session_base>);

static hash_t request_hash(const pbft_request& req);

operation_key_t get_operation_key();
Expand All @@ -59,6 +62,8 @@ namespace bzn
void begin_commit_phase();
void end_commit_phase();

const std::shared_ptr<bzn::session_base>& session();

const uint64_t view;
const uint64_t sequence;
const pbft_request request;
Expand All @@ -76,6 +81,8 @@ namespace bzn
std::set<bzn::uuid_t> prepares_seen;
std::set<bzn::uuid_t> commits_seen;

std::shared_ptr<bzn::session_base> listener_session;

};
}

2 changes: 1 addition & 1 deletion pbft/pbft_service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace bzn
* PBFT has concluded that an operation is committed-local, it can now be applied as soon as all earlier
* operations have been applied.
*/
virtual void apply_operation(const pbft_request& request, uint64_t sequence_number) = 0;
virtual void apply_operation(const std::shared_ptr<pbft_operation>& op) = 0;

/*
* Apply some read-only operation to the history of the service at some particular sequence number (either the
Expand Down
49 changes: 49 additions & 0 deletions pbft/test/pbft_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <pbft/test/pbft_test_common.hpp>
#include <set>
#include <mocks/mock_session_base.hpp>

namespace bzn::test
{
Expand Down Expand Up @@ -181,4 +182,52 @@ namespace bzn::test
mock_service->consolidate_log(2);
}

TEST_F(pbft_test, client_request_results_in_message_ack)
{
auto mock_session = std::make_shared<NiceMock<bzn::Mocksession_base>>();
std::string last_err;

EXPECT_CALL(*mock_session, send_message(A<std::shared_ptr<std::string>>(), _)).WillRepeatedly(Invoke(
[&](auto msg, auto)
{
database_response resp;
resp.ParseFromString(*msg);

last_err = resp.resp().error();
}
));

this->build_pbft();
bzn::message msg;
msg["bzn-api"] = "database";

this->database_handler(msg, mock_session);
EXPECT_NE(last_err, "");

msg["msg"] = "not a valid crud message";
this->database_handler(msg, mock_session);
EXPECT_NE(last_err, "");

bzn_msg payload;
msg["msg"] = boost::beast::detail::base64_encode(payload.SerializeAsString());
this->database_handler(msg, mock_session);
EXPECT_EQ(last_err, "");
}

TEST_F(pbft_test, client_request_executed_results_in_message_response)
{
auto mock_session = std::make_shared<bzn::Mocksession_base>();
EXPECT_CALL(*mock_session, send_datagram(_)).Times(Exactly(1));

pbft_request msg;
auto peers = std::make_shared<const std::vector<bzn::peer_address_t>>();
auto op = std::make_shared<pbft_operation>(1, 1, msg, peers);
op->set_session(mock_session);

dummy_pbft_service service(this->mock_io_context);
service.register_execute_handler([](auto, auto){});

service.apply_operation(op);
}

}
11 changes: 11 additions & 0 deletions pbft/test/pbft_test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ namespace bzn::test
}
));

EXPECT_CALL(*(this->mock_node), register_for_message("database", _))
.Times(Exactly(1))
.WillOnce(
Invoke(
[&](const auto&, auto handler)
{
this->database_handler = handler;
return true;
}
));

EXPECT_CALL(*(this->mock_io_context), make_unique_steady_timer())
.Times(AtMost(1))
.WillOnce(
Expand Down
1 change: 1 addition & 0 deletions pbft/test/pbft_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace bzn::test

std::function<void(const pbft_request&, uint64_t)> service_execute_handler;
bzn::message_handler message_handler;
bzn::message_handler database_handler;

bzn::uuid_t uuid = TEST_NODE_UUID;

Expand Down

0 comments on commit 23ef988

Please sign in to comment.