diff --git a/.gitignore b/.gitignore index dfe78b3c..27182d0e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,5 +8,6 @@ scripts/bluzelle_pb2.py scripts/database_pb2.py scripts/audit_pb2.py scripts/pbft_pb2.py +scripts/status_pb2.py *.pyc *.pem diff --git a/audit/audit.hpp b/audit/audit.hpp index 9945655d..92ce722d 100644 --- a/audit/audit.hpp +++ b/audit/audit.hpp @@ -16,7 +16,6 @@ #include #include -#include #include #include #include diff --git a/audit/audit_base.hpp b/audit/audit_base.hpp index 9481f268..2d7a7fd8 100644 --- a/audit/audit_base.hpp +++ b/audit/audit_base.hpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include namespace bzn { diff --git a/crypto/crypto.cpp b/crypto/crypto.cpp index 7378a6fe..7b58d451 100644 --- a/crypto/crypto.cpp +++ b/crypto/crypto.cpp @@ -35,8 +35,45 @@ crypto::crypto(std::shared_ptr options) } } +const std::string& +crypto::extract_payload(const bzn_envelope& msg) +{ + switch (msg.payload_case()) + { + case bzn_envelope::kPbftRequest : + { + return msg.pbft_request(); + } + case bzn_envelope::kDatabaseResponse : + { + return msg.database_response(); + } + case bzn_envelope::kJson : + { + return msg.json(); + } + case bzn_envelope::kAudit : + { + return msg.audit(); + } + case bzn_envelope::kPbft : + { + return msg.pbft(); + } + case bzn_envelope::kPbftMembership : + { + return msg.pbft_membership(); + } + default : + { + throw std::runtime_error( + "Crypto does not know how to handle a message with type " + std::to_string(msg.payload_case())); + } + } +} + bool -crypto::verify(const wrapped_bzn_msg& msg) +crypto::verify(const bzn_envelope& msg) { BIO_ptr_t bio(BIO_new(BIO_s_mem()), &BIO_free); EC_KEY_ptr_t pubkey(nullptr, &EC_KEY_free); @@ -54,6 +91,7 @@ crypto::verify(const wrapped_bzn_msg& msg) std::string signature = msg.signature(); char* sig_ptr = signature.data(); + bool result = // Reconstruct the PEM file in memory (this is awkward, but it avoids dealing with EC specifics) (0 < BIO_write(bio.get(), PEM_PREFIX.c_str(), PEM_PREFIX.length())) @@ -67,7 +105,7 @@ crypto::verify(const wrapped_bzn_msg& msg) // Perform the signature validation && (1 == EVP_DigestVerifyInit(context.get(), NULL, EVP_sha512(), NULL, key.get())) - && (1 == EVP_DigestVerifyUpdate(context.get(), msg.payload().c_str(), msg.payload().length())) + && (1 == EVP_DigestVerifyUpdate(context.get(), this->extract_payload(msg).c_str(), this->extract_payload(msg).length())) && (1 == EVP_DigestVerifyFinal(context.get(), reinterpret_cast(sig_ptr), msg.signature().length())); /* Any errors here can be attributed to a bad (potentially malicious) incoming message, and we we should not @@ -79,7 +117,7 @@ crypto::verify(const wrapped_bzn_msg& msg) } bool -crypto::sign(wrapped_bzn_msg& msg) +crypto::sign(bzn_envelope& msg) { if (msg.sender().empty()) { @@ -98,7 +136,7 @@ crypto::sign(wrapped_bzn_msg& msg) bool result = (bool) (context) && (1 == EVP_DigestSignInit(context.get(), NULL, EVP_sha512(), NULL, this->private_key_EVP.get())) - && (1 == EVP_DigestSignUpdate(context.get(), msg.payload().c_str(), msg.payload().size())) + && (1 == EVP_DigestSignUpdate(context.get(), this->extract_payload(msg).c_str(), this->extract_payload(msg).length())) && (1 == EVP_DigestSignFinal(context.get(), NULL, &signature_length)); auto deleter = [](unsigned char* ptr){OPENSSL_free(ptr);}; diff --git a/crypto/crypto.hpp b/crypto/crypto.hpp index 52e00d51..a4e41a20 100644 --- a/crypto/crypto.hpp +++ b/crypto/crypto.hpp @@ -29,9 +29,9 @@ namespace bzn crypto(std::shared_ptr options); - bool sign(wrapped_bzn_msg& msg) override; + bool sign(bzn_envelope& msg) override; - bool verify(const wrapped_bzn_msg& msg) override; + bool verify(const bzn_envelope& msg) override; std::string hash(const std::string& msg) override; @@ -46,6 +46,8 @@ namespace bzn void log_openssl_errors(); + const std::string& extract_payload(const bzn_envelope& msg); + std::shared_ptr options; EVP_PKEY_ptr_t private_key_EVP = EVP_PKEY_ptr_t(nullptr, &EVP_PKEY_free); diff --git a/crypto/crypto_base.hpp b/crypto/crypto_base.hpp index 522faff5..bf26c4a8 100644 --- a/crypto/crypto_base.hpp +++ b/crypto/crypto_base.hpp @@ -26,14 +26,14 @@ namespace bzn * @msg message to sign * @return if signature was successful */ - virtual bool sign(wrapped_bzn_msg& msg) = 0; + virtual bool sign(bzn_envelope& msg) = 0; /* * verify that the signature on a message is correct and matches its sender * @msg message to verify * @return signature is present, valid and matches sender */ - virtual bool verify(const wrapped_bzn_msg& msg) = 0; + virtual bool verify(const bzn_envelope& msg) = 0; /* * Compute the hash of some message diff --git a/crypto/test/crypto_test.cpp b/crypto/test/crypto_test.cpp index 592275bc..c27ecc87 100644 --- a/crypto/test/crypto_test.cpp +++ b/crypto/test/crypto_test.cpp @@ -44,11 +44,11 @@ class crypto_test : public Test "8Mcwr7lq+Hi7/xx7A37wZBHVtCRpaXbJNqRhIErf6FnOZI3m71sQoA==\n" "-----END PUBLIC KEY-----\n"; - wrapped_bzn_msg msg; + bzn_envelope msg; crypto_test() { - this->msg.set_payload("pretend this is a serialized protobuf message"); + this->msg.set_pbft("pretend this is a serialized protobuf message"); std::ofstream ofile(private_key_file.c_str()); ofile << test_private_key_pem; @@ -81,8 +81,8 @@ TEST_F(crypto_test, messages_use_my_public_key) TEST_F(crypto_test, messages_signed_and_verified) { - wrapped_bzn_msg msg2 = msg; - wrapped_bzn_msg msg3 = msg; + bzn_envelope msg2 = msg; + bzn_envelope msg3 = msg; EXPECT_TRUE(crypto->sign(msg)); EXPECT_TRUE(crypto->verify(msg)); @@ -90,7 +90,7 @@ TEST_F(crypto_test, messages_signed_and_verified) TEST_F(crypto_test, bad_signature_caught) { - wrapped_bzn_msg msg2 = msg; + bzn_envelope msg2 = msg; EXPECT_TRUE(crypto->sign(msg)); @@ -100,7 +100,7 @@ TEST_F(crypto_test, bad_signature_caught) TEST_F(crypto_test, bad_sender_caught) { - wrapped_bzn_msg msg3 = msg; + bzn_envelope msg3 = msg; EXPECT_TRUE(crypto->sign(msg)); diff --git a/include/bluzelle.hpp b/include/bluzelle.hpp index 51fdbfad..901bf286 100644 --- a/include/bluzelle.hpp +++ b/include/bluzelle.hpp @@ -33,6 +33,8 @@ namespace bzn using session_id = uint64_t; + using hash_t = std::string; + } // bzn diff --git a/mocks/mock_node_base.hpp b/mocks/mock_node_base.hpp index 8d4af0e6..8cfce7a1 100644 --- a/mocks/mock_node_base.hpp +++ b/mocks/mock_node_base.hpp @@ -27,7 +27,7 @@ class Mocknode_base : public node_base { MOCK_METHOD2(register_for_message, bool(const std::string& msg_type, bzn::message_handler message_handler)); MOCK_METHOD2(register_for_message, - bool(const bzn_msg_type msg_type, bzn::protobuf_handler message_handler)); + bool(const bzn_envelope::PayloadCase msg_type, bzn::protobuf_handler message_handler)); MOCK_METHOD0(start, void()); MOCK_METHOD2(send_message, diff --git a/mocks/mock_pbft_failure_detector.hpp b/mocks/mock_pbft_failure_detector.hpp index f8f141ab..42c4e606 100644 --- a/mocks/mock_pbft_failure_detector.hpp +++ b/mocks/mock_pbft_failure_detector.hpp @@ -23,9 +23,9 @@ namespace bzn class Mockpbft_failure_detector_base : public pbft_failure_detector_base { public: - MOCK_METHOD1(request_seen, void(const pbft_request& req)); + MOCK_METHOD1(request_seen, void(const bzn::hash_t& req)); - MOCK_METHOD1(request_executed, void(const pbft_request& req)); + MOCK_METHOD1(request_executed, void(const bzn::hash_t& req)); MOCK_METHOD1(register_failure_handler, void(std::function handler)); }; diff --git a/mocks/mock_pbft_service_base.hpp b/mocks/mock_pbft_service_base.hpp index 4a7cef27..2b590086 100644 --- a/mocks/mock_pbft_service_base.hpp +++ b/mocks/mock_pbft_service_base.hpp @@ -22,8 +22,8 @@ namespace bzn { class mock_pbft_service_base : public pbft_service_base { public: - MOCK_METHOD2(apply_operation, - void(const pbft_request& request, uint64_t sequence_number)); + MOCK_METHOD1(apply_operation, + void(std::shared_ptr)); MOCK_CONST_METHOD2(query, void(const pbft_request& request, uint64_t sequence_number)); MOCK_CONST_METHOD1(service_state_hash, @@ -31,7 +31,7 @@ namespace bzn { MOCK_METHOD1(consolidate_log, void(uint64_t sequence_number)); MOCK_METHOD1(register_execute_handler, - void(std::function handler)); + void(bzn::execute_handler_t handler)); MOCK_METHOD1(apply_operation, void(const std::shared_ptr&)); }; diff --git a/node/node.cpp b/node/node.cpp index 162d95d4..cb4fbd65 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -69,7 +69,7 @@ node::register_for_message(const std::string& msg_type, bzn::message_handler msg bool -node::register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_handler) +node::register_for_message(const bzn_envelope::PayloadCase type, bzn::protobuf_handler msg_handler) { std::lock_guard lock(this->message_map_mutex); @@ -81,7 +81,7 @@ node::register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_ha if (this->protobuf_map.find(type) != this->protobuf_map.end()) { - LOG(debug) << bzn_msg_type_Name(type) << " message type already registered"; + LOG(debug) << type << " message type already registered"; return false; } @@ -143,7 +143,7 @@ node::priv_msg_handler(const Json::Value& msg, std::shared_ptr session) +node::priv_protobuf_handler(const bzn_envelope& msg, std::shared_ptr session) { std::lock_guard lock(this->message_map_mutex); @@ -156,13 +156,13 @@ node::priv_protobuf_handler(const wrapped_bzn_msg& msg, std::shared_ptrprotobuf_map.find(msg.type()); it != this->protobuf_map.end()) + if (auto it = this->protobuf_map.find(msg.payload_case()); it != this->protobuf_map.end()) { it->second(msg, std::move(session)); } else { - LOG(debug) << "no handler for message type " << bzn_msg_type_Name(msg.type()); + LOG(debug) << "no handler for message type " << msg.payload_case(); } } @@ -224,7 +224,7 @@ node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr msg) +node::send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr msg) { if(msg->sender().empty()) { diff --git a/node/node.hpp b/node/node.hpp index 1bed882d..f2bf450f 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -36,13 +36,13 @@ namespace bzn bool register_for_message(const std::string& msg_type, bzn::message_handler msg_handler) override; - bool register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_handler) override; + bool register_for_message(const bzn_envelope::PayloadCase type, bzn::protobuf_handler msg_handler) override; void start() override; void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr msg) override; - void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr msg); + void send_message(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr msg); void send_message_str(const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr msg) override; @@ -53,7 +53,7 @@ namespace bzn void do_accept(); void priv_msg_handler(const bzn::json_message& msg, std::shared_ptr session); - void priv_protobuf_handler(const wrapped_bzn_msg& msg, std::shared_ptr session); + void priv_protobuf_handler(const bzn_envelope& msg, std::shared_ptr session); std::unique_ptr tcp_acceptor; std::shared_ptr io_context; @@ -63,7 +63,7 @@ namespace bzn const std::chrono::milliseconds ws_idle_timeout; std::unordered_map message_map; - std::unordered_map protobuf_map; + std::unordered_map protobuf_map; std::mutex message_map_mutex; std::once_flag start_once; diff --git a/node/node_base.hpp b/node/node_base.hpp index 92f9e33a..c7ee2ab8 100644 --- a/node/node_base.hpp +++ b/node/node_base.hpp @@ -41,7 +41,7 @@ namespace bzn * @param msg_handler callback * @return true if registration succeeded */ - virtual bool register_for_message(const bzn_msg_type type, bzn::protobuf_handler msg_handler) = 0; + virtual bool register_for_message(const bzn_envelope::PayloadCase type, bzn::protobuf_handler msg_handler) = 0; /** * Start server's listener etc. diff --git a/node/session.cpp b/node/session.cpp index eb3f51fd..ba9ce50a 100644 --- a/node/session.cpp +++ b/node/session.cpp @@ -92,7 +92,7 @@ session::do_read() Json::Value msg; Json::Reader reader; - wrapped_bzn_msg proto_msg; + bzn_envelope proto_msg; if (reader.parse(ss.str(), msg)) { diff --git a/node/session_base.hpp b/node/session_base.hpp index 42ca7698..95431265 100644 --- a/node/session_base.hpp +++ b/node/session_base.hpp @@ -24,7 +24,7 @@ namespace bzn class session_base; using message_handler = std::function session)>; - using protobuf_handler = std::function session)>; + using protobuf_handler = std::function session)>; class session_base { diff --git a/node/test/node_test.cpp b/node/test/node_test.cpp index e571af67..d7a9b3a0 100644 --- a/node/test/node_test.cpp +++ b/node/test/node_test.cpp @@ -183,22 +183,20 @@ namespace bzn // Add our test callback... unsigned int callback_execute = 0u; - node->register_for_message(BZN_MSG_PBFT, [&](const auto& /*msg*/, auto) + node->register_for_message(bzn_envelope::kPbft, [&](const auto& /*msg*/, auto) { callback_execute++; }); - wrapped_bzn_msg bad_msg; - bad_msg.set_payload("some stuff"); + bzn_envelope bad_msg; + bad_msg.set_pbft("some stuff"); bad_msg.set_sender("Elizabeth the Second, by the Grace of God of the United Kingdom, Canada and Her other Realms and Territories Queen, Head of the Commonwealth, Defender of the Faith"); bad_msg.set_signature("probably not a valid signature"); - bad_msg.set_type(BZN_MSG_PBFT); - wrapped_bzn_msg anon_msg; - anon_msg.set_payload("some stuff"); + bzn_envelope anon_msg; + anon_msg.set_pbft("some stuff"); anon_msg.set_sender(""); anon_msg.set_signature(""); - anon_msg.set_type(BZN_MSG_PBFT); node->priv_protobuf_handler(bad_msg, mock_session); EXPECT_EQ(callback_execute, 0u); diff --git a/node/test/session_test.cpp b/node/test/session_test.cpp index d49b4afa..702cc113 100644 --- a/node/test/session_test.cpp +++ b/node/test/session_test.cpp @@ -179,7 +179,7 @@ namespace bzn ASSERT_FALSE(proto_handler_called); - wrapped_bzn_msg proto_msg; + bzn_envelope proto_msg; write_to_buffer(proto_msg.SerializeAsString()); read_handler(boost::system::error_code(), 0); diff --git a/pbft/database_pbft_service.cpp b/pbft/database_pbft_service.cpp index 6d339788..31350557 100644 --- a/pbft/database_pbft_service.cpp +++ b/pbft/database_pbft_service.cpp @@ -50,10 +50,10 @@ database_pbft_service::apply_operation(const std::shared_ptr lock(this->lock); // store op... - if (auto result = this->unstable_storage->create(this->uuid, std::to_string(op->sequence), op->request.SerializeAsString()); + if (auto result = this->unstable_storage->create(this->uuid, std::to_string(op->sequence), op->get_request().SerializeAsString()); result != bzn::storage_base::result::ok) { - LOG(fatal) << "failed to store pbft request: " << op->request.DebugString() << ", " << uint32_t(result); + LOG(fatal) << "failed to store pbft request: " << op->get_request().DebugString() << ", " << uint32_t(result); // these are fatal... something bad is going on. throw std::runtime_error("Failed to store pbft request! (" + std::to_string(uint8_t(result)) + ")"); @@ -104,7 +104,7 @@ database_pbft_service::process_awaiting_operations() this->crud->handle_request(request.operation(), nullptr); } - this->io_context->post(std::bind(this->execute_handler, request, this->next_request_sequence)); + this->io_context->post(std::bind(this->execute_handler, nullptr)); // TODO: need to find the pbft_operation here; requires pbft_operation not being an in-memory construct if (auto result = this->unstable_storage->remove(uuid, key); result != bzn::storage_base::result::ok) { diff --git a/pbft/dummy_pbft_service.cpp b/pbft/dummy_pbft_service.cpp index f1dcf6a1..d0e68762 100644 --- a/pbft/dummy_pbft_service.cpp +++ b/pbft/dummy_pbft_service.cpp @@ -40,7 +40,7 @@ dummy_pbft_service::apply_operation(const std::shared_ptr& op) this->send_execute_response(op); // todo: use shared from this as post could act on a long gone dummy_pbft_service? - this->io_context->post(std::bind(this->execute_handler, op->request, this->next_request_sequence)); + this->io_context->post(std::bind(this->execute_handler, op)); this->waiting_operations.erase(this->next_request_sequence); this->next_request_sequence++; diff --git a/pbft/pbft.cpp b/pbft/pbft.cpp index 6592bc23..10482cdf 100644 --- a/pbft/pbft.cpp +++ b/pbft/pbft.cpp @@ -33,6 +33,7 @@ pbft::pbft( , bzn::uuid_t uuid , std::shared_ptr service , std::shared_ptr failure_detector + , std::shared_ptr crypto ) : node(std::move(node)) , uuid(std::move(uuid)) @@ -40,6 +41,7 @@ pbft::pbft( , failure_detector(std::move(failure_detector)) , io_context(io_context) , audit_heartbeat_timer(this->io_context->make_unique_steady_timer()) + , crypto(std::move(crypto)) { if (peers.empty()) { @@ -59,10 +61,10 @@ pbft::start() std::call_once(this->start_once, [this]() { - this->node->register_for_message(bzn_msg_type::BZN_MSG_PBFT, + this->node->register_for_message(bzn_envelope::PayloadCase::kPbft, std::bind(&pbft::handle_bzn_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); - this->node->register_for_message(bzn_msg_type::BZN_MSG_PBFT_MEMBERSHIP, + this->node->register_for_message(bzn_envelope::PayloadCase::kPbftMembership, std::bind(&pbft::handle_membership_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); this->node->register_for_message("database", @@ -74,16 +76,22 @@ pbft::start() this->service->register_execute_handler( [weak_this = this->weak_from_this(), fd = this->failure_detector] - (const pbft_request& req, uint64_t sequence) + (std::shared_ptr op) { - fd->request_executed(req); + if (!op) + { + // TODO: Get real pbft_operation pointers from pbft_service + LOG(error) << "Ignoring null operation pointer recieved from pbft_service"; + } - if (sequence % CHECKPOINT_INTERVAL == 0) + fd->request_executed(op->request_hash); + + if (op->sequence % CHECKPOINT_INTERVAL == 0) { auto strong_this = weak_this.lock(); if(strong_this) { - strong_this->checkpoint_reached_locally(sequence); + strong_this->checkpoint_reached_locally(op->sequence); } else { @@ -130,15 +138,15 @@ pbft::handle_audit_heartbeat_timeout(const boost::system::error_code& ec) } void -pbft::handle_bzn_message(const wrapped_bzn_msg& msg, std::shared_ptr /*session*/) +pbft::handle_bzn_message(const bzn_envelope& msg, std::shared_ptr /*session*/) { - if (msg.type() != BZN_MSG_PBFT) + if (msg.payload_case() != bzn_envelope::kPbft ) { LOG(error) << "Got misdirected message " << msg.DebugString().substr(0, MAX_MESSAGE_SIZE); } pbft_msg inner_msg; - if (!inner_msg.ParseFromString(msg.payload())) + if (!inner_msg.ParseFromString(msg.pbft())) { LOG(error) << "Failed to parse payload of wrapped message " << msg.DebugString().substr(0, MAX_MESSAGE_SIZE); return; @@ -148,10 +156,10 @@ pbft::handle_bzn_message(const wrapped_bzn_msg& msg, std::shared_ptr /*session*/) +pbft::handle_membership_message(const bzn_envelope& msg, std::shared_ptr /*session*/) { pbft_membership_msg inner_msg; - if (!inner_msg.ParseFromString(msg.payload())) + if (!inner_msg.ParseFromString(msg.pbft_membership())) { LOG(error) << "Failed to parse payload of wrapped message " << msg.DebugString().substr(0, MAX_MESSAGE_SIZE); return; @@ -173,7 +181,7 @@ pbft::handle_membership_message(const wrapped_bzn_msg& msg, std::shared_ptrfailure_detector->request_seen(msg.request()); - } - std::lock_guard lock(this->pbft_lock); switch (msg.type()) @@ -238,10 +241,11 @@ pbft::preliminary_filter_msg(const pbft_msg& msg) } std::shared_ptr -pbft::setup_request_operation(const pbft_request& msg, const std::shared_ptr& session) +pbft::setup_request_operation(const bzn::encoded_message& request, const std::shared_ptr& session) { const uint64_t request_seq = this->next_issued_sequence_number++; - auto op = this->find_operation(this->view, request_seq, msg); + auto op = this->find_operation(this->view, request_seq, this->crypto->hash(request)); + op->record_request(request); if (session) { @@ -252,7 +256,7 @@ pbft::setup_request_operation(const pbft_request& msg, const std::shared_ptr& session) +pbft::handle_request(const pbft_request& /*msg*/, const bzn::json_message& original_msg, const std::shared_ptr& session) { if (!this->is_primary()) { @@ -265,12 +269,27 @@ pbft::handle_request(const pbft_request& msg, const bzn::json_message& original_ //TODO: keep track of what requests we've seen based on timestamp and only send preprepares once - KEP-329 - auto op = setup_request_operation(msg, session); + auto op = setup_request_operation(original_msg.toStyledString(), session); this->do_preprepare(op); } void -pbft::handle_preprepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg) +pbft::maybe_record_request(const pbft_msg& msg, const std::shared_ptr& op) +{ + if (!msg.request().empty() && !op->has_request()) + { + if (this->crypto->hash(msg.request()) != msg.request_hash()) + { + LOG(info) << "Not recording request because its hash does not match"; + return; + } + + op->record_request(msg.request()); + } +} + +void +pbft::handle_preprepare(const pbft_msg& msg, const bzn_envelope& original_msg) { // If we've already accepted a preprepare for this view+sequence, and it's not this one, then we should reject this one // Note that if we get the same preprepare more than once, we can still accept it @@ -278,7 +297,7 @@ pbft::handle_preprepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg if (auto lookup = this->accepted_preprepares.find(log_key); lookup != this->accepted_preprepares.end() - && std::get<2>(lookup->second) != pbft_operation::request_hash(msg.request())) + && std::get<2>(lookup->second) != msg.request_hash()) { LOG(debug) << "Rejecting preprepare because I've already accepted a conflicting one \n"; @@ -288,11 +307,12 @@ pbft::handle_preprepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg { auto op = this->find_operation(msg); op->record_preprepare(original_msg); + this->maybe_record_request(msg, op); // This assignment will be redundant if we've seen this preprepare before, but that's fine accepted_preprepares[log_key] = op->get_operation_key(); - if (msg.has_request() && msg.request().type() == PBFT_REQ_NEW_CONFIG) + if (op->has_request() && op->get_request().type() == PBFT_REQ_NEW_CONFIG) { this->handle_config_message(msg, op); } @@ -303,22 +323,24 @@ pbft::handle_preprepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg } void -pbft::handle_prepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg) +pbft::handle_prepare(const pbft_msg& msg, const bzn_envelope& original_msg) { // Prepare messages are never rejected, assuming the sanity checks passed auto op = this->find_operation(msg); op->record_prepare(original_msg); + this->maybe_record_request(msg, op); this->maybe_advance_operation_state(op); } void -pbft::handle_commit(const pbft_msg& msg, const wrapped_bzn_msg& original_msg) +pbft::handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg) { // Commit messages are never rejected, assuming the sanity checks passed auto op = this->find_operation(msg); op->record_commit(original_msg); + this->maybe_record_request(msg, op); this->maybe_advance_operation_state(op); } @@ -400,7 +422,7 @@ pbft::common_message_setup(const std::shared_ptr& op, pbft_msg_t pbft_msg msg; msg.set_view(op->view); msg.set_sequence(op->sequence); - msg.set_allocated_request(new pbft_request(op->request)); + msg.set_request_hash(op->request_hash); msg.set_type(type); return msg; @@ -412,6 +434,7 @@ pbft::do_preprepare(const std::shared_ptr& op) LOG(debug) << "Doing preprepare for operation " << op->debug_string(); pbft_msg msg = this->common_message_setup(op, PBFT_MSG_PREPREPARE); + msg.set_request(op->get_encoded_request()); this->broadcast(this->wrap_message(msg, "preprepare")); } @@ -430,10 +453,10 @@ void pbft::do_prepared(const std::shared_ptr& op) { // accept new configuration if applicable - if (op->request.type() == PBFT_REQ_NEW_CONFIG && op->request.has_config()) + if (op->has_request() && op->get_request().type() == PBFT_REQ_NEW_CONFIG && op->get_request().has_config()) { pbft_configuration config; - if (config.from_string(op->request.config().configuration())) + if (config.from_string(op->get_request().config().configuration())) { this->configurations.enable(config.get_hash()); } @@ -451,10 +474,10 @@ void pbft::do_committed(const std::shared_ptr& op) { // commit new configuration if applicable - if (op->request.type() == PBFT_REQ_NEW_CONFIG && op->request.has_config()) + if (op->has_request() && op->get_request().type() == PBFT_REQ_NEW_CONFIG && op->get_request().has_config()) { pbft_configuration config; - if (config.from_string(op->request.config().configuration())) + if (config.from_string(op->get_request().config().configuration())) { // get rid of all other previous configs, except for currently active one this->configurations.remove_prior_to(config.get_hash()); @@ -467,7 +490,7 @@ pbft::do_committed(const std::shared_ptr& op) if (this->audit_enabled) { audit_message msg; - msg.mutable_pbft_commit()->set_operation(pbft_operation::request_hash(op->request)); + msg.mutable_pbft_commit()->set_operation(op->request_hash); msg.mutable_pbft_commit()->set_sequence_number(op->sequence); msg.mutable_pbft_commit()->set_sender_uuid(this->uuid); @@ -500,27 +523,26 @@ pbft::get_primary() const std::shared_ptr pbft::find_operation(const pbft_msg& msg) { - return this->find_operation(msg.view(), msg.sequence(), msg.request()); + return this->find_operation(msg.view(), msg.sequence(), msg.request_hash()); } std::shared_ptr pbft::find_operation(const std::shared_ptr& op) { - return this->find_operation(op->view, op->sequence, op->request); + return this->find_operation(op->view, op->sequence, op->request_hash); } std::shared_ptr -pbft::find_operation(uint64_t view, uint64_t sequence, const pbft_request& request) +pbft::find_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& req_hash) { - auto key = bzn::operation_key_t(view, sequence, pbft_operation::request_hash(request)); + auto key = bzn::operation_key_t(view, sequence, req_hash); auto lookup = operations.find(key); if (lookup == operations.end()) { - LOG(debug) << "Creating operation for seq " << sequence << " view " << view << " req " - << request.ShortDebugString(); + LOG(debug) << "Creating operation for seq " << sequence << " view " << view << " req " << req_hash; - std::shared_ptr op = std::make_shared(view, sequence, request, + std::shared_ptr op = std::make_shared(view, sequence, req_hash, this->current_peers_ptr()); auto result = operations.emplace(std::piecewise_construct, std::forward_as_tuple(std::move(key)), std::forward_as_tuple(op)); @@ -534,9 +556,8 @@ pbft::find_operation(uint64_t view, uint64_t sequence, const pbft_request& reque bzn::encoded_message pbft::wrap_message(const pbft_msg& msg, const std::string& /*debug_info*/) { - wrapped_bzn_msg result; - result.set_payload(msg.SerializeAsString()); - result.set_type(bzn_msg_type::BZN_MSG_PBFT); + bzn_envelope result; + result.set_pbft(msg.SerializeAsString()); result.set_sender(this->uuid); return result.SerializeAsString(); @@ -610,7 +631,7 @@ pbft::checkpoint_reached_locally(uint64_t sequence) } void -pbft::handle_checkpoint(const pbft_msg& msg, const wrapped_bzn_msg& original_msg) +pbft::handle_checkpoint(const pbft_msg& msg, const bzn_envelope& original_msg) { if (msg.sequence() <= stable_checkpoint.first) { @@ -871,7 +892,7 @@ pbft::broadcast_new_configuration(pbft_configuration::shared_const_ptr config) cfg_msg->set_configuration(config->to_string()); req.set_allocated_config(cfg_msg); - auto op = this->setup_request_operation(req); + auto op = this->setup_request_operation(req.SerializeAsString()); this->do_preprepare(op); } @@ -884,7 +905,7 @@ pbft::is_configuration_acceptable_in_new_view(hash_t config_hash) void pbft::handle_config_message(const pbft_msg& msg, const std::shared_ptr& op) { - auto const& request = op->request; + auto const& request = op->get_request(); assert(request.type() == PBFT_REQ_NEW_CONFIG); auto config = std::make_shared(); if (msg.type() == PBFT_MSG_PREPREPARE && config->from_string(request.config().configuration())) diff --git a/pbft/pbft.hpp b/pbft/pbft.hpp index 6d44676a..f7b34a3f 100644 --- a/pbft/pbft.hpp +++ b/pbft/pbft.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -48,11 +49,12 @@ namespace bzn , bzn::uuid_t uuid , std::shared_ptr service , std::shared_ptr failure_detector + , std::shared_ptr crypto ); void start() override; - void handle_message(const pbft_msg& msg, const wrapped_bzn_msg& original_msg) override; + void handle_message(const pbft_msg& msg, const bzn_envelope& original_msg) override; void handle_database_message(const bzn::json_message& json, std::shared_ptr session); @@ -83,17 +85,17 @@ namespace bzn bzn::json_message get_status() override; private: - std::shared_ptr find_operation(uint64_t view, uint64_t sequence, const pbft_request& request); + std::shared_ptr find_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash); std::shared_ptr find_operation(const pbft_msg& msg); std::shared_ptr find_operation(const std::shared_ptr& op); bool preliminary_filter_msg(const pbft_msg& msg); void handle_request(const pbft_request& msg, const bzn::json_message& original_msg, const std::shared_ptr& session = nullptr); - void handle_preprepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg); - void handle_prepare(const pbft_msg& msg, const wrapped_bzn_msg& original_msg); - void handle_commit(const pbft_msg& msg, const wrapped_bzn_msg& original_msg); - void handle_checkpoint(const pbft_msg& msg, const wrapped_bzn_msg& original_msg); + void handle_preprepare(const pbft_msg& msg, const bzn_envelope& original_msg); + void handle_prepare(const pbft_msg& msg, const bzn_envelope& original_msg); + void handle_commit(const pbft_msg& msg, const bzn_envelope& original_msg); + void handle_checkpoint(const pbft_msg& msg, const bzn_envelope& original_msg); void handle_join_or_leave(const pbft_membership_msg& msg); void handle_config_message(const pbft_msg& msg, const std::shared_ptr& op); @@ -103,13 +105,13 @@ namespace bzn void do_prepared(const std::shared_ptr& op); void do_committed(const std::shared_ptr& op); - void handle_bzn_message(const wrapped_bzn_msg& msg, std::shared_ptr session); - void handle_membership_message(const wrapped_bzn_msg& msg, std::shared_ptr session = nullptr); + void handle_bzn_message(const bzn_envelope& msg, std::shared_ptr session); + void handle_membership_message(const bzn_envelope& msg, std::shared_ptr session = nullptr); bzn::encoded_message wrap_message(const pbft_msg& message, const std::string& debug_info = ""); bzn::encoded_message wrap_message(const audit_message& message, const std::string& debug_info = ""); pbft_msg common_message_setup(const std::shared_ptr& op, pbft_msg_type type); - std::shared_ptr setup_request_operation(const pbft_request& msg + std::shared_ptr setup_request_operation(const bzn::encoded_message& msg , const std::shared_ptr& session = nullptr); void broadcast(const bzn::encoded_message& message); @@ -136,6 +138,8 @@ namespace bzn bool move_to_new_configuration(hash_t config_hash); bool proposed_config_is_acceptable(std::shared_ptr config); + void maybe_record_request(const pbft_msg& msg, const std::shared_ptr& op); + // Using 1 as first value here to distinguish from default value of 0 in protobuf uint64_t view = 1; uint64_t next_issued_sequence_number = 1; diff --git a/pbft/pbft_base.hpp b/pbft/pbft_base.hpp index 1d3b4bdc..ff19033e 100644 --- a/pbft/pbft_base.hpp +++ b/pbft/pbft_base.hpp @@ -21,7 +21,7 @@ #include #include -#include +#include namespace bzn { @@ -32,7 +32,7 @@ namespace bzn public: virtual void start() = 0; - virtual void handle_message(const pbft_msg& msg, const wrapped_bzn_msg& original_msg) = 0; + virtual void handle_message(const pbft_msg& msg, const bzn_envelope& original_msg) = 0; virtual bool is_primary() const = 0; diff --git a/pbft/pbft_failure_detector.cpp b/pbft/pbft_failure_detector.cpp index 17f1f93e..f5ff35da 100644 --- a/pbft/pbft_failure_detector.cpp +++ b/pbft/pbft_failure_detector.cpp @@ -39,16 +39,16 @@ pbft_failure_detector::handle_timeout(boost::system::error_code /*ec*/) { std::lock_guard lock(this->lock); - if (this->completed_requests.count(pbft_operation::request_hash(this->ordered_requests.front())) == 0) + if (this->completed_requests.count(this->ordered_requests.front()) == 0) { - LOG(error) << "Failure detector detected unexecuted request " << this->ordered_requests.front().ShortDebugString() << '\n'; + LOG(error) << "Failure detector detected unexecuted request " << this->ordered_requests.front() << '\n'; this->start_timer(); this->io_context->post(std::bind(this->failure_handler)); return; } while (this->ordered_requests.size() > 0 && - this->completed_requests.count(pbft_operation::request_hash(this->ordered_requests.front())) > 0) + this->completed_requests.count(this->ordered_requests.front()) > 0) { this->ordered_requests.pop_front(); } @@ -60,16 +60,14 @@ pbft_failure_detector::handle_timeout(boost::system::error_code /*ec*/) } void -pbft_failure_detector::request_seen(const pbft_request& req) +pbft_failure_detector::request_seen(const bzn::hash_t& req_hash) { std::lock_guard lock(this->lock); - hash_t req_hash = pbft_operation::request_hash(req); - if (this->outstanding_requests.count(req_hash) == 0 && this->completed_requests.count(req_hash) == 0) { - LOG(debug) << "Failure detector recording new request " << req.ShortDebugString() << '\n'; - this->ordered_requests.emplace_back(req); + LOG(debug) << "Failure detector recording new request " << req_hash << '\n'; + this->ordered_requests.emplace_back(req_hash); this->outstanding_requests.emplace(req_hash); if (this->ordered_requests.size() == 1) @@ -80,12 +78,10 @@ pbft_failure_detector::request_seen(const pbft_request& req) } void -pbft_failure_detector::request_executed(const pbft_request& req) +pbft_failure_detector::request_executed(const bzn::hash_t& req_hash) { std::lock_guard lock(this->lock); - hash_t req_hash = pbft_operation::request_hash(req); - this->outstanding_requests.erase(req_hash); this->completed_requests.emplace(req_hash); // TODO KEP-538: Need to garbage collect completed_requests eventually diff --git a/pbft/pbft_failure_detector.hpp b/pbft/pbft_failure_detector.hpp index cca388d6..9179a4c2 100644 --- a/pbft/pbft_failure_detector.hpp +++ b/pbft/pbft_failure_detector.hpp @@ -26,9 +26,9 @@ namespace bzn public: pbft_failure_detector(std::shared_ptr); - void request_seen(const pbft_request& req) override; + void request_seen(const bzn::hash_t& req_hash) override; - void request_executed(const pbft_request& req) override; + void request_executed(const bzn::hash_t& req_hash) override; void register_failure_handler(std::function handler) override; @@ -41,7 +41,7 @@ namespace bzn std::unique_ptr request_progress_timer; - std::list ordered_requests; + std::list ordered_requests; std::unordered_set outstanding_requests; std::unordered_set completed_requests; diff --git a/pbft/pbft_failure_detector_base.hpp b/pbft/pbft_failure_detector_base.hpp index 0bc09bd6..6c8e40e6 100644 --- a/pbft/pbft_failure_detector_base.hpp +++ b/pbft/pbft_failure_detector_base.hpp @@ -14,6 +14,7 @@ #pragma once +#include #include #include @@ -24,9 +25,9 @@ namespace bzn { public: - virtual void request_seen(const pbft_request& req) = 0; + virtual void request_seen(const bzn::hash_t& req_hash) = 0; - virtual void request_executed(const pbft_request& req) = 0; + virtual void request_executed(const bzn::hash_t& req_hash) = 0; virtual void register_failure_handler(std::function handler) = 0; diff --git a/pbft/pbft_operation.cpp b/pbft/pbft_operation.cpp index c438fccb..4b896688 100644 --- a/pbft/pbft_operation.cpp +++ b/pbft/pbft_operation.cpp @@ -18,28 +18,71 @@ using namespace bzn; -pbft_operation::pbft_operation(uint64_t view, uint64_t sequence, pbft_request request, std::shared_ptr> peers) +pbft_operation::pbft_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash, std::shared_ptr> peers) : view(view) , sequence(sequence) - , request(std::move(request)) + , request_hash(request_hash) , peers(std::move(peers)) { } +bool +pbft_operation::has_request() const +{ + return this->request_saved; +} + +const pbft_request& +pbft_operation::get_request() const +{ + if (!this->request_saved) + { + throw std::runtime_error("Tried to get a request that is not saved"); + } + + return this->parsed_request; +} + +const bzn::encoded_message& +pbft_operation::get_encoded_request() const +{ + if (!this->request_saved) + { + throw std::runtime_error("Tried to get a request that is not saved"); + } + + return this->encoded_request; +} + void -pbft_operation::record_preprepare(const wrapped_bzn_msg& /*encoded_preprepare*/) +pbft_operation::record_request(const bzn::encoded_message& wrapped_request) +{ + // not actually parsing the request because, for now, its still json + this->encoded_request = wrapped_request; + + if (this->parsed_request.ParseFromString(wrapped_request)) + { + LOG(error) << "Tried to record request as something not a valid request (perhaps an old json request?)"; + // We don't consider this a failure case, for now, because it could be an old json request + } + + this->request_saved = true; +} + +void +pbft_operation::record_preprepare(const bzn_envelope& /*encoded_preprepare*/) { this->preprepare_seen = true; } bool -pbft_operation::has_preprepare() +pbft_operation::has_preprepare() const { return this->preprepare_seen; } void -pbft_operation::record_prepare(const wrapped_bzn_msg& encoded_prepare) +pbft_operation::record_prepare(const bzn_envelope& encoded_prepare) { // TODO: Save message this->prepares_seen.insert(encoded_prepare.sender()); @@ -52,19 +95,19 @@ pbft_operation::faulty_nodes_bound() const } bool -pbft_operation::is_prepared() +pbft_operation::is_prepared() const { - return this->has_preprepare() && this->prepares_seen.size() > 2 * this->faulty_nodes_bound(); + return this->has_request() && this->has_preprepare() && this->prepares_seen.size() > 2 * this->faulty_nodes_bound(); } void -pbft_operation::record_commit(const wrapped_bzn_msg& encoded_commit) +pbft_operation::record_commit(const bzn_envelope& encoded_commit) { this->commits_seen.insert(encoded_commit.sender()); } bool -pbft_operation::is_committed() +pbft_operation::is_committed() const { return this->is_prepared() && this->commits_seen.size() > 2 * this->faulty_nodes_bound(); } @@ -92,28 +135,21 @@ pbft_operation::end_commit_phase() } operation_key_t -pbft_operation::get_operation_key() +pbft_operation::get_operation_key() const { - return std::tuple(this->view, this->sequence, request_hash(this->request)); + return {this->view, this->sequence, this->request_hash}; } pbft_operation_state -pbft_operation::get_state() +pbft_operation::get_state() const { return this->state; } std::string -pbft_operation::debug_string() -{ - return boost::str(boost::format("(v%1%, s%2%) - %3%") % this->view % this->sequence % this->request.ShortDebugString()); -} - -bzn::hash_t -pbft_operation::request_hash(const pbft_request& req) +pbft_operation::debug_string() const { - // TODO: Actually hash the request; KEP-466 - return req.ShortDebugString(); + return boost::str(boost::format("(v%1%, s%2%) - %3%") % this->view % this->sequence % this->parsed_request.ShortDebugString()); } void @@ -123,7 +159,7 @@ pbft_operation::set_session(std::weak_ptr session) } std::weak_ptr -pbft_operation::session() +pbft_operation::session() const { return this->listener_session; } diff --git a/pbft/pbft_operation.hpp b/pbft/pbft_operation.hpp index 1b27c710..ed4bad90 100644 --- a/pbft/pbft_operation.hpp +++ b/pbft/pbft_operation.hpp @@ -15,7 +15,7 @@ #pragma once #include -#include +#include #include #include #include @@ -23,7 +23,6 @@ namespace bzn { - using hash_t = std::string; // View, sequence using operation_key_t = std::tuple; @@ -40,34 +39,38 @@ namespace bzn { public: - pbft_operation(uint64_t view, uint64_t sequence, pbft_request msg, std::shared_ptr> peers); + pbft_operation(uint64_t view, uint64_t sequence, const bzn::hash_t& request_hash, std::shared_ptr> peers); void set_session(std::weak_ptr); - static hash_t request_hash(const pbft_request& req); + operation_key_t get_operation_key() const; + pbft_operation_state get_state() const; - operation_key_t get_operation_key(); - pbft_operation_state get_state(); + void record_preprepare(const bzn_envelope& encoded_preprepare); + bool has_preprepare() const; - void record_preprepare(const wrapped_bzn_msg& encoded_preprepare); - bool has_preprepare(); + void record_prepare(const bzn_envelope& encoded_prepare); + bool is_prepared() const; - void record_prepare(const wrapped_bzn_msg& encoded_prepare); - bool is_prepared(); - - void record_commit(const wrapped_bzn_msg& encoded_commit); - bool is_committed(); + void record_commit(const bzn_envelope& encoded_commit); + bool is_committed() const; void begin_commit_phase(); void end_commit_phase(); - std::weak_ptr session(); + std::weak_ptr session() const; + + const pbft_request& get_request() const; + const bzn::encoded_message& get_encoded_request() const; + + void record_request(const bzn::encoded_message& encoded_request); + bool has_request() const; const uint64_t view; const uint64_t sequence; - const pbft_request request; + const bzn::hash_t request_hash; - std::string debug_string(); + std::string debug_string() const; size_t faulty_nodes_bound() const; @@ -82,5 +85,9 @@ namespace bzn std::weak_ptr listener_session; + bzn::encoded_message encoded_request; + pbft_request parsed_request; + + bool request_saved = false; }; } diff --git a/pbft/pbft_service_base.hpp b/pbft/pbft_service_base.hpp index c1342bcd..79d0fdc7 100644 --- a/pbft/pbft_service_base.hpp +++ b/pbft/pbft_service_base.hpp @@ -21,7 +21,7 @@ namespace bzn { - using execute_handler_t = std::function; + using execute_handler_t = std::function)>; class pbft_service_base { diff --git a/pbft/test/CMakeLists.txt b/pbft/test/CMakeLists.txt index 774a6202..564ba52a 100644 --- a/pbft/test/CMakeLists.txt +++ b/pbft/test/CMakeLists.txt @@ -9,6 +9,6 @@ set(test_srcs pbft_config_store_test.cpp pbft_join_leave_test.cpp database_pbft_service_test.cpp) -set(test_libs pbft ${Protobuf_LIBRARIES} bootstrap storage) +set(test_libs pbft crypto options ${Protobuf_LIBRARIES} bootstrap storage) add_gmock_test(pbft) diff --git a/pbft/test/database_pbft_service_test.cpp b/pbft/test/database_pbft_service_test.cpp index 102d05a3..7a145a79 100644 --- a/pbft/test/database_pbft_service_test.cpp +++ b/pbft/test/database_pbft_service_test.cpp @@ -73,8 +73,8 @@ TEST(database_pbft_service, test_that_failed_storing_of_operation_throws) EXPECT_CALL(*mock_storage, create(_, _, _)).WillOnce(Return(bzn::storage_base::result::exists)); EXPECT_CALL(*mock_storage, update(_, _, _)).WillOnce(Return(bzn::storage_base::result::ok)); - pbft_request msg; - auto operation = std::make_shared(0, 1, msg, nullptr); + auto operation = std::make_shared(0, 1, "somehash", nullptr); + operation->record_request("pretend this is a request"); EXPECT_THROW(dps.apply_operation(operation), std::runtime_error); } @@ -89,13 +89,13 @@ TEST(database_pbft_service, test_that_stored_operation_is_executed_in_order_and_ bzn::database_pbft_service dps(mock_io_context, mem_storage, mock_crud, TEST_UUID); pbft_request msg; - msg.mutable_operation()->mutable_header()->set_db_uuid(TEST_UUID); msg.mutable_operation()->mutable_header()->set_transaction_id(uint64_t(123)); msg.mutable_operation()->mutable_create()->set_key("key2"); msg.mutable_operation()->mutable_create()->set_value("value2"); - auto operation2 = std::make_shared(0, 2, msg, nullptr); + auto operation2 = std::make_shared(0, 2, "somehasha", nullptr); + operation2->record_request(msg.SerializeAsString()); dps.apply_operation(operation2); @@ -106,7 +106,8 @@ TEST(database_pbft_service, test_that_stored_operation_is_executed_in_order_and_ msg.mutable_operation()->mutable_create()->set_value("value3"); auto mock_session = std::make_shared(); - auto operation3 = std::make_shared(0, 3, msg, nullptr); + auto operation3 = std::make_shared(0, 3, "somehashb", nullptr); + operation3->record_request(msg.SerializeAsString()); operation3->set_session(mock_session); dps.apply_operation(operation3); @@ -117,7 +118,8 @@ TEST(database_pbft_service, test_that_stored_operation_is_executed_in_order_and_ msg.mutable_operation()->mutable_create()->set_key("key1"); msg.mutable_operation()->mutable_create()->set_value("value1"); - auto operation1 = std::make_shared(0, 1, msg, nullptr); + auto operation1 = std::make_shared(0, 1, "somehashc", nullptr); + operation1->record_request(msg.SerializeAsString()); operation1->set_session(std::make_shared()); EXPECT_CALL(*mock_io_context, post(_)).Times(3); diff --git a/pbft/test/pbft_audit_test.cpp b/pbft/test/pbft_audit_test.cpp index ba95418d..0b6ca6c9 100644 --- a/pbft/test/pbft_audit_test.cpp +++ b/pbft/test/pbft_audit_test.cpp @@ -25,7 +25,7 @@ namespace bzn::test this->build_pbft(); this->pbft->set_audit_enabled(true); - wrapped_bzn_msg dummy_original_msg; + bzn_envelope dummy_original_msg; pbft_msg preprepare = pbft_msg(this->preprepare_msg); preprepare.set_sequence(1); this->pbft->handle_message(preprepare, dummy_original_msg); @@ -33,9 +33,9 @@ namespace bzn::test for (const auto& peer : TEST_PEER_LIST) { pbft_msg prepare = pbft_msg(preprepare); - wrapped_bzn_msg prepare_wrap; + bzn_envelope prepare_wrap; pbft_msg commit = pbft_msg(preprepare); - wrapped_bzn_msg commit_wrap; + bzn_envelope commit_wrap; prepare.set_type(PBFT_MSG_PREPARE); prepare_wrap.set_sender(peer.uuid); commit.set_type(PBFT_MSG_COMMIT); diff --git a/pbft/test/pbft_checkpoint_tests.cpp b/pbft/test/pbft_checkpoint_tests.cpp index 768ab07d..f177fdd5 100644 --- a/pbft/test/pbft_checkpoint_tests.cpp +++ b/pbft/test/pbft_checkpoint_tests.cpp @@ -50,7 +50,8 @@ namespace bzn::test .Times(Exactly(TEST_PEER_LIST.size())); this->build_pbft(); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + auto op = std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr); + this->service_execute_handler(op); } TEST_F(pbft_checkpoint_test, no_checkpoint_on_message_before_local_state) @@ -69,7 +70,7 @@ namespace bzn::test TEST_F(pbft_checkpoint_test, unstable_checkpoint_on_local_state_before_message) { this->build_pbft(); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); EXPECT_EQ(CHECKPOINT_INTERVAL, this->pbft->latest_checkpoint().first); EXPECT_EQ(0u, this->pbft->latest_stable_checkpoint().first); @@ -79,7 +80,7 @@ namespace bzn::test TEST_F(pbft_checkpoint_test, stable_checkpoint_on_message_after_local_state) { this->build_pbft(); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); for (const auto& peer : TEST_PEER_LIST) { pbft_msg msg = cp1_msg; @@ -99,7 +100,7 @@ namespace bzn::test pbft_msg msg = cp1_msg; this->pbft->handle_message(msg, from(peer.uuid)); } - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); EXPECT_EQ(CHECKPOINT_INTERVAL, this->pbft->latest_checkpoint().first); EXPECT_EQ(CHECKPOINT_INTERVAL, this->pbft->latest_stable_checkpoint().first); @@ -109,13 +110,13 @@ namespace bzn::test TEST_F(pbft_checkpoint_test, unstable_checkpoint_does_not_discard_stable_checkpoint) { this->build_pbft(); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); for (const auto& peer : TEST_PEER_LIST) { pbft_msg msg = cp1_msg; this->pbft->handle_message(msg, from(peer.uuid)); } - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL*2); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL*2, "somehash", nullptr)); EXPECT_EQ(CHECKPOINT_INTERVAL*2, this->pbft->latest_checkpoint().first); EXPECT_EQ(CHECKPOINT_INTERVAL, this->pbft->latest_stable_checkpoint().first); @@ -125,13 +126,13 @@ namespace bzn::test TEST_F(pbft_checkpoint_test, stable_checkpoint_discards_old_stable_checkpoint) { this->build_pbft(); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); for (const auto& peer : TEST_PEER_LIST) { pbft_msg msg = cp1_msg; this->pbft->handle_message(msg, from(peer.uuid)); } - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL*2); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL*2, "somehash", nullptr)); for (const auto& peer : TEST_PEER_LIST) { pbft_msg msg = cp1_msg; @@ -154,14 +155,14 @@ namespace bzn::test msg.set_type(PBFT_MSG_PREPREPARE); msg.set_view(1); msg.set_sequence(i); - msg.set_allocated_request(new pbft_request(this->request_msg)); + msg.set_request_hash("somehash"); this->pbft->handle_message(msg, default_original_msg); } EXPECT_EQ(9u, this->pbft->outstanding_operations_count()); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); for (const auto& peer : TEST_PEER_LIST) { pbft_msg msg = cp1_msg; @@ -187,7 +188,7 @@ namespace bzn::test uint64_t initial_low = this->pbft->get_low_water_mark(); uint64_t initial_high = this->pbft->get_high_water_mark(); - this->service_execute_handler(this->request_msg, CHECKPOINT_INTERVAL); + this->service_execute_handler(std::make_shared(1, CHECKPOINT_INTERVAL, "somehash", nullptr)); EXPECT_EQ(this->pbft->get_high_water_mark(), initial_high); EXPECT_EQ(this->pbft->get_low_water_mark(), initial_low); diff --git a/pbft/test/pbft_failure_detector_test.cpp b/pbft/test/pbft_failure_detector_test.cpp index 57364ad6..f9c0ee12 100644 --- a/pbft/test/pbft_failure_detector_test.cpp +++ b/pbft/test/pbft_failure_detector_test.cpp @@ -40,8 +40,8 @@ namespace this->failure_detected = true; } - pbft_request req_a; - pbft_request req_b; + bzn::hash_t req_a = "a"; + bzn::hash_t req_b = "b"; void build_failure_detector() { @@ -62,15 +62,6 @@ namespace .WillRepeatedly(Invoke( [&](auto handler){this->request_timer_callback = handler;} )); - - //req_a.set_operation("do something"); - //req_b.set_operation("do something else"); - - req_a.set_client("alice"); - req_b.set_client("bob"); - - req_a.set_timestamp(1); - req_b.set_timestamp(2); } }; diff --git a/pbft/test/pbft_join_leave_test.cpp b/pbft/test/pbft_join_leave_test.cpp index eec410b3..63289e01 100644 --- a/pbft/test/pbft_join_leave_test.cpp +++ b/pbft/test/pbft_join_leave_test.cpp @@ -22,26 +22,52 @@ namespace bzn const bzn::peer_address_t new_peer{"127.0.0.1", 8090, 83, "name_new", "uuid_new"}; const bzn::peer_address_t new_peer2{"127.0.0.1", 8091, 84, "name_new2", "uuid_new2"}; - MATCHER_P2(message_is_correct_type, msg_type, req_type, "") + std::optional extract_pbft_msg_option(std::string m) { - wrapped_bzn_msg message; - if (message.ParseFromString(*arg)) + bzn_envelope message; + if (message.ParseFromString(m)) { - if (message.type() == bzn_msg_type::BZN_MSG_PBFT) + if (message.payload_case() == bzn_envelope::kPbft) { pbft_msg pmsg; - if (pmsg.ParseFromString(message.payload())) + if (pmsg.ParseFromString(message.pbft())) { - if (pmsg.type() == msg_type) - { - auto req = pmsg.request(); - if (req.type() == req_type) - { - return true; - } - } + return pmsg; } + } + } + return {}; + } + + MATCHER_P(message_has_correct_req_hash, req_hash, "") + { + auto pmsg = extract_pbft_msg_option(*arg); + if (pmsg) + { + return pmsg->request_hash() == req_hash; + } + return false; + } + + MATCHER_P(message_has_correct_pbft_type, pbft_type, "") + { + auto pmsg = extract_pbft_msg_option(*arg); + if (pmsg) + { + return pmsg->type() == pbft_type; + } + return false; + } + MATCHER_P(message_has_req_with_correct_type, req_type, "") + { + auto pmsg = extract_pbft_msg_option(*arg); + if (pmsg) + { + pbft_request req; + if (req.ParseFromString(pmsg->request())) + { + return req.type() == req_type; } } return false; @@ -64,14 +90,17 @@ namespace bzn preprepare.set_view(1); preprepare.set_sequence(100); preprepare.set_type(PBFT_MSG_PREPREPARE); - preprepare.set_allocated_request(req); + preprepare.set_request(req->SerializeAsString()); + auto crypto = std::make_shared(std::make_shared()); + auto expect_hash = crypto->hash(preprepare.request()); + preprepare.set_request_hash(expect_hash); // receiving node should send out prepare messsage to everyone for (auto const &p : TEST_PEER_LIST) { EXPECT_CALL(*(mock_node), send_message_str(bzn::make_endpoint(p), - message_is_correct_type(PBFT_MSG_PREPARE, PBFT_REQ_NEW_CONFIG))) + AllOf(message_has_correct_req_hash(expect_hash), message_has_correct_pbft_type(PBFT_MSG_PREPARE)))) .Times(Exactly(1)); } @@ -88,7 +117,7 @@ namespace bzn prepare.set_view(op->view); prepare.set_sequence(op->sequence); prepare.set_type(PBFT_MSG_PREPARE); - prepare.set_allocated_request(new pbft_request(op->request)); + prepare.set_request_hash(op->request_hash); auto wmsg = wrap_pbft_msg(prepare); wmsg.set_sender(node.uuid); @@ -102,19 +131,18 @@ namespace bzn commit.set_view(op->view); commit.set_sequence(op->sequence); commit.set_type(PBFT_MSG_COMMIT); - commit.set_allocated_request(new pbft_request(op->request)); + commit.set_request_hash(op->request_hash); auto wmsg = wrap_pbft_msg(commit); wmsg.set_sender(node.uuid); pbft->handle_message(commit, wmsg); } - wrapped_bzn_msg + bzn_envelope wrap_pbft_membership_msg(const pbft_membership_msg& msg) { - wrapped_bzn_msg result; - result.set_payload(msg.SerializeAsString()); - result.set_type(bzn_msg_type::BZN_MSG_PBFT_MEMBERSHIP); + bzn_envelope result; + result.set_pbft_membership(msg.SerializeAsString()); return result; } } @@ -139,7 +167,7 @@ namespace bzn { EXPECT_CALL(*(this->mock_node), send_message_str(bzn::make_endpoint(p), - message_is_correct_type(PBFT_MSG_PREPREPARE, PBFT_REQ_NEW_CONFIG))) + AllOf(message_has_req_with_correct_type(PBFT_REQ_NEW_CONFIG), message_has_correct_pbft_type(PBFT_MSG_PREPREPARE)))) .Times(Exactly(1)); } @@ -167,9 +195,9 @@ namespace bzn for (auto const &p : TEST_PEER_LIST) { EXPECT_CALL(*(this->mock_node), - send_message_str(bzn::make_endpoint(p), - message_is_correct_type(PBFT_MSG_PREPREPARE, PBFT_REQ_NEW_CONFIG))) - .Times(Exactly(1)); + send_message_str(bzn::make_endpoint(p), + AllOf(message_has_req_with_correct_type(PBFT_REQ_NEW_CONFIG), message_has_correct_pbft_type(PBFT_MSG_PREPREPARE)))) + .Times(Exactly(1)); } auto wmsg = wrap_pbft_membership_msg(leave_msg); @@ -241,7 +269,7 @@ namespace bzn { EXPECT_CALL(*(mock_node), send_message_str(bzn::make_endpoint(p), - message_is_correct_type(PBFT_MSG_COMMIT, PBFT_REQ_NEW_CONFIG))) + AllOf(message_has_correct_req_hash(msg.request_hash()), message_has_correct_pbft_type(PBFT_MSG_COMMIT)))) .Times(Exactly(1)); } @@ -279,7 +307,7 @@ namespace bzn { EXPECT_CALL(*(mock_node), send_message_str(bzn::make_endpoint(p), - message_is_correct_type(PBFT_MSG_COMMIT, PBFT_REQ_NEW_CONFIG))) + AllOf(message_has_correct_req_hash(msg.request_hash()), message_has_correct_pbft_type(PBFT_MSG_COMMIT)))) .Times(Exactly(1)); } diff --git a/pbft/test/pbft_operation_test.cpp b/pbft/test/pbft_operation_test.cpp index fa5e0f78..db333907 100644 --- a/pbft/test/pbft_operation_test.cpp +++ b/pbft/test/pbft_operation_test.cpp @@ -40,15 +40,16 @@ namespace { public: pbft_request request; + bzn::hash_t request_hash = "somehash"; uint64_t view = 6; uint64_t sequence = 19; bzn::pbft_operation op; - wrapped_bzn_msg empty_original_msg; + bzn_envelope empty_original_msg; pbft_operation_test() - : op(view, sequence, request, std::make_shared>(TEST_PEER_LIST)) + : op(view, sequence, request_hash, std::make_shared>(TEST_PEER_LIST)) { } }; @@ -62,25 +63,41 @@ namespace TEST_F(pbft_operation_test, prepared_after_all_msgs) { - wrapped_bzn_msg preprepare; + bzn_envelope preprepare; this->op.record_preprepare(preprepare); for (const auto& peer : TEST_PEER_LIST) { - wrapped_bzn_msg msg; + bzn_envelope msg; msg.set_sender(peer.uuid); op.record_prepare(msg); + op.record_request("pretend this is a request"); } EXPECT_TRUE(this->op.is_prepared()); } + TEST_F(pbft_operation_test, not_prepared_without_request) + { + bzn_envelope preprepare; + this->op.record_preprepare(preprepare); + + for (const auto& peer : TEST_PEER_LIST) + { + bzn_envelope msg; + msg.set_sender(peer.uuid); + op.record_prepare(msg); + } + + EXPECT_FALSE(this->op.is_prepared()); + } + TEST_F(pbft_operation_test, not_prepared_without_preprepare) { for (const auto& peer : TEST_PEER_LIST) { - wrapped_bzn_msg msg; + bzn_envelope msg; msg.set_sender(peer.uuid); op.record_prepare(msg); } @@ -91,12 +108,12 @@ namespace TEST_F(pbft_operation_test, not_prepared_with_2f) { - wrapped_bzn_msg preprepare; + bzn_envelope preprepare; this->op.record_preprepare(preprepare); for (const auto& peer : TEST_2F_PEER_LIST) { - wrapped_bzn_msg msg; + bzn_envelope msg; msg.set_sender(peer.uuid); op.record_prepare(msg); } @@ -107,14 +124,15 @@ namespace TEST_F(pbft_operation_test, prepared_with_2f_PLUS_1) { - wrapped_bzn_msg preprepare; + bzn_envelope preprepare; this->op.record_preprepare(preprepare); for (const auto& peer : TEST_2F_PLUS_1_PEER_LIST) { - wrapped_bzn_msg msg; + bzn_envelope msg; msg.set_sender(peer.uuid); op.record_prepare(msg); + op.record_request("pretend this is a request"); } EXPECT_TRUE(this->op.is_prepared()); diff --git a/pbft/test/pbft_test.cpp b/pbft/test/pbft_test.cpp index 55839aa6..3df59daa 100644 --- a/pbft/test/pbft_test.cpp +++ b/pbft/test/pbft_test.cpp @@ -123,19 +123,11 @@ namespace bzn::test this->build_pbft(); EXPECT_CALL(*mock_node, send_message_str(_, _)).Times(Exactly(TEST_PEER_LIST.size())); - pbft_msg prepreparea(this->preprepare_msg); - pbft_msg preprepareb(this->preprepare_msg); - pbft_msg prepreparec(this->preprepare_msg); - pbft_msg preprepared(this->preprepare_msg); - - preprepareb.mutable_request()->set_timestamp(99); - prepreparec.mutable_request()->mutable_operation(); - preprepared.mutable_request()->set_client("certainly not bob"); - - this->pbft->handle_message(prepreparea, default_original_msg); - this->pbft->handle_message(preprepareb, default_original_msg); - this->pbft->handle_message(prepreparec, default_original_msg); - this->pbft->handle_message(preprepared, default_original_msg); + pbft_msg preprepare2(this->preprepare_msg); + preprepare2.set_request_hash("some other hash"); + + this->pbft->handle_message(this->preprepare_msg, default_original_msg); + this->pbft->handle_message(preprepare2, default_original_msg); } TEST_F(pbft_test, test_commit_messages_sent) @@ -220,11 +212,11 @@ namespace bzn::test pbft_request msg; auto peers = std::make_shared>(); - auto op = std::make_shared(1, 1, msg, peers); + auto op = std::make_shared(1, 1, "somehash", peers); op->set_session(mock_session); dummy_pbft_service service(this->mock_io_context); - service.register_execute_handler([](auto, auto){}); + service.register_execute_handler([](auto){}); service.apply_operation(op); } diff --git a/pbft/test/pbft_test_common.cpp b/pbft/test/pbft_test_common.cpp index db72fc12..1b3e4de7 100644 --- a/pbft/test/pbft_test_common.cpp +++ b/pbft/test/pbft_test_common.cpp @@ -21,7 +21,7 @@ namespace bzn::test // This pattern copied from audit_test, to allow us to declare expectations on the timer that pbft will // construct - EXPECT_CALL(*(this->mock_node), register_for_message(bzn_msg_type::BZN_MSG_PBFT, _)) + EXPECT_CALL(*(this->mock_node), register_for_message(bzn_envelope::kPbft, _)) .Times(Exactly(1)) .WillOnce( Invoke( @@ -32,7 +32,7 @@ namespace bzn::test } )); - EXPECT_CALL(*(this->mock_node), register_for_message(bzn_msg_type::BZN_MSG_PBFT_MEMBERSHIP, _)) + EXPECT_CALL(*(this->mock_node), register_for_message(bzn_envelope::kPbftMembership, _)) .Times(Exactly(1)) .WillOnce( Invoke( @@ -86,8 +86,8 @@ namespace bzn::test preprepare_msg.set_type(PBFT_MSG_PREPREPARE); preprepare_msg.set_sequence(19); preprepare_msg.set_view(1); - preprepare_msg.mutable_request()->set_client("bob"); - preprepare_msg.mutable_request()->set_timestamp(1); + preprepare_msg.set_request("hi"); + preprepare_msg.set_request_hash(this->crypto->hash("hi")); } void @@ -100,6 +100,7 @@ namespace bzn::test , this->uuid , this->mock_service , this->mock_failure_detector + , this->crypto ); this->pbft->set_audit_enabled(false); this->pbft->start(); @@ -120,27 +121,26 @@ namespace bzn::test pbft_msg extract_pbft_msg(std::string msg) { - wrapped_bzn_msg outer; + bzn_envelope outer; outer.ParseFromString(msg); pbft_msg result; - result.ParseFromString(outer.payload()); + result.ParseFromString(outer.pbft()); return result; } std::string extract_sender(std::string msg) { - wrapped_bzn_msg outer; + bzn_envelope outer; outer.ParseFromString(msg); return outer.sender(); } - wrapped_bzn_msg + bzn_envelope wrap_pbft_msg(const pbft_msg& msg) { - wrapped_bzn_msg result; - result.set_payload(msg.SerializeAsString()); - result.set_type(bzn_msg_type::BZN_MSG_PBFT); + bzn_envelope result; + result.set_pbft(msg.SerializeAsString()); return result; } @@ -191,10 +191,10 @@ namespace bzn::test return json["bzn-api"] == "audit"; } - wrapped_bzn_msg + bzn_envelope from(uuid_t uuid) { - wrapped_bzn_msg result; + bzn_envelope result; result.set_sender(uuid); return result; } diff --git a/pbft/test/pbft_test_common.hpp b/pbft/test/pbft_test_common.hpp index 7b762fd7..1a36f97a 100644 --- a/pbft/test/pbft_test_common.hpp +++ b/pbft/test/pbft_test_common.hpp @@ -28,6 +28,8 @@ #include #include #include +#include +#include using namespace ::testing; @@ -49,7 +51,7 @@ namespace bzn::test pbft_request request_msg; pbft_msg preprepare_msg; - wrapped_bzn_msg default_original_msg; + bzn_envelope default_original_msg; std::shared_ptr mock_io_context = std::make_shared>(); @@ -61,6 +63,9 @@ namespace bzn::test std::shared_ptr mock_session = std::make_shared>(); + std::shared_ptr options = std::make_shared(); + std::shared_ptr crypto = std::make_shared(options); + std::shared_ptr pbft; std::unique_ptr audit_heartbeat_timer = @@ -68,7 +73,7 @@ namespace bzn::test bzn::asio::wait_handler audit_heartbeat_timer_callback; - std::function service_execute_handler; + bzn::execute_handler_t service_execute_handler; bzn::protobuf_handler message_handler; bzn::message_handler database_handler; @@ -87,7 +92,7 @@ namespace bzn::test pbft_msg extract_pbft_msg(std::string msg); uuid_t extract_sender(std::string msg); - wrapped_bzn_msg + bzn_envelope wrap_pbft_msg(const pbft_msg& msg); bzn::json_message @@ -99,5 +104,5 @@ namespace bzn::test bool is_checkpoint(std::shared_ptr msg); bool is_audit(std::shared_ptr msg); - wrapped_bzn_msg from(uuid_t uuid); + bzn_envelope from(uuid_t uuid); } diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt index df0aa5de..a3bba5fc 100644 --- a/proto/CMakeLists.txt +++ b/proto/CMakeLists.txt @@ -1,4 +1,4 @@ -protobuf_generate_cpp(PROTO_SRC PROTO_HEADER bluzelle.proto database.proto pbft.proto audit.proto) +protobuf_generate_cpp(PROTO_SRC PROTO_HEADER bluzelle.proto database.proto pbft.proto audit.proto status.proto) add_library(proto ${PROTO_HEADER} ${PROTO_SRC}) set_target_properties(proto PROPERTIES COMPILE_FLAGS "-Wno-unused") set(PROTO_INCLUDE_DIR ${CMAKE_BINARY_DIR}/proto) diff --git a/proto/bluzelle.proto b/proto/bluzelle.proto index 0ce3d4b2..cd760914 100644 --- a/proto/bluzelle.proto +++ b/proto/bluzelle.proto @@ -15,34 +15,30 @@ syntax = "proto3"; import "database.proto"; -import "audit.proto"; -import "pbft.proto"; + +message bzn_envelope +{ + string sender = 1; + bytes signature = 2; + + oneof payload + { + bytes pbft_request = 3; + bytes database_response = 4; + bytes json = 5; + bytes audit = 6; + bytes pbft = 7; + bytes pbft_membership = 8; + bytes status_request = 9; + } +} message bzn_msg { + // Keeping this around for raft oneof msg { database_msg db = 10; string json = 11; - audit_message audit_message = 12; - pbft_msg pbft = 13; - pbft_membership_msg membership = 14; } } - -enum bzn_msg_type -{ - BZN_MSG_UNDEFINED = 0; - BZN_MSG_PBFT = 1; - BZN_MSG_PBFT_MEMBERSHIP = 2; -} - -message wrapped_bzn_msg -{ - // This is stored as a serialized string because we need to sign it, and serialization is not guarenteed to be deterministic - bytes payload = 1; - bzn_msg_type type = 2; - - string sender = 3; - bytes signature = 4; -} diff --git a/proto/pbft.proto b/proto/pbft.proto index 3e9d2315..7c8fb475 100644 --- a/proto/pbft.proto +++ b/proto/pbft.proto @@ -26,8 +26,10 @@ message pbft_msg uint64 sequence = 3; // used for preprepare, prepare, commit - // TODO: Most messages should contain only the hash of the request - KEP-344 - pbft_request request = 4; + bytes request_hash = 5; + + // most messages should only have the hash, not the original request + bytes request = 4; // for checkpoints string state_hash = 6; diff --git a/proto/status.proto b/proto/status.proto new file mode 100644 index 00000000..99e81ad9 --- /dev/null +++ b/proto/status.proto @@ -0,0 +1,26 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +syntax = "proto3"; + +message status_request {} + +message status_response +{ + string swarm_version = 1; + string swarm_git_commit = 2; + string uptime = 3; + string module_status_json = 4; + bool pbft_enabled = 5; +} diff --git a/raft/raft.cpp b/raft/raft.cpp index 3964227e..a8014b39 100644 --- a/raft/raft.cpp +++ b/raft/raft.cpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include diff --git a/scripts/crud b/scripts/crud index bee791af..bfd264cc 100755 --- a/scripts/crud +++ b/scripts/crud @@ -32,6 +32,7 @@ try: import bluzelle_pb2 import database_pb2 import pbft_pb2 + import status_pb2 except ImportError as e: raise ImportError("{}\n\nTo generate Bluzelle protobuf modules:\n" "\n" @@ -169,11 +170,13 @@ def size_request(): msg.db.size.SetInParent() return msg + def subscribe_request(key): msg = bluzelle_pb2.bzn_msg() msg.db.subscribe.key = key return msg + def unsubscribe_request(key): global transaction_id msg = bluzelle_pb2.bzn_msg() @@ -181,8 +184,29 @@ def unsubscribe_request(key): msg.db.unsubscribe.transaction_id = transaction_id return msg + def status_handler(args): - return send_status_request(args.node) + global use_pbft + + if not use_pbft: + return send_status_request(args.node) + + ws = websocket.create_connection("ws://" + args.node) + msg_outer = bluzelle_pb2.bzn_envelope() + msg_inner = status_pb2.status_request() + msg_outer.status_request = msg_inner.SerializeToString() + + print("Sending: \n{}".format(msg_outer).expandtabs(4)) + ws.send_binary(msg_outer.SerializeToString()) + resp = status_pb2.status_response() + resp.ParseFromString(ws.recv()) + + print("Response: \n{}".format(resp).expandtabs(4)) + modules = json.loads(resp.module_status_json) + print("{}".format(json.dumps(modules,indent=4))) + print("-" * 60 + '\n') + + return resp def create_handler(args): diff --git a/status/CMakeLists.txt b/status/CMakeLists.txt index 515897e7..59bdd2dd 100644 --- a/status/CMakeLists.txt +++ b/status/CMakeLists.txt @@ -3,7 +3,7 @@ add_library(status STATIC status.hpp ) -target_link_libraries(status ) +target_link_libraries(status proto) add_dependencies(status jsoncpp proto) target_include_directories(status PRIVATE ${JSONCPP_INCLUDE_DIRS}) diff --git a/status/status.cpp b/status/status.cpp index 23b8c013..bedc22f4 100644 --- a/status/status.cpp +++ b/status/status.cpp @@ -15,6 +15,7 @@ #include #include #include +#include using namespace bzn; @@ -27,6 +28,7 @@ namespace const std::string MODULE_KEY{"module"}; const std::string UPTIME_KEY{"uptime"}; const std::string COMMIT_KEY{"commit"}; + const std::string PBFT_ENABLED_KEY{"pbft_enabled"}; std::string get_uptime(const std::chrono::steady_clock::time_point& start_time) { @@ -46,10 +48,11 @@ namespace } -status::status(std::shared_ptr node, bzn::status::status_provider_list_t&& status_providers) +status::status(std::shared_ptr node, bzn::status::status_provider_list_t&& status_providers, const bool pbft_enabled) : node(std::move(node)) , status_providers(std::move(status_providers)) , start_time(std::chrono::steady_clock::now()) + , pbft_enabled(pbft_enabled) { } @@ -65,19 +68,20 @@ status::start() { throw std::runtime_error("Unable to register for STATUS messages!"); } + + if (!this->node->register_for_message(bzn_envelope::kStatusRequest, + std::bind(&status::handle_status_request_message, shared_from_this(), std::placeholders::_1, std::placeholders::_2))) + { + throw std::runtime_error("Unable to register for STATUS REQUEST messages!"); + } }); } -void -status::handle_ws_status_messages(const bzn::json_message& ws_msg, std::shared_ptr session) +bzn::json_message +status::query_modules() { - auto response_msg = std::make_shared(ws_msg); - - (*response_msg)[VERSION_KEY] = SWARM_VERSION; - (*response_msg)[COMMIT_KEY] = SWARM_GIT_COMMIT; - (*response_msg)[UPTIME_KEY] = get_uptime(this->start_time); - (*response_msg)[MODULE_KEY] = bzn::json_message(); + Json::Value module_status; for (const auto& provider : this->status_providers) { @@ -88,11 +92,46 @@ status::handle_ws_status_messages(const bzn::json_message& ws_msg, std::shared_p entry[NAME_KEY] = provider_shared_ptr->get_name(); entry[STATUS_KEY] = provider_shared_ptr->get_status(); - (*response_msg)[MODULE_KEY].append(entry); + module_status.append(entry); } } + return module_status; +} + + +void +status::handle_ws_status_messages(const bzn::json_message& ws_msg, std::shared_ptr session) +{ + auto response_msg = std::make_shared(ws_msg); + + (*response_msg)[VERSION_KEY] = SWARM_VERSION; + (*response_msg)[COMMIT_KEY] = SWARM_GIT_COMMIT; + (*response_msg)[UPTIME_KEY] = get_uptime(this->start_time); + (*response_msg)[MODULE_KEY] = this->query_modules(); + (*response_msg)[PBFT_ENABLED_KEY] = this->pbft_enabled; + LOG(debug) << response_msg->toStyledString().substr(0, MAX_MESSAGE_SIZE); session->send_message(response_msg, false); } + + +void +status::handle_status_request_message(const bzn_envelope& /*msg*/, std::shared_ptr session) +{ + status_response srm; + + srm.set_swarm_version(SWARM_VERSION); + srm.set_swarm_git_commit(SWARM_GIT_COMMIT); + srm.set_uptime(get_uptime(this->start_time)); + srm.set_pbft_enabled(this->pbft_enabled); + + Json::Value module_status; + module_status[MODULE_KEY] = this->query_modules(); + srm.set_module_status_json(module_status.toStyledString()); + + LOG(debug) << srm.DebugString().substr(0, MAX_MESSAGE_SIZE); + + session->send_message(std::make_shared(srm.SerializeAsString()), false); +} diff --git a/status/status.hpp b/status/status.hpp index d56266f2..c20eda62 100644 --- a/status/status.hpp +++ b/status/status.hpp @@ -29,19 +29,25 @@ namespace bzn public: using status_provider_list_t = std::vector>; - status(std::shared_ptr node, status_provider_list_t&& status_providers); + status(std::shared_ptr node, status_provider_list_t&& status_providers, bool pbft_enabled); void start(); private: void handle_ws_status_messages(const bzn::json_message& ws_msg, std::shared_ptr session); + void handle_status_request_message(const bzn_envelope& msg, std::shared_ptr session); + + bzn::json_message query_modules(); + std::shared_ptr node; status_provider_list_t status_providers; std::once_flag start_once; const std::chrono::steady_clock::time_point start_time; + + const bool pbft_enabled; }; } // namespace bzn diff --git a/status/test/CMakeLists.txt b/status/test/CMakeLists.txt index 39b267cf..a4757a0a 100644 --- a/status/test/CMakeLists.txt +++ b/status/test/CMakeLists.txt @@ -1,4 +1,4 @@ set(test_srcs status_test.cpp) -set(test_libs status ) +set(test_libs status ${Protobuf_LIBRARIES}) add_gmock_test(status) diff --git a/status/test/status_test.cpp b/status/test/status_test.cpp index 2f6a8a43..2e6482ea 100644 --- a/status/test/status_test.cpp +++ b/status/test/status_test.cpp @@ -13,6 +13,8 @@ // along with this program. If not, see . #include +#include +#include #include #include #include @@ -27,21 +29,32 @@ TEST(status_test, test_that_status_registers_and_responses_to_requests) // success { - auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{}); + auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{}, true); EXPECT_CALL(*mock_node, register_for_message("status", _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_node, register_for_message(bzn_envelope::kStatusRequest, _)).WillOnce(Return(true)); status->start(); } // failure { - auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{}); + auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{}, true); EXPECT_CALL(*mock_node, register_for_message("status", _)).WillOnce(Return(false)); EXPECT_THROW(status->start(), std::runtime_error); } + + // failure + { + auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{}, false); + + EXPECT_CALL(*mock_node, register_for_message("status", _)).WillOnce(Return(true)); + EXPECT_CALL(*mock_node, register_for_message(bzn_envelope::kStatusRequest, _)).WillOnce(Return(false)); + + EXPECT_THROW(status->start(), std::runtime_error); + } } @@ -52,7 +65,7 @@ TEST(status_test, test_that_status_request_queries_status_providers) auto mock_status_provider = std::make_shared(); - auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{mock_status_provider, mock_status_provider}); + auto status = std::make_shared(mock_node, bzn::status::status_provider_list_t{mock_status_provider, mock_status_provider}, true); bzn::message_handler mh; EXPECT_CALL(*mock_node, register_for_message("status", _)).WillOnce(Invoke( @@ -62,6 +75,14 @@ TEST(status_test, test_that_status_request_queries_status_providers) return true; })); + bzn::protobuf_handler pbh; + EXPECT_CALL(*mock_node, register_for_message(bzn_envelope::kStatusRequest, _)).WillOnce(Invoke( + [&](auto, auto handler) + { + pbh = handler; + return true; + })); + status->start(); // make a request... @@ -70,7 +91,7 @@ TEST(status_test, test_that_status_request_queries_status_providers) request["cmd"] = "state"; request["transaction-id"] = 85746; - EXPECT_CALL(*mock_status_provider, get_status()).Times(2).WillRepeatedly(Invoke( + EXPECT_CALL(*mock_status_provider, get_status()).WillRepeatedly(Invoke( []() { bzn::json_message status; @@ -86,10 +107,36 @@ TEST(status_test, test_that_status_request_queries_status_providers) [&](std::shared_ptr msg, bool) { ASSERT_EQ((*msg)["transaction-id"].asInt64(), request["transaction-id"].asInt64()); + ASSERT_EQ((*msg)["pbft_enabled"].asBool(), true); ASSERT_EQ((*msg)["module"].size(), size_t(2)); ASSERT_EQ((*msg)["module"][0]["name"].asString(), "mock1"); ASSERT_EQ((*msg)["module"][1]["name"].asString(), "mock2"); })); mh(request, mock_session); + + // make protobuf request... + EXPECT_CALL(*mock_status_provider, get_name()).WillOnce(Invoke( + [](){ return "mock1";})).WillOnce(Invoke([](){return "mock2";})); + + EXPECT_CALL(*mock_session, send_message(An>(), false)).WillOnce(Invoke( + [&](std::shared_ptr msg, bool) + { + status_response sr; + + ASSERT_TRUE(sr.ParseFromString(*msg)); + ASSERT_TRUE(sr.pbft_enabled()); + ASSERT_EQ(sr.swarm_version(), SWARM_VERSION); + ASSERT_EQ(sr.swarm_git_commit(), SWARM_GIT_COMMIT); + ASSERT_EQ(sr.uptime(), "0 days, 0 hours, 0 minutes"); + + Json::Value ms; + Json::Reader reader; + reader.parse(sr.module_status_json(), ms); + ASSERT_EQ(ms["module"].size(), size_t(2)); + ASSERT_EQ(ms["module"][0]["name"].asString(), "mock1"); + ASSERT_EQ(ms["module"][1]["name"].asString(), "mock2"); + })); + + pbh(bzn_envelope(), mock_session); } diff --git a/swarm/main.cpp b/swarm/main.cpp index 71f49145..dc85308c 100644 --- a/swarm/main.cpp +++ b/swarm/main.cpp @@ -247,11 +247,11 @@ main(int argc, const char* argv[]) auto crud = std::make_shared(stable_storage, std::make_shared(io_context)); auto pbft = std::make_shared(node, io_context, peers.get_peers(), options->get_uuid(), - std::make_shared(io_context, unstable_storage, crud, options->get_uuid()), failure_detector); + std::make_shared(io_context, unstable_storage, crud, options->get_uuid()), failure_detector, crypto); pbft->set_audit_enabled(options->get_simple_options().get(bzn::option_names::AUDIT_ENABLED)); - status = std::make_shared(node, bzn::status::status_provider_list_t{pbft}); + status = std::make_shared(node, bzn::status::status_provider_list_t{pbft}, true); crud->start(); pbft->start(); @@ -284,7 +284,7 @@ main(int argc, const char* argv[]) auto crud = std::make_shared(node, raft, storage, std::make_shared(io_context)); auto http_server = std::make_shared(io_context, crud, ep); - status = std::make_shared(node, bzn::status::status_provider_list_t{raft}); + status = std::make_shared(node, bzn::status::status_provider_list_t{raft}, false); raft->set_audit_enabled(options->get_simple_options().get(bzn::option_names::AUDIT_ENABLED));