Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-793: Refactor pbft_operation type to an abstract base, clean up its
Browse files Browse the repository at this point in the history
internals a bit to shrink the surface area of that interface.
  • Loading branch information
isabelsavannah committed Nov 28, 2018
1 parent 8533322 commit 2e7d75b
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 215 deletions.
3 changes: 2 additions & 1 deletion pbft/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ add_library(pbft STATIC
pbft.hpp
pbft.cpp
pbft_operation.hpp
pbft_operation.cpp
pbft_memory_operation.hpp
pbft_memory_operation.cpp
pbft_configuration.hpp
pbft_configuration.cpp
dummy_pbft_service.cpp
Expand Down
4 changes: 2 additions & 2 deletions pbft/database_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ database_pbft_service::apply_operation(const std::shared_ptr<bzn::pbft_operation
std::lock_guard<std::mutex> lock(this->lock);

// store op...
if (auto result = this->unstable_storage->create(this->uuid, std::to_string(op->sequence), op->get_database_msg().SerializeAsString());
if (auto result = this->unstable_storage->create(this->uuid, std::to_string(op->get_sequence()), op->get_database_msg().SerializeAsString());
result != bzn::storage_result::ok)
{
LOG(fatal) << "failed to store pbft request: " << op->get_database_msg().DebugString() << ", " << uint32_t(result);
Expand All @@ -60,7 +60,7 @@ database_pbft_service::apply_operation(const std::shared_ptr<bzn::pbft_operation
}

// store requester session for eventual response...
this->operations_awaiting_result[op->sequence] = op;
this->operations_awaiting_result[op->get_sequence()] = op;

this->process_awaiting_operations();
}
Expand Down
6 changes: 3 additions & 3 deletions pbft/dummy_pbft_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ dummy_pbft_service::apply_operation(const std::shared_ptr<pbft_operation>& op)
{
std::lock_guard<std::mutex> lock(this->lock);

this->waiting_operations[op->sequence] = std::move(op);
this->waiting_operations[op->get_sequence()] = std::move(op);

while (this->waiting_operations.count(this->next_request_sequence) > 0)
{
auto op = waiting_operations[this->next_request_sequence];

LOG(info) << "Executing request " << op->debug_string() << ", sequence " << this->next_request_sequence
LOG(info) << "Executing request sequence " << this->next_request_sequence
<< "\n";

this->send_execute_response(op);
Expand Down Expand Up @@ -93,7 +93,7 @@ void
dummy_pbft_service::send_execute_response(const std::shared_ptr<pbft_operation>& op)
{
database_response resp;
resp.mutable_read()->set_value("dummy database execution of " + op->debug_string());
resp.mutable_read()->set_value("dummy database execution of sequence" + op->get_sequence());

if (auto session = op->session().lock())
{
Expand Down
94 changes: 56 additions & 38 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#include <pbft/pbft.hpp>
#include <pbft/pbft_memory_operation.hpp>
#include <utils/make_endpoint.hpp>
#include <utils/bytes_to_debug_string.hpp>
#include <google/protobuf/text_format.h>
Expand Down Expand Up @@ -80,27 +81,26 @@ pbft::start()

this->service->register_execute_handler(
[weak_this = this->weak_from_this(), fd = this->failure_detector]
(std::shared_ptr<pbft_operation> op)
(std::shared_ptr<pbft_operation> op)
{
fd->request_executed(op->get_request_hash());

if (op->get_sequence() % CHECKPOINT_INTERVAL == 0)
{
auto strong_this = weak_this.lock();
if(strong_this)
{
// tell service to save the next checkpoint after this one
strong_this->service->save_service_state_at(op->get_sequence() + CHECKPOINT_INTERVAL);

fd->request_executed(op->request_hash);

if (op->sequence % CHECKPOINT_INTERVAL == 0)
{
auto strong_this = weak_this.lock();
if(strong_this)
{
// tell service to save the next checkpoint after this one
strong_this->service->save_service_state_at(op->sequence + CHECKPOINT_INTERVAL);

strong_this->checkpoint_reached_locally(op->sequence);
}
else
{
throw std::runtime_error("pbft_service callback failed because pbft does not exist");
}
}
strong_this->checkpoint_reached_locally(op->get_sequence());
}
else
{
throw std::runtime_error("pbft_service callback failed because pbft does not exist");
}
}
}
);

this->failure_detector->register_failure_handler(
Expand Down Expand Up @@ -353,7 +353,7 @@ pbft::handle_preprepare(const pbft_msg& msg, const bzn_envelope& original_msg)
else
{
auto op = this->find_operation(msg);
op->record_preprepare(original_msg);
op->record_pbft_msg(msg, original_msg);
this->maybe_record_request(msg, op);

// This assignment will be redundant if we've seen this preprepare before, but that's fine
Expand All @@ -375,7 +375,7 @@ pbft::handle_prepare(const pbft_msg& msg, const bzn_envelope& original_msg)
// Prepare messages are never rejected, assuming the sanity checks passed
auto op = this->find_operation(msg);

op->record_prepare(original_msg);
op->record_pbft_msg(msg, original_msg);
this->maybe_record_request(msg, op);
this->maybe_advance_operation_state(op);
}
Expand All @@ -386,7 +386,7 @@ pbft::handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg)
// Commit messages are never rejected, assuming the sanity checks passed
auto op = this->find_operation(msg);

op->record_commit(original_msg);
op->record_pbft_msg(msg, original_msg);
this->maybe_record_request(msg, op);
this->maybe_advance_operation_state(op);
}
Expand Down Expand Up @@ -524,12 +524,12 @@ pbft::broadcast(const bzn::encoded_message& msg)
void
pbft::maybe_advance_operation_state(const std::shared_ptr<pbft_operation>& op)
{
if (op->get_state() == pbft_operation_state::prepare && op->is_prepared())
if (op->get_stage() == pbft_operation_stage::prepare && op->is_prepared())
{
this->do_prepared(op);
}

if (op->get_state() == pbft_operation_state::commit && op->is_committed())
if (op->get_stage() == pbft_operation_stage::commit && op->is_committed())
{
this->do_committed(op);
}
Expand All @@ -539,9 +539,9 @@ pbft_msg
pbft::common_message_setup(const std::shared_ptr<pbft_operation>& op, pbft_msg_type type)
{
pbft_msg msg;
msg.set_view(op->view);
msg.set_sequence(op->sequence);
msg.set_request_hash(op->request_hash);
msg.set_view(op->get_view());
msg.set_sequence(op->get_sequence());
msg.set_request_hash(op->get_request_hash());
msg.set_type(type);

return msg;
Expand All @@ -550,7 +550,7 @@ pbft::common_message_setup(const std::shared_ptr<pbft_operation>& op, pbft_msg_t
void
pbft::do_preprepare(const std::shared_ptr<pbft_operation>& op)
{
LOG(debug) << "Doing preprepare for operation " << op->debug_string();
LOG(debug) << "Doing preprepare for operation " << op->get_sequence();

pbft_msg msg = this->common_message_setup(op, PBFT_MSG_PREPREPARE);
msg.set_allocated_request(new bzn_envelope(op->get_request()));
Expand All @@ -561,7 +561,7 @@ pbft::do_preprepare(const std::shared_ptr<pbft_operation>& op)
void
pbft::do_preprepared(const std::shared_ptr<pbft_operation>& op)
{
LOG(debug) << "Entering prepare phase for operation " << op->debug_string();
LOG(debug) << "Entering prepare phase for operation " << op->get_sequence();

pbft_msg msg = this->common_message_setup(op, PBFT_MSG_PREPARE);

Expand All @@ -581,8 +581,8 @@ pbft::do_prepared(const std::shared_ptr<pbft_operation>& op)
}
}

LOG(debug) << "Entering commit phase for operation " << op->debug_string();
op->begin_commit_phase();
LOG(debug) << "Entering commit phase for operation " << op->get_sequence();
op->advance_operation_stage(pbft_operation_stage::commit);

pbft_msg msg = this->common_message_setup(op, PBFT_MSG_COMMIT);

Expand All @@ -603,14 +603,14 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
}
}

LOG(debug) << "Operation " << op->debug_string() << " is committed-local";
op->end_commit_phase();
LOG(debug) << "Operation " << op->get_sequence() << " is committed-local";
op->advance_operation_stage(pbft_operation_stage::execute);

if (this->audit_enabled)
{
audit_message msg;
msg.mutable_pbft_commit()->set_operation(op->request_hash);
msg.mutable_pbft_commit()->set_sequence_number(op->sequence);
msg.mutable_pbft_commit()->set_operation(op->get_request_hash());
msg.mutable_pbft_commit()->set_sequence_number(op->get_sequence());
msg.mutable_pbft_commit()->set_sender_uuid(this->uuid);

this->broadcast(this->wrap_message(msg));
Expand All @@ -628,7 +628,7 @@ pbft::do_committed(const std::shared_ptr<pbft_operation>& op)
msg->set_allocated_nullmsg(new database_nullmsg);
bzn_envelope request;
request.set_database_msg(msg->SerializeAsString());
auto new_op = std::make_shared<pbft_operation>(op->view, op->sequence
auto new_op = std::make_shared<pbft_memory_operation>(op->get_view(), op->get_sequence()
, this->crypto->hash(request), nullptr);
new_op->record_request(request);
this->io_context->post(std::bind(&pbft_service_base::apply_operation, this->service, new_op));
Expand Down Expand Up @@ -663,7 +663,7 @@ pbft::find_operation(const pbft_msg& msg)
std::shared_ptr<pbft_operation>
pbft::find_operation(const std::shared_ptr<pbft_operation>& op)
{
return this->find_operation(op->view, op->sequence, op->request_hash);
return this->find_operation(op->get_view(), op->get_sequence(), op->get_request_hash());
}

std::shared_ptr<pbft_operation>
Expand All @@ -676,7 +676,7 @@ pbft::find_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& req_ha
{
LOG(debug) << "Creating operation for seq " << sequence << " view " << view << " req " << bytes_to_debug_string(req_hash);

std::shared_ptr<pbft_operation> op = std::make_shared<pbft_operation>(view, sequence, req_hash,
std::shared_ptr<pbft_operation> op = std::make_shared<pbft_memory_operation>(view, sequence, req_hash,
this->current_peers_ptr());
bool added;
std::tie(std::ignore, added) = operations.emplace(std::piecewise_construct, std::forward_as_tuple(std::move(key)), std::forward_as_tuple(op));
Expand Down Expand Up @@ -944,7 +944,7 @@ pbft::clear_operations_until(const checkpoint_t& cp)
auto it = this->operations.begin();
while (it != this->operations.end())
{
if(it->second->sequence <= cp.first)
if(it->second->get_sequence() <= cp.first)
{
it = this->operations.erase(it);
ops_removed++;
Expand Down Expand Up @@ -1183,3 +1183,21 @@ pbft::already_seen_request(const bzn_envelope& req, const request_hash_t& hash)

return false;
}

size_t
pbft::faulty_nodes_bound(size_t swarm_size)
{
return (swarm_size-1)/3;
}

size_t
pbft::honest_member_size(size_t swarm_size)
{
return pbft::faulty_nodes_bound(swarm_size) + 1;
}

size_t
pbft::honest_majority_size(size_t swarm_size)
{
return pbft::faulty_nodes_bound(swarm_size) * 2 + 1;
}
17 changes: 17 additions & 0 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,23 @@ namespace bzn

bzn::json_message get_status() override;

/*
* maximum number of tolerable faults (this can be a parameter, but for now we assume it has the worst-case value)
* f = floor( (n-1) / 3 )
*/
static size_t faulty_nodes_bound(size_t swarm_size);

/*
* minimum quorum size such that the majority of the quorum is guaranteed to be honest
* 2f+1
*/
static size_t honest_majority_size(size_t swarm_size);

/*
* minimum quorum size such that at least one member is guarenteed to be honest
*/
static size_t honest_member_size(size_t swarm_size);

private:
std::shared_ptr<pbft_operation> find_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash);
std::shared_ptr<pbft_operation> find_operation(const pbft_msg& msg);
Expand Down

0 comments on commit 2e7d75b

Please sign in to comment.