Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-466: Address requests by their hash
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelsavannah committed Nov 7, 2018
1 parent 3141b02 commit de52fef
Show file tree
Hide file tree
Showing 24 changed files with 249 additions and 147 deletions.
2 changes: 2 additions & 0 deletions include/bluzelle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ namespace bzn

using session_id = uint64_t;

using hash_t = std::string;

} // bzn


Expand Down
4 changes: 2 additions & 2 deletions mocks/mock_pbft_failure_detector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ namespace bzn
class Mockpbft_failure_detector_base : public pbft_failure_detector_base
{
public:
MOCK_METHOD1(request_seen, void(const pbft_request& req));
MOCK_METHOD1(request_seen, void(const bzn::hash_t& req));

MOCK_METHOD1(request_executed, void(const pbft_request& req));
MOCK_METHOD1(request_executed, void(const bzn::hash_t& req));

MOCK_METHOD1(register_failure_handler, void(std::function<void()> handler));
};
Expand Down
6 changes: 3 additions & 3 deletions mocks/mock_pbft_service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ namespace bzn {

class mock_pbft_service_base : public pbft_service_base {
public:
MOCK_METHOD2(apply_operation,
void(const pbft_request& request, uint64_t sequence_number));
MOCK_METHOD1(apply_operation,
void(std::shared_ptr<bzn::pbft_operation>));
MOCK_CONST_METHOD2(query,
void(const pbft_request& request, uint64_t sequence_number));
MOCK_CONST_METHOD1(service_state_hash,
bzn::hash_t(uint64_t sequence_number));
MOCK_METHOD1(consolidate_log,
void(uint64_t sequence_number));
MOCK_METHOD1(register_execute_handler,
void(std::function<void(const pbft_request&, uint64_t)> handler));
void(bzn::execute_handler_t handler));
MOCK_METHOD1(apply_operation,
void(const std::shared_ptr<pbft_operation>&));
};
Expand Down
6 changes: 3 additions & 3 deletions pbft/database_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ database_pbft_service::apply_operation(const std::shared_ptr<bzn::pbft_operation
std::lock_guard<std::mutex> lock(this->lock);

// store op...
if (auto result = this->unstable_storage->create(this->uuid, std::to_string(op->sequence), op->request.SerializeAsString());
if (auto result = this->unstable_storage->create(this->uuid, std::to_string(op->sequence), op->get_request().SerializeAsString());
result != bzn::storage_base::result::ok)
{
LOG(fatal) << "failed to store pbft request: " << op->request.DebugString() << ", " << uint32_t(result);
LOG(fatal) << "failed to store pbft request: " << op->get_request().DebugString() << ", " << uint32_t(result);

// these are fatal... something bad is going on.
throw std::runtime_error("Failed to store pbft request! (" + std::to_string(uint8_t(result)) + ")");
Expand Down Expand Up @@ -104,7 +104,7 @@ database_pbft_service::process_awaiting_operations()
this->crud->handle_request(request.operation(), nullptr);
}

this->io_context->post(std::bind(this->execute_handler, request, this->next_request_sequence));
this->io_context->post(std::bind(this->execute_handler, nullptr)); // TODO: need to find the pbft_operation here; requires pbft_operation not being an in-memory construct

if (auto result = this->unstable_storage->remove(uuid, key); result != bzn::storage_base::result::ok)
{
Expand Down
2 changes: 1 addition & 1 deletion pbft/dummy_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dummy_pbft_service::apply_operation(const std::shared_ptr<pbft_operation>& op)
this->send_execute_response(op);

// todo: use shared from this as post could act on a long gone dummy_pbft_service?
this->io_context->post(std::bind(this->execute_handler, op->request, this->next_request_sequence));
this->io_context->post(std::bind(this->execute_handler, op));

this->waiting_operations.erase(this->next_request_sequence);
this->next_request_sequence++;
Expand Down
76 changes: 46 additions & 30 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ pbft::pbft(
, bzn::uuid_t uuid
, std::shared_ptr<pbft_service_base> service
, std::shared_ptr<pbft_failure_detector_base> failure_detector
, std::shared_ptr<bzn::crypto_base> crypto
)
: node(std::move(node))
, uuid(std::move(uuid))
, service(std::move(service))
, failure_detector(std::move(failure_detector))
, io_context(io_context)
, audit_heartbeat_timer(this->io_context->make_unique_steady_timer())
, crypto(std::move(crypto))
{
if (peers.empty())
{
Expand Down Expand Up @@ -74,16 +76,16 @@ pbft::start()

this->service->register_execute_handler(
[weak_this = this->weak_from_this(), fd = this->failure_detector]
(const pbft_request& req, uint64_t sequence)
(std::shared_ptr<pbft_operation> op)
{
fd->request_executed(req);
fd->request_executed(op->request_hash);

if (sequence % CHECKPOINT_INTERVAL == 0)
if (op->sequence % CHECKPOINT_INTERVAL == 0)
{
auto strong_this = weak_this.lock();
if(strong_this)
{
strong_this->checkpoint_reached_locally(sequence);
strong_this->checkpoint_reached_locally(op->sequence);
}
else
{
Expand Down Expand Up @@ -183,11 +185,6 @@ pbft::handle_message(const pbft_msg& msg, const bzn_envelope& original_msg)
return;
}

if (msg.has_request())
{
this->failure_detector->request_seen(msg.request());
}

std::lock_guard<std::mutex> lock(this->pbft_lock);

switch (msg.type())
Expand Down Expand Up @@ -238,10 +235,11 @@ pbft::preliminary_filter_msg(const pbft_msg& msg)
}

std::shared_ptr<pbft_operation>
pbft::setup_request_operation(const pbft_request& msg, const std::shared_ptr<session_base>& session)
pbft::setup_request_operation(const bzn::encoded_message& request, const std::shared_ptr<session_base>& session)
{
const uint64_t request_seq = this->next_issued_sequence_number++;
auto op = this->find_operation(this->view, request_seq, msg);
auto op = this->find_operation(this->view, request_seq, this->crypto->hash(request));
op->record_request(request);

if (session)
{
Expand All @@ -252,7 +250,7 @@ pbft::setup_request_operation(const pbft_request& msg, const std::shared_ptr<ses
}

void
pbft::handle_request(const pbft_request& msg, const bzn::json_message& original_msg, const std::shared_ptr<session_base>& session)
pbft::handle_request(const pbft_request& /*msg*/, const bzn::json_message& original_msg, const std::shared_ptr<session_base>& session)
{
if (!this->is_primary())
{
Expand All @@ -265,10 +263,25 @@ pbft::handle_request(const pbft_request& msg, const bzn::json_message& original_

//TODO: keep track of what requests we've seen based on timestamp and only send preprepares once - KEP-329

auto op = setup_request_operation(msg, session);
auto op = setup_request_operation(original_msg.toStyledString(), session);
this->do_preprepare(op);
}

void
pbft::maybe_record_request(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op)
{
if (!msg.request().empty() && !op->has_request())
{
if (this->crypto->hash(msg.request()) != msg.request_hash())
{
LOG(info) << "Not recording request because its hash does not match";
return;
}

op->record_request(msg.request());
}
}

void
pbft::handle_preprepare(const pbft_msg& msg, const bzn_envelope& original_msg)
{
Expand All @@ -278,7 +291,7 @@ pbft::handle_preprepare(const pbft_msg& msg, const bzn_envelope& original_msg)

if (auto lookup = this->accepted_preprepares.find(log_key);
lookup != this->accepted_preprepares.end()
&& std::get<2>(lookup->second) != pbft_operation::request_hash(msg.request()))
&& std::get<2>(lookup->second) != msg.request_hash())
{

LOG(debug) << "Rejecting preprepare because I've already accepted a conflicting one \n";
Expand All @@ -288,11 +301,12 @@ pbft::handle_preprepare(const pbft_msg& msg, const bzn_envelope& original_msg)
{
auto op = this->find_operation(msg);
op->record_preprepare(original_msg);
this->maybe_record_request(msg, op);

// This assignment will be redundant if we've seen this preprepare before, but that's fine
accepted_preprepares[log_key] = op->get_operation_key();

if (msg.has_request() && msg.request().type() == PBFT_REQ_NEW_CONFIG)
if (op->has_request() && op->get_request().type() == PBFT_REQ_NEW_CONFIG)
{
this->handle_config_message(msg, op);
}
Expand All @@ -309,6 +323,7 @@ pbft::handle_prepare(const pbft_msg& msg, const bzn_envelope& original_msg)
auto op = this->find_operation(msg);

op->record_prepare(original_msg);
this->maybe_record_request(msg, op);
this->maybe_advance_operation_state(op);
}

Expand All @@ -319,6 +334,7 @@ pbft::handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg)
auto op = this->find_operation(msg);

op->record_commit(original_msg);
this->maybe_record_request(msg, op);
this->maybe_advance_operation_state(op);
}

Expand Down Expand Up @@ -400,7 +416,7 @@ pbft::common_message_setup(const std::shared_ptr<pbft_operation>& op, pbft_msg_t
pbft_msg msg;
msg.set_view(op->view);
msg.set_sequence(op->sequence);
msg.set_allocated_request(new pbft_request(op->request));
msg.set_request_hash(op->request_hash);
msg.set_type(type);

return msg;
Expand All @@ -412,6 +428,7 @@ pbft::do_preprepare(const std::shared_ptr<pbft_operation>& op)
LOG(debug) << "Doing preprepare for operation " << op->debug_string();

pbft_msg msg = this->common_message_setup(op, PBFT_MSG_PREPREPARE);
msg.set_request(op->get_encoded_request());

this->broadcast(this->wrap_message(msg, "preprepare"));
}
Expand All @@ -430,10 +447,10 @@ void
pbft::do_prepared(const std::shared_ptr<pbft_operation>& op)
{
// accept new configuration if applicable
if (op->request.type() == PBFT_REQ_NEW_CONFIG && op->request.has_config())
if (op->has_request() && op->get_request().type() == PBFT_REQ_NEW_CONFIG && op->get_request().has_config())
{
pbft_configuration config;
if (config.from_string(op->request.config().configuration()))
if (config.from_string(op->get_request().config().configuration()))
{
this->configurations.enable(config.get_hash());
}
Expand All @@ -451,10 +468,10 @@ void
pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
{
// commit new configuration if applicable
if (op->request.type() == PBFT_REQ_NEW_CONFIG && op->request.has_config())
if (op->has_request() && op->get_request().type() == PBFT_REQ_NEW_CONFIG && op->get_request().has_config())
{
pbft_configuration config;
if (config.from_string(op->request.config().configuration()))
if (config.from_string(op->get_request().config().configuration()))
{
// get rid of all other previous configs, except for currently active one
this->configurations.remove_prior_to(config.get_hash());
Expand All @@ -467,7 +484,7 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
if (this->audit_enabled)
{
audit_message msg;
msg.mutable_pbft_commit()->set_operation(pbft_operation::request_hash(op->request));
msg.mutable_pbft_commit()->set_operation(op->request_hash);
msg.mutable_pbft_commit()->set_sequence_number(op->sequence);
msg.mutable_pbft_commit()->set_sender_uuid(this->uuid);

Expand Down Expand Up @@ -500,27 +517,26 @@ pbft::get_primary() const
std::shared_ptr<pbft_operation>
pbft::find_operation(const pbft_msg& msg)
{
return this->find_operation(msg.view(), msg.sequence(), msg.request());
return this->find_operation(msg.view(), msg.sequence(), msg.request_hash());
}

std::shared_ptr<pbft_operation>
pbft::find_operation(const std::shared_ptr<pbft_operation>& op)
{
return this->find_operation(op->view, op->sequence, op->request);
return this->find_operation(op->view, op->sequence, op->request_hash);
}

std::shared_ptr<pbft_operation>
pbft::find_operation(uint64_t view, uint64_t sequence, const pbft_request& request)
pbft::find_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& req_hash)
{
auto key = bzn::operation_key_t(view, sequence, pbft_operation::request_hash(request));
auto key = bzn::operation_key_t(view, sequence, req_hash);

auto lookup = operations.find(key);
if (lookup == operations.end())
{
LOG(debug) << "Creating operation for seq " << sequence << " view " << view << " req "
<< request.ShortDebugString();
LOG(debug) << "Creating operation for seq " << sequence << " view " << view << " req " << req_hash;

std::shared_ptr<pbft_operation> op = std::make_shared<pbft_operation>(view, sequence, request,
std::shared_ptr<pbft_operation> op = std::make_shared<pbft_operation>(view, sequence, req_hash,
this->current_peers_ptr());
auto result = operations.emplace(std::piecewise_construct, std::forward_as_tuple(std::move(key)), std::forward_as_tuple(op));

Expand Down Expand Up @@ -870,7 +886,7 @@ pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config)
cfg_msg->set_configuration(config->to_string());
req.set_allocated_config(cfg_msg);

auto op = this->setup_request_operation(req);
auto op = this->setup_request_operation(req.SerializeAsString());
this->do_preprepare(op);
}

Expand All @@ -883,7 +899,7 @@ pbft::is_configuration_acceptable_in_new_view(hash_t config_hash)
void
pbft::handle_config_message(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op)
{
auto const& request = op->request;
auto const& request = op->get_request();
assert(request.type() == PBFT_REQ_NEW_CONFIG);
auto config = std::make_shared<pbft_configuration>();
if (msg.type() == PBFT_MSG_PREPREPARE && config->from_string(request.config().configuration()))
Expand Down
7 changes: 5 additions & 2 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace bzn
, bzn::uuid_t uuid
, std::shared_ptr<pbft_service_base> service
, std::shared_ptr<pbft_failure_detector_base> failure_detector
, std::shared_ptr<bzn::crypto_base> crypto
);

void start() override;
Expand Down Expand Up @@ -84,7 +85,7 @@ namespace bzn
bzn::json_message get_status() override;

private:
std::shared_ptr<pbft_operation> find_operation(uint64_t view, uint64_t sequence, const pbft_request& request);
std::shared_ptr<pbft_operation> find_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash);
std::shared_ptr<pbft_operation> find_operation(const pbft_msg& msg);
std::shared_ptr<pbft_operation> find_operation(const std::shared_ptr<pbft_operation>& op);

Expand All @@ -110,7 +111,7 @@ namespace bzn
bzn::encoded_message wrap_message(const audit_message& message, const std::string& debug_info = "");

pbft_msg common_message_setup(const std::shared_ptr<pbft_operation>& op, pbft_msg_type type);
std::shared_ptr<pbft_operation> setup_request_operation(const pbft_request& msg
std::shared_ptr<pbft_operation> setup_request_operation(const bzn::encoded_message& msg
, const std::shared_ptr<session_base>& session = nullptr);

void broadcast(const bzn::encoded_message& message);
Expand All @@ -137,6 +138,8 @@ namespace bzn
bool move_to_new_configuration(hash_t config_hash);
bool proposed_config_is_acceptable(std::shared_ptr<pbft_configuration> config);

void maybe_record_request(const pbft_msg& msg, const std::shared_ptr<pbft_operation>& op);

// Using 1 as first value here to distinguish from default value of 0 in protobuf
uint64_t view = 1;
uint64_t next_issued_sequence_number = 1;
Expand Down

0 comments on commit de52fef

Please sign in to comment.