Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
KEP-677 Add status reporting to pbft module.
Browse files Browse the repository at this point in the history
  • Loading branch information
ebruck committed Oct 4, 2018
1 parent 22e848a commit d360694
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 59 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ matrix:
- os: osx
osx_image: xcode10
env:
- MATRIX_EVAL="brew update && brew unlink python && brew install protobuf && brew install snappy && brew install lz4 && brew install openssl && brew install cmake"
- CMAKE_URL="https://cmake.org/files/v3.12/cmake-3.12.1-Darwin-x86_64.tar.gz"
- MATRIX_EVAL="brew update && brew unlink python && brew install protobuf && brew install snappy && brew install lz4 && brew install openssl"
- CMAKE_COMMAND="cmake -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl"

cache:
Expand Down
93 changes: 71 additions & 22 deletions pbft/pbft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@
using namespace bzn;

pbft::pbft(
std::shared_ptr<bzn::node_base> node
, std::shared_ptr<bzn::asio::io_context_base> io_context
, const bzn::peers_list_t& peers
, bzn::uuid_t uuid
, std::shared_ptr<pbft_service_base> service
, std::shared_ptr<pbft_failure_detector_base> failure_detector
)
: node(std::move(node))
, uuid(std::move(uuid))
, service(std::move(service))
, failure_detector(std::move(failure_detector))
, io_context(io_context)
, audit_heartbeat_timer(this->io_context->make_unique_steady_timer())
std::shared_ptr<bzn::node_base> node
, std::shared_ptr<bzn::asio::io_context_base> io_context
, const bzn::peers_list_t& peers
, bzn::uuid_t uuid
, std::shared_ptr<pbft_service_base> service
, std::shared_ptr<pbft_failure_detector_base> failure_detector
)
: node(std::move(node))
, uuid(std::move(uuid))
, service(std::move(service))
, failure_detector(std::move(failure_detector))
, io_context(io_context)
, audit_heartbeat_timer(this->io_context->make_unique_steady_timer())
{
if (peers.empty())
{
Expand All @@ -54,17 +54,17 @@ pbft::pbft(
std::iota(indicies.begin(), indicies.end(), 0);

std::sort(indicies.begin(), indicies.end(),
[&unordered_peers_list](const auto& i1, const auto& i2)
{
return unordered_peers_list[i1].uuid < unordered_peers_list[i2].uuid;
}
[&unordered_peers_list](const auto& i1, const auto& i2)
{
return unordered_peers_list[i1].uuid < unordered_peers_list[i2].uuid;
}
);

std::transform(indicies.begin(), indicies.end(), std::back_inserter(this->peer_index),
[&unordered_peers_list](auto& peer_index)
{
return unordered_peers_list[peer_index];
}
[&unordered_peers_list](auto& peer_index)
{
return unordered_peers_list[peer_index];
}
);

// TODO: stable checkpoint should be read from disk first: KEP-494
Expand Down Expand Up @@ -675,7 +675,7 @@ pbft::handle_database_message(const bzn::json_message& json, std::shared_ptr<bzn
*response.mutable_header() = msg.db().header();

pbft_request req;
req.set_operation(json.toStyledString());
*req.mutable_operation() = msg.db();
req.set_timestamp(0); //TODO: KEP-611

this->handle_request(req, session);
Expand All @@ -695,3 +695,52 @@ pbft::get_high_water_mark()
{
return this->high_water_mark;
}

std::string
pbft::get_name()
{
return "pbft";
}


bzn::json_message
pbft::get_status()
{
bzn::json_message status;

std::lock_guard<std::mutex> lock(this->pbft_lock);

status["outstanding_operations_count"] = uint64_t(this->outstanding_operations_count());
status["is_primary"] = this->is_primary();

auto primary = this->get_primary();
status["primary"]["host"] = primary.host;
status["primary"]["host_port"] = primary.port;
status["primary"]["http_port"] = primary.http_port;
status["primary"]["name"] = primary.name;
status["primary"]["uuid"] = primary.uuid;

status["latest_stable_checkpoint"]["count"] = this->latest_stable_checkpoint().first;
status["latest_stable_checkpoint"]["hash"] = this->latest_stable_checkpoint().second;
status["latest_checkpoint"]["count"] = this->latest_checkpoint().first;
status["latest_checkpoint"]["hash"] = this->latest_checkpoint().first;

status["unstable_checkpoints_count"] = uint64_t(this->unstable_checkpoints_count());
status["next_issued_sequence_number"] = this->next_issued_sequence_number;
status["view"] = this->view;

status["peer_index"] = bzn::json_message();
for(const auto& p : this->peer_index)
{
bzn::json_message peer;
peer["host"] = p.host;
peer["port"] = p.port;
peer["http_port"] = p.http_port;
peer["name"] = p.name;
peer["uuid"] = p.uuid;
status["peer_index"].append(peer);
}

return status;
}

14 changes: 11 additions & 3 deletions pbft/pbft.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#pragma once

#include <include/bluzelle.hpp>
#include <include/boost_asio_beast.hpp>
#include <pbft/pbft_base.hpp>
#include <pbft/pbft_failure_detector.hpp>
#include <pbft/pbft_service_base.hpp>
#include <status/status_provider_base.hpp>
#include <mutex>
#include <pbft/pbft_failure_detector.hpp>
#include <include/boost_asio_beast.hpp>


namespace
{
Expand All @@ -35,7 +37,7 @@ namespace bzn
using request_hash_t = std::string;
using checkpoint_t = std::pair<uint64_t, bzn::hash_t>;

class pbft final : public bzn::pbft_base, public std::enable_shared_from_this<pbft>
class pbft final : public bzn::pbft_base, public bzn::status_provider_base, public std::enable_shared_from_this<pbft>
{
public:
pbft(
Expand Down Expand Up @@ -64,12 +66,18 @@ namespace bzn
void set_audit_enabled(bool setting);

checkpoint_t latest_stable_checkpoint() const;

checkpoint_t latest_checkpoint() const;

size_t unstable_checkpoints_count() const;

uint64_t get_low_water_mark();
uint64_t get_high_water_mark();

std::string get_name() override;

bzn::json_message get_status() override;

private:
std::shared_ptr<pbft_operation> find_operation(uint64_t view, uint64_t sequence, const pbft_request& request);
std::shared_ptr<pbft_operation> find_operation(const pbft_msg& msg);
Expand Down
4 changes: 2 additions & 2 deletions pbft/test/pbft_failure_detector_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ namespace
[&](auto handler){this->request_timer_callback = handler;}
));

req_a.set_operation("do something");
req_b.set_operation("do something else");
//req_a.set_operation("do something");
//req_b.set_operation("do something else");

req_a.set_client("alice");
req_b.set_client("bob");
Expand Down
2 changes: 1 addition & 1 deletion pbft/test/pbft_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ namespace bzn::test
pbft_msg preprepared(this->preprepare_msg);

preprepareb.mutable_request()->set_timestamp(99);
prepreparec.mutable_request()->set_operation("something else");
prepreparec.mutable_request()->mutable_operation();
preprepared.mutable_request()->set_client("certainly not bob");

this->pbft->handle_message(prepreparea);
Expand Down
2 changes: 1 addition & 1 deletion pbft/test/pbft_test_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ namespace bzn::test
{ this->service_execute_handler = handler; }
));

request_msg.mutable_request()->set_operation("do some stuff");
// request_msg.mutable_request()->set_operation("do some stuff");
request_msg.mutable_request()->set_client("bob");
request_msg.mutable_request()->set_timestamp(1);
request_msg.set_type(PBFT_MSG_REQUEST);
Expand Down
2 changes: 1 addition & 1 deletion proto/bluzelle.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum bzn_msg_type
message wrapped_bzn_msg
{
// This is stored as a serialized string because we need to sign it, and serialization is not guarenteed to be deterministic
string payload = 1;
bytes payload = 1;
bzn_msg_type type = 2;

//TODO: string sender
Expand Down
4 changes: 3 additions & 1 deletion proto/pbft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

syntax = "proto3";

import "database.proto";

message pbft_msg
{
pbft_msg_type type = 1;
Expand Down Expand Up @@ -45,7 +47,7 @@ enum pbft_msg_type {

message pbft_request
{
string operation = 1;
database_msg operation = 1;
uint64 timestamp = 2;
string client = 3;
}
40 changes: 19 additions & 21 deletions scripts/crud
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,21 @@ def send_request(node, uuid, msg, loop=False, ws=None):
req["msg"] = base64.b64encode(msg.SerializeToString())

if use_pbft:
# TODO: Wrap this more appropriately. Crud should not be hearing these
# directly in the first place.
msg_outer = pbft_pb2.pbft_msg()
msg_outer.type = 1 #request
msg_outer.request.operation = json.dumps(req)
msg_outer.request.client = "crud cli"
msg_outer.request.timestamp = 0 # TODO

req_outer = {}
req_outer["bzn-api"] = "pbft";
req_outer["pbft-data"] = base64.b64encode(msg_outer.SerializeToString())

print("Sending: \n{}".format(msg_outer))
ws.send(json.dumps(req_outer))

msg_outer = bluzelle_pb2.wrapped_bzn_msg()
msg_outer.type = 1 # BZN_MSG_PBFT

msg_inner = pbft_pb2.pbft_msg()
msg_inner.type = 1 # PBFT_MSG_REQUEST
msg_inner.request.operation.CopyFrom(msg.db)
msg_inner.request.client = "crud cli"
msg_inner.request.timestamp = 0 # TODO
msg_outer.payload = msg_inner.SerializeToString()

print("Sending: \n{}".format(msg_outer).expandtabs(4))
ws.send_binary(msg_outer.SerializeToString())
return
else:
print("Sending: \n{}".format(msg))
print("Sending: \n{}".format(msg).expandtabs(4))
ws.send(json.dumps(req))

print("-" * 60 + '\n')
Expand All @@ -84,10 +82,10 @@ def send_request(node, uuid, msg, loop=False, ws=None):

if resp.WhichOneof('response') == 'redirect':
redirect_node = "{}:{}".format(resp.redirect.leader_host, resp.redirect.leader_port)
print("redirecting to leader at {}...\n".format(redirect_node))
print("redirecting to leader at {}...\n".format(redirect_node).expandtabs(4))
resp = send_request(redirect_node, uuid, msg)
else:
print("Response: \n{}".format(resp))
print("Response: \n{}".format(resp).expandtabs(4))
print("-" * 60 + '\n')

if loop:
Expand All @@ -96,7 +94,7 @@ def send_request(node, uuid, msg, loop=False, ws=None):
print("Waiting....\n")
resp = database_pb2.database_response()
resp.ParseFromString(ws.recv())
print("Response: \n{}".format(resp))
print("Response: \n{}".format(resp).expandtabs(4))
print("-" * 60 + '\n')
except KeyboardInterrupt:
break
Expand All @@ -114,13 +112,13 @@ def send_status_request(node):
req["bzn-api"] = "status"
req["transaction_id"] = random.randint(1,sys.maxint);

print("Sending : \n" + json.dumps(req,indent=4))
print("Sending : \n" + json.dumps(req, indent=4))
print("-" * 60 + '\n')

ws.send(json.dumps(req))
resp = ws.recv()

print("Response: \n{}".format(resp))
print("Response: \n{}".format(resp).expandtabs(4))
print("-" * 60 + '\n')

ws.close()
Expand Down
9 changes: 4 additions & 5 deletions swarm/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,13 @@ main(int argc, const char* argv[])
if (options->pbft_enabled())
{
auto failure_detector = std::make_shared<bzn::pbft_failure_detector>(io_context);
auto pbft = std::make_shared<bzn::pbft>(node, io_context, peers.get_peers(), options->get_uuid()
, std::make_shared<bzn::dummy_pbft_service>(io_context)
, failure_detector
);

auto pbft = std::make_shared<bzn::pbft>(node, io_context, peers.get_peers(), options->get_uuid(), std::make_shared<bzn::dummy_pbft_service>(io_context), failure_detector);
pbft->set_audit_enabled(options->get_simple_options().get<bool>(bzn::option_names::AUDIT_ENABLED));

auto status = std::make_shared<bzn::status>(node, bzn::status::status_provider_list_t{pbft});

pbft->start();
status->start();
}
else
{
Expand Down

0 comments on commit d360694

Please sign in to comment.