Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-1760 - Error handling for non-db errors
Browse files Browse the repository at this point in the history
  • Loading branch information
paularchard committed Oct 30, 2019
1 parent 8edae3e commit 76691ab
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 29 deletions.
4 changes: 4 additions & 0 deletions crypto/crypto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ crypto::extract_payload(const bzn_envelope& msg)
{
return msg.checkpoint_msg();
}
case bzn_envelope::kSwarmError:
{
return msg.swarm_error();
}
default :
{
throw std::runtime_error(
Expand Down
59 changes: 47 additions & 12 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pbft::start()
this->node->register_for_message(bzn_envelope::kDatabaseResponse,
std::bind(&pbft::handle_database_response_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2));

this->node->register_for_message(bzn_envelope::kSwarmError,
std::bind(&pbft::handle_swarm_error_response_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2));

this->node->register_error_handler([weak_this = this->weak_from_this()](const boost::asio::ip::tcp::endpoint& ep, const boost::system::error_code& ec)
{
if (auto strong_this = weak_this.lock())
Expand Down Expand Up @@ -302,25 +305,35 @@ pbft::setup_request_operation(const bzn_envelope& request_env, const bzn::hash_t

// Reports a rejected client request back over the session it arrived on.
// Nothing is sent when `session` is null or when the database_msg payload of
// `request_env` fails to parse.
// NOTE(review): this listing interleaves the pre- and post-change lines of the
// commit (two parameter lists, two response builders, two send calls) --
// confirm against the checked-in pbft.cpp which lines survive.
void
pbft::send_error_response(const bzn_envelope& request_env, const std::shared_ptr<session_base>& session
, const std::string& msg) const
, const std::string& hash, const std::string& msg) const
{
database_msg req;
// the request is parsed so that its header (and nonce) can be echoed back
if (session && req.ParseFromString(request_env.database_msg()))
{
database_response resp;
*resp.mutable_header() = req.header();
resp.mutable_error()->set_message(msg);
// swarm_error carries the error text, the request nonce (as a decimal
// string in `data`) and the caller-supplied request hash
swarm_error err;
*err.mutable_message() = msg;
*err.mutable_data() = std::to_string(req.header().nonce());
*err.mutable_hash() = hash;

// the serialized swarm_error becomes the payload of the reply envelope
bzn_envelope response;
response.set_swarm_error(err.SerializeAsString());

// stamp the envelope with this node's uuid, the current time and the swarm id
response.set_sender(this->uuid);
response.set_timestamp(this->now());
response.set_swarm_id(this->options->get_swarm_id());

session->send_message(std::make_shared<std::string>(this->wrap_message(resp).SerializeAsString()));
return session->send_message(std::make_shared<std::string>(response.SerializeAsString()));
}
}

void
pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptr<session_base>& session)
{
const auto hash = this->crypto->hash(request_env);

if (request_env.timestamp() < (this->now() - MAX_REQUEST_AGE_MS) || request_env.timestamp() > (this->now() + MAX_REQUEST_AGE_MS))
{
this->send_error_response(request_env, session, TIMESTAMP_ERROR_MSG);
this->send_error_response(request_env, session, hash, TIMESTAMP_ERROR_MSG);

LOG(info) << "Rejecting request because it is outside allowable timestamp range: "
<< request_env.ShortDebugString();
Expand All @@ -334,14 +347,12 @@ pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptr<sess
const size_t OVERHEAD_SIZE{512};
if (!request_env.database_msg().empty() && request_env.database_msg().size() >= (bzn::MAX_VALUE_SIZE - OVERHEAD_SIZE))
{
this->send_error_response(request_env, session, TOO_LARGE_ERROR_MSG);
this->send_error_response(request_env, session, hash, TOO_LARGE_ERROR_MSG);

LOG(warning) << "Rejecting request because it is too large [" << request_env.database_msg().size() << " bytes]";
return;
}

const auto hash = this->crypto->hash(request_env);

if (session)
{
if (this->sessions_waiting_on_forwarded_requests.find(hash) == this->sessions_waiting_on_forwarded_requests.end())
Expand All @@ -360,16 +371,16 @@ pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptr<sess

if ((this->next_issued_sequence_number.value()) - this->last_executed_sequence_number > this->options->get_admission_window())
{
// TODO: send error message (KEP-1760)
LOG(debug) << "Dropping request because we're too busy";
LOG(debug) << "Rejecting request because we're too busy";
this->send_error_response(request_env, session, hash, TOO_BUSY_ERROR_MSG);
return;
}

// keep track of what requests we've seen based on timestamp and only send preprepares once
if (this->already_seen_request(request_env, hash))
{
// TODO: send error message to client
LOG(debug) << "Rejecting duplicate request: " << request_env.ShortDebugString().substr(0, MAX_MESSAGE_SIZE);
this->send_error_response(request_env, session, hash, DUPLICATE_ERROR_MSG);
return;
}

Expand Down Expand Up @@ -941,6 +952,30 @@ pbft::handle_database_response_message(const bzn_envelope& msg, std::shared_ptr<
LOG(error) << "failed to read database response";
}

// Handles a swarm_error envelope received by this node: looks up the session
// registered under the error's request hash in
// sessions_waiting_on_forwarded_requests and passes the envelope on to it
// unchanged. The session argument of the incoming message is not used.
void
pbft::handle_swarm_error_response_message(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> /*session*/)
{
    std::lock_guard<std::mutex> lock(this->pbft_lock);

    swarm_error err;
    if (!err.ParseFromString(msg.swarm_error()))
    {
        LOG(error) << "failed to read swarm error response";
        return;
    }

    const auto waiting_session = this->sessions_waiting_on_forwarded_requests.find(err.hash());
    if (waiting_session == this->sessions_waiting_on_forwarded_requests.end())
    {
        LOG(warning) << "session not found for swarm error response";
        return;
    }

    // relay the whole envelope as received, then stop the latency timer
    // that is keyed by the same request hash
    waiting_session->second->send_message(std::make_shared<bzn::encoded_message>(msg.SerializeAsString()));
    this->monitor->finish_timer(bzn::statistic::request_latency, err.hash());
}

uint64_t
pbft::get_low_water_mark()
{
Expand Down
6 changes: 5 additions & 1 deletion pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace
const std::string SAVED_NEWVIEW_KEY{"saved_newview"};
const std::string TIMESTAMP_ERROR_MSG{"INVALID TIMESTAMP"};
const std::string TOO_LARGE_ERROR_MSG{"REQUEST TOO LARGE"};
const std::string TOO_BUSY_ERROR_MSG{"SERVER TOO BUSY"};
const std::string DUPLICATE_ERROR_MSG{"DUPLICATE REQUEST"};
}


Expand Down Expand Up @@ -89,6 +91,8 @@ namespace bzn

void handle_database_response_message(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> session);

void handle_swarm_error_response_message(const bzn_envelope& msg, std::shared_ptr<bzn::session_base> session);

bool is_primary() const override;

std::optional<peer_address_t> get_current_primary() const override;
Expand Down Expand Up @@ -187,7 +191,7 @@ namespace bzn
void broadcast(const bzn_envelope& message);

void send_error_response(const bzn_envelope& request_env, const std::shared_ptr<session_base>& session
, const std::string& msg) const;
, const std::string& hash, const std::string& msg) const;


void handle_audit_heartbeat_timeout(const boost::system::error_code& ec);
Expand Down
46 changes: 34 additions & 12 deletions pbft/test/pbft_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,6 @@ namespace bzn::test
service.apply_operation(op);
}

// A preprepare whose sequence number equals the current low water mark must
// not cause any prepare message to be sent.
TEST_F(pbft_test, messages_before_low_water_mark_dropped)
{
this->build_pbft();
// no prepare may go out to any endpoint
EXPECT_CALL(*mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(is_prepare, Eq(true))))
.Times(Exactly(0));

// place the preprepare exactly at the low water mark before handling it
this->preprepare_msg.set_sequence(this->pbft->get_low_water_mark());
pbft->handle_message(preprepare_msg, default_original_msg);
}

MATCHER(operation_ptr_has_session, "")
{
return arg->has_session();
Expand Down Expand Up @@ -357,12 +347,44 @@ namespace bzn::test
.Times(Exactly(TEST_PEER_LIST.size() * reqs));

this->request_msg.set_timestamp(now());
for (size_t i = 0; i < reqs + 1; i++)
for (size_t i = 0; i < reqs; i++)
{
pbft->handle_database_message(this->request_msg, this->mock_session);
this->request_msg.set_timestamp(this->request_msg.timestamp() + 1);
}

// TODO: check for error response after doing KEP-1760
EXPECT_CALL(*mock_session, send_message(_))
.Times(Exactly(1))
.WillOnce(Invoke([&](auto& msg)
{
bzn_envelope env;
ASSERT_TRUE(env.ParseFromString(*msg));
ASSERT_EQ(env.payload_case(), bzn_envelope::kSwarmError);

swarm_error err;
ASSERT_TRUE(err.ParseFromString(env.swarm_error()));
ASSERT_EQ(err.message(), "SERVER TOO BUSY");
}));

pbft->handle_database_message(this->request_msg, this->mock_session);

}

// An oversized database request must be answered with a swarm_error whose
// message is "REQUEST TOO LARGE".
TEST_F(pbft_test, too_big_request_generates_error)
{
    this->build_pbft();

    database_msg dmsg;
    dmsg.mutable_create()->set_key(std::string("key"));
    dmsg.mutable_create()->set_value(std::string(bzn::MAX_VALUE_SIZE + 1, 'a'));

    bzn_envelope request;
    request.set_database_msg(dmsg.SerializeAsString());
    request.set_sender(TEST_NODE_UUID);
    // handle_request checks the timestamp range before the size; without a
    // current timestamp the request would be rejected as "INVALID TIMESTAMP"
    // and the size check would never be exercised
    request.set_timestamp(now());

    // verify the specific error, not just that some swarm error was sent
    EXPECT_CALL(*this->mock_session, send_message(_))
        .Times(Exactly(1))
        .WillOnce(Invoke([&](auto& msg)
        {
            bzn_envelope env;
            ASSERT_TRUE(env.ParseFromString(*msg));
            ASSERT_EQ(env.payload_case(), bzn_envelope::kSwarmError);

            swarm_error err;
            ASSERT_TRUE(err.ParseFromString(env.swarm_error()));
            ASSERT_EQ(err.message(), "REQUEST TOO LARGE");
        }));

    pbft->handle_database_message(request, this->mock_session);
}


}
7 changes: 7 additions & 0 deletions pbft/test/pbft_test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ namespace bzn::test
return msg.type() == PBFT_MSG_NEWVIEW;
}

// Predicate for gmock's ResultOf: true when `msg` is a non-null serialized
// bzn_envelope whose payload is a swarm_error. A null pointer or a string that
// does not parse as an envelope yields false.
bool
is_swarm_error(std::shared_ptr<std::string> msg)
{
    // a null message is a failed match, not a crash of the test binary
    if (!msg)
    {
        return false;
    }

    bzn_envelope env;
    return env.ParseFromString(*msg) && env.payload_case() == bzn_envelope::kSwarmError;
}

bzn_envelope
from(uuid_t uuid)
{
Expand Down
1 change: 1 addition & 0 deletions pbft/test/pbft_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ namespace bzn::test
bool is_audit(std::shared_ptr<bzn_envelope> msg);
bool is_viewchange(std::shared_ptr<bzn_envelope> wrapped_msg);
bool is_newview(std::shared_ptr<bzn_envelope> wrapped_msg);
bool is_swarm_error(std::shared_ptr<std::string> msg);

bzn_envelope from(uuid_t uuid);
}
11 changes: 9 additions & 2 deletions pbft/test/pbft_timestamp_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ namespace bzn
// this time no pre-prepare should be issued
EXPECT_CALL(*this->mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(0));
this->handle_request(request2);
std::shared_ptr<bzn::mock_session_base> session = std::make_shared<bzn::mock_session_base>();
EXPECT_CALL(*session, send_message(ResultOf(test::is_swarm_error, Eq(true))));

this->handle_request(request2, session);
}

TEST_F(pbft_proto_test, similar_request_generates_preprepare)
Expand Down Expand Up @@ -137,7 +140,11 @@ namespace bzn
// we should NOT get pre-prepare messages since this is an old request
EXPECT_CALL(*this->mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(test::is_preprepare, Eq(true))))
.Times(Exactly(0));
this->handle_request(request);

std::shared_ptr<bzn::mock_session_base> session = std::make_shared<bzn::mock_session_base>();
EXPECT_CALL(*session, send_message(ResultOf(test::is_swarm_error, Eq(true))));

this->handle_request(request, session);
}

TEST_F(pbft_proto_test, range_test)
Expand Down
7 changes: 5 additions & 2 deletions pbft/test/pbft_viewchange_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ namespace bzn
std::shared_ptr<bzn::mock_pbft_service_base> mock_service2 =
std::make_shared<NiceMock<bzn::mock_pbft_service_base>>();

EXPECT_CALL(*(mock_node2), register_error_handler(_))
.Times(Exactly(1));

EXPECT_CALL(*(mock_io_context2), make_unique_steady_timer())
.Times(AtMost(4)).WillOnce(Invoke([&]()
{ return std::move(audit_heartbeat_timer2); }))
Expand Down Expand Up @@ -344,7 +347,7 @@ namespace bzn
run_transaction_through_primary();
this->stabilize_checkpoint(100);

for (size_t i = 0; i < 50; i++)
for (size_t i = 0; i < 20; i++)
{
run_transaction_through_primary(false);
}
Expand Down Expand Up @@ -382,7 +385,7 @@ namespace bzn
if (p.uuid == TEST_NODE_UUID)
{
EXPECT_CALL(*this->mock_node, send_signed_message(A<const boost::asio::ip::tcp::endpoint&>(), ResultOf(test::is_prepare, Eq(true))))
.Times(Exactly(50 * TEST_PEER_LIST.size()));
.Times(Exactly(20 * TEST_PEER_LIST.size()));
pbft_msg msg;
ASSERT_TRUE(msg.ParseFromString(wmsg->pbft()));
wmsg->set_sender("uuid2");
Expand Down
9 changes: 9 additions & 0 deletions proto/bluzelle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@

syntax = "proto3";

// Error reply carried in bzn_envelope.swarm_error. The field notes below
// describe how pbft::send_error_response populates the message.
message swarm_error
{
string message = 1; // error text, e.g. "SERVER TOO BUSY"
bytes data = 2; // nonce of the rejected request, as a decimal string
bytes hash = 3; // hash of the rejected request envelope
}


message bzn_envelope
{
string swarm_id = 1;
Expand All @@ -34,5 +42,6 @@ message bzn_envelope
bytes status_request = 17;
bytes status_response = 18;
bytes checkpoint_msg = 19;
bytes swarm_error = 20;
}
}

0 comments on commit 76691ab

Please sign in to comment.