From 43714afc1e2dbb5381dfe67ddb58ba0e7ba777e5 Mon Sep 17 00:00:00 2001 From: "isabel@lotus" Date: Tue, 26 Feb 2019 17:01:38 -0800 Subject: [PATCH] KEP-1206: Infrastructure for reporting daemon statistics --- CMakeLists.txt | 1 + crypto/crypto.cpp | 16 ++- crypto/crypto.hpp | 4 +- crypto/test/crypto_test.cpp | 4 +- include/system_clock.hpp | 37 ++++++ mocks/mock_monitor.hpp | 32 +++++ mocks/mock_system_clock.hpp | 28 +++++ monitor/CMakeLists.txt | 8 ++ monitor/monitor.cpp | 147 +++++++++++++++++++++++ monitor/monitor.hpp | 54 +++++++++ monitor/monitor_base.hpp | 72 +++++++++++ monitor/test/CMakeLists.txt | 4 + monitor/test/monitor_test.cpp | 187 +++++++++++++++++++++++++++++ node/node.cpp | 9 +- node/node.hpp | 4 +- node/session.cpp | 14 ++- node/session.hpp | 4 +- node/test/node_test.cpp | 15 ++- node/test/session_test.cpp | 9 +- pbft/pbft.cpp | 5 + pbft/pbft.hpp | 3 + pbft/test/pbft_join_leave_test.cpp | 12 +- pbft/test/pbft_test_common.cpp | 1 + pbft/test/pbft_test_common.hpp | 4 +- pbft/test/pbft_viewchange_test.cpp | 3 +- swarm/CMakeLists.txt | 2 +- swarm/main.cpp | 8 +- 27 files changed, 660 insertions(+), 27 deletions(-) create mode 100644 include/system_clock.hpp create mode 100644 mocks/mock_monitor.hpp create mode 100644 mocks/mock_system_clock.hpp create mode 100644 monitor/CMakeLists.txt create mode 100644 monitor/monitor.cpp create mode 100644 monitor/monitor.hpp create mode 100644 monitor/monitor_base.hpp create mode 100644 monitor/test/CMakeLists.txt create mode 100644 monitor/test/monitor_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f5a6e9e3..8b9c8085 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -84,6 +84,7 @@ add_subdirectory(audit) add_subdirectory(pbft) add_subdirectory(chaos) add_subdirectory(crypto) +add_subdirectory(monitor) include(cmake/static_analysis.cmake) diff --git a/crypto/crypto.cpp b/crypto/crypto.cpp index 937966d3..88e17504 100644 --- a/crypto/crypto.cpp +++ b/crypto/crypto.cpp @@ -26,8 +26,9 @@ namespace const std::string PEM_SUFFIX = "\n-----END PUBLIC KEY-----\n"; } -crypto::crypto(std::shared_ptr options) +crypto::crypto(std::shared_ptr options, std::shared_ptr monitor) : options(std::move(options)) + , monitor(std::move(monitor)) { LOG(info) << "Using " << SSLeay_version(SSLEAY_VERSION); if (this->options->get_simple_options().get(bzn::option_names::CRYPTO_ENABLED_OUTGOING)) @@ -154,6 +155,13 @@ crypto::verify(const bzn_envelope& msg) */ ERR_clear_error(); + this->monitor->send_counter(bzn::statistic::signature_verified); + this->monitor->send_counter(bzn::statistic::signature_verified_bytes, msg_text.length()); + if (!result) + { + this->monitor->send_counter(bzn::statistic::signature_rejected); + } + return result; } @@ -219,6 +227,9 @@ crypto::sign(bzn_envelope& msg) } + this->monitor->send_counter(bzn::statistic::signature_computed); + this->monitor->send_counter(bzn::statistic::signature_computed_bytes, msg_text.length()); + return result; } @@ -266,6 +277,9 @@ crypto::hash(const std::string& msg) throw std::runtime_error(std::string("\nfailed to compute message hash ") + msg); } + this->monitor->send_counter(bzn::statistic::hash_computed); + this->monitor->send_counter(bzn::statistic::hash_computed_bytes, msg.length()); + return std::string(reinterpret_cast(hash_buffer.get()), md_size); } diff --git a/crypto/crypto.hpp b/crypto/crypto.hpp index 17e38c44..48c27b76 100644 --- a/crypto/crypto.hpp +++ b/crypto/crypto.hpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace bzn { @@ -27,7 +28,7 @@ namespace bzn { public: - crypto(std::shared_ptr options); + crypto(std::shared_ptr options, std::shared_ptr monitor); bool sign(bzn_envelope& msg) override; @@ -53,6 +54,7 @@ namespace bzn const std::string deterministic_serialize(const bzn_envelope& msg); std::shared_ptr options; + std::shared_ptr monitor; EVP_PKEY_ptr_t private_key_EVP = EVP_PKEY_ptr_t(nullptr, &EVP_PKEY_free); EC_KEY_ptr_t private_key_EC = EC_KEY_ptr_t(nullptr, &EC_KEY_free); diff --git a/crypto/test/crypto_test.cpp b/crypto/test/crypto_test.cpp index 4bf4b623..c0ae1c4e 100644 --- a/crypto/test/crypto_test.cpp +++ b/crypto/test/crypto_test.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -26,6 +27,7 @@ class crypto_test : public Test { public: std::shared_ptr options = std::make_shared(); + std::shared_ptr monitor = std::make_shared>(); std::shared_ptr crypto; const std::string private_key_file = "test_private_key.pem"; @@ -63,7 +65,7 @@ class crypto_test : public Test this->options->get_mutable_simple_options().set(bzn::option_names::CRYPTO_ENABLED_INCOMING, std::to_string(true)); this->options->get_mutable_simple_options().set(bzn::option_names::CRYPTO_ENABLED_OUTGOING, std::to_string(true)); - this->crypto = std::make_shared(this->options); + this->crypto = std::make_shared(this->options, this->monitor); } diff --git a/include/system_clock.hpp b/include/system_clock.hpp new file mode 100644 index 00000000..9186b3e0 --- /dev/null +++ b/include/system_clock.hpp @@ -0,0 +1,37 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#pragma once +#include +#include + +namespace bzn +{ + class system_clock_base + { + public: + virtual uint64_t microseconds_since_epoch() = 0; + + virtual ~system_clock_base() = default; + }; + + class system_clock : public system_clock_base + { + uint64_t microseconds_since_epoch() + { + auto res = std::chrono::time_point_cast(std::chrono::high_resolution_clock::now()); + return res.time_since_epoch().count(); + } + }; +} diff --git a/mocks/mock_monitor.hpp b/mocks/mock_monitor.hpp new file mode 100644 index 00000000..6c4811b6 --- /dev/null +++ b/mocks/mock_monitor.hpp @@ -0,0 +1,32 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include + +namespace bzn { + + class mock_monitor : public monitor_base { + public: + MOCK_METHOD1(start_timer, + void(std::string instance_id)); + MOCK_METHOD2(finish_timer, + void(statistic stat, std::string instance_id)); + MOCK_METHOD2(send_counter, + void(statistic, uint64_t)); + }; + +} // namespace bzn diff --git a/mocks/mock_system_clock.hpp b/mocks/mock_system_clock.hpp new file mode 100644 index 00000000..ae3c0a0f --- /dev/null +++ b/mocks/mock_system_clock.hpp @@ -0,0 +1,28 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include + +namespace bzn +{ + class mock_system_clock : public system_clock_base + { + public: + MOCK_METHOD0(microseconds_since_epoch, uint64_t()); + }; +} + diff --git a/monitor/CMakeLists.txt b/monitor/CMakeLists.txt new file mode 100644 index 00000000..12b6e885 --- /dev/null +++ b/monitor/CMakeLists.txt @@ -0,0 +1,8 @@ +add_library(monitor + monitor_base.hpp + monitor.hpp + monitor.cpp + ) + +target_link_libraries(utils) +add_subdirectory(test) diff --git a/monitor/monitor.cpp b/monitor/monitor.cpp new file mode 100644 index 00000000..d2dffbb7 --- /dev/null +++ b/monitor/monitor.cpp @@ -0,0 +1,147 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#include +#include +#include +#include + +using namespace bzn; + +namespace +{ + std::map statistic_names{ + { + {statistic::hash_computed, "crypto.hashes_computed"}, + {statistic::hash_computed_bytes, "crypto.bytes_hashed"}, + {statistic::signature_computed, "crypto.signatures_computed"}, + {statistic::signature_computed_bytes, "crypto.bytes_signed"}, + {statistic::signature_verified, "crypto.signatures_verified"}, + {statistic::signature_verified_bytes, "crypto.bytes_verified"}, + {statistic::signature_rejected, "crypto.signatures_rejected"}, + + {statistic::session_opened, "node.sessions_opened"}, + {statistic::message_sent, "node.messages_sent"}, + {statistic::message_sent_bytes, "node.bytes_sent"}, + + {statistic::pbft_no_primary, "pbft.liveness.no_primary"}, + {statistic::pbft_failure_detected, "pbft.liveness.failure_detected"}, + {statistic::pbft_commit_conflict, "pbft.safety.commit_conflict"}, + {statistic::pbft_primary_conflict, "pbft.safety.primary_conflict"}, + + {statistic::request_latency, "total-server-latency"} + } + }; + + const uint64_t MAX_OUTSTANDING_TIMERS = 10000; +} + +monitor::monitor(std::shared_ptr options, std::shared_ptr context, std::shared_ptr clock) + : options(std::move(options)) + , context(std::move(context)) + , clock(std::move(clock)) + , socket(this->context->make_unique_udp_socket()) + , monitor_endpoint(this->options->get_monitor_endpoint(this->context)) + , scope_prefix("com.bluzelle.swarm.singleton.node." + this->options->get_uuid()) +{ + if (this->monitor_endpoint) + { + LOG(info) << boost::format("Will send stats to %1%:%2%") + % this->monitor_endpoint->address().to_string() + % this->monitor_endpoint->port(); + } + else + { + LOG(info) << "No monitor is configured; stats will not be collected"; + } + +} + +void +monitor::start_timer(std::string timer_id) +{ + if (!this->monitor_endpoint) + { + return; + } + + std::lock_guard lock(this->timers_lock); + if (this->start_times.find(timer_id) != this->start_times.end()) + { + return; + } + + this->start_times.emplace(std::make_pair(timer_id, this->clock->microseconds_since_epoch())); + this->ordered_timers.push_back(timer_id); + + while (this->ordered_timers.size() > MAX_OUTSTANDING_TIMERS) + { + this->start_times.erase(this->ordered_timers.front()); + this->ordered_timers.pop_front(); + } +} + +void +monitor::finish_timer(bzn::statistic stat, std::string timer_id) +{ + if (!this->monitor_endpoint) + { + return; + } + + uint64_t result; + + { + std::lock_guard lock(this->timers_lock); + if (this->start_times.find(timer_id) == this->start_times.end()) + { + return; + } + + result = this->clock->microseconds_since_epoch() - start_times.at(timer_id); + this->start_times.erase(timer_id); + } + + auto stat_string = this->scope_prefix + "." + statistic_names.at(stat) + ":" + std::to_string(result) + "|us"; + LOG(debug) << stat_string; + this->send(stat_string); +} + +void +monitor::send_counter(bzn::statistic stat, uint64_t amount) +{ + if (!this->monitor_endpoint) + { + return; + } + + auto stat_string = this->scope_prefix + "." + statistic_names.at(stat) + ":" + std::to_string(amount) + "|c"; + LOG(debug) << stat_string; + + this->send(stat_string); +} + +void +monitor::send(const std::string& stat) +{ + std::shared_ptr buffer = std::make_shared(stat.c_str(), stat.size()); + this->socket->async_send_to(*buffer, *(this->monitor_endpoint), + [buffer](const boost::system::error_code& ec, std::size_t /*bytes*/) + { + if (ec) + { + LOG(error) << boost::format("UDP send failed"); + } + }); +} diff --git a/monitor/monitor.hpp b/monitor/monitor.hpp new file mode 100644 index 00000000..b468e02d --- /dev/null +++ b/monitor/monitor.hpp @@ -0,0 +1,54 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace bzn +{ + class monitor : public monitor_base + { + public: + + monitor(std::shared_ptr options, std::shared_ptr context, std::shared_ptr clock); + + void start_timer(std::string instance_id) override; + + void finish_timer(statistic stat, std::string instance_id) override; + + void send_counter(statistic stat, uint64_t amount = 1) override; + + private: + + void send(const std::string& stat); + + std::list ordered_timers; + std::unordered_map start_times; + std::mutex timers_lock; + + std::shared_ptr options; + std::shared_ptr context; + std::shared_ptr clock; + std::unique_ptr socket; + const std::optional monitor_endpoint; + const std::string scope_prefix; + }; +} diff --git a/monitor/monitor_base.hpp b/monitor/monitor_base.hpp new file mode 100644 index 00000000..3434b5eb --- /dev/null +++ b/monitor/monitor_base.hpp @@ -0,0 +1,72 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#pragma once + +#include +#include + +namespace bzn +{ + enum class statistic + { + hash_computed, + hash_computed_bytes, + signature_computed, + signature_computed_bytes, + signature_verified, + signature_verified_bytes, + signature_rejected, + + session_opened, + message_sent, + message_sent_bytes, + + pbft_no_primary, + pbft_failure_detected, + pbft_commit_conflict, + pbft_primary_conflict, + + request_latency + }; + + class monitor_base + { + public: + + /* + * Start timing for some statistic (e.g. a request completion). Will be ignored if this is a duplicate of some + * other recent timer. + * @instance_id some globally unique name for this instance of the timer (such as the request hash) + */ + virtual void start_timer(std::string instance_id) = 0; + + /* + * Finish timing for some statistic (e.g. a request completion). Will be ignored if no such timer has been + * started. + * @instance_id some globally unique name for this instance of the timer (such as the request hash) + * @stat the statistic under which to send the measurement + */ + virtual void finish_timer(statistic stat, std::string instance_id) = 0; + + /* + * Send a counter statistic + * @stat statistic to send + * @amount amount of that statistic that has been observed + */ + virtual void send_counter(statistic stat, uint64_t amount = 1) = 0; + + virtual ~monitor_base() = default; + }; +} diff --git a/monitor/test/CMakeLists.txt b/monitor/test/CMakeLists.txt new file mode 100644 index 00000000..070e18fa --- /dev/null +++ b/monitor/test/CMakeLists.txt @@ -0,0 +1,4 @@ +set(test_srcs monitor_test.cpp) +set(test_libs monitor options) + +add_gmock_test(monitor) diff --git a/monitor/test/monitor_test.cpp b/monitor/test/monitor_test.cpp new file mode 100644 index 00000000..b2bbb553 --- /dev/null +++ b/monitor/test/monitor_test.cpp @@ -0,0 +1,187 @@ +// Copyright (C) 2018 Bluzelle +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License, version 3, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace ::testing; + +class monitor_test : public Test +{ +public: + std::shared_ptr io_context = std::make_shared(); + std::shared_ptr clock = std::make_shared(); + std::shared_ptr options = std::make_shared(); + std::shared_ptr monitor; + + uint64_t current_time = 0; + std::vector sent_messages; + + boost::asio::ip::udp::endpoint ep; + bool send_works = true; + + monitor_test() + { + EXPECT_CALL(*(this->options), get_uuid()).WillRepeatedly(Return("uuid")); + EXPECT_CALL(*(this->options), get_monitor_endpoint(_)).WillRepeatedly(Invoke([&](auto){return this->ep;})); + + EXPECT_CALL(*(this->clock), microseconds_since_epoch()).WillRepeatedly(Invoke( + [&]() + { + return this->current_time; + } + )); + + EXPECT_CALL(*(this->io_context), make_unique_udp_socket()).WillRepeatedly(Invoke( + [&]() + { + auto socket = std::make_unique(); + EXPECT_CALL(*socket, async_send_to(_, _, _)).WillRepeatedly(Invoke( + [&](auto buffer, auto /*endpoint*/, auto handler) + { + sent_messages.emplace_back(reinterpret_cast(buffer.data()), buffer.size()); + if(this->send_works) + { + handler(boost::system::error_code{}, buffer.size()); + } + else + { + handler(boost::asio::error::connection_refused, 0); + } + } + )); + return socket; + } + )); + + this->monitor = std::make_shared(this->options, this->io_context, this->clock); + } + + std::pair parse_counter(const std::string& message) + { + std::regex counter_regex("^(.*):(\\d*)\\|(.*)$"); + std::smatch result; + + EXPECT_TRUE(std::regex_match(message, result, counter_regex)); + EXPECT_EQ(result[3].str(), "c"); + + return std::pair(result[1].str(), std::stoull(result[2].str())); + }; + + std::pair parse_timer(const std::string& message) + { + std::regex timer_regex("^(.*):(\\d*)\\|(.*)$"); + std::smatch result; + + EXPECT_TRUE(std::regex_match(message, result, timer_regex)); + EXPECT_EQ(result[3].str(), "us"); + + return std::pair(result[1].str(), std::stoull(result[2].str())); + }; +}; + +TEST_F(monitor_test, test_that_counters_emit_metric) +{ + this->monitor->send_counter(bzn::statistic::message_sent); + this->monitor->send_counter(bzn::statistic::message_sent_bytes, 15u); + + EXPECT_EQ(this->parse_counter(this->sent_messages.at(0)).second, 1u); + EXPECT_EQ(this->parse_counter(this->sent_messages.at(1)).second, 15u); +} + +TEST_F(monitor_test, test_that_timers_emit_metric) +{ + this->monitor->start_timer("hash"); + this->current_time = 10; + this->monitor->finish_timer(bzn::statistic::request_latency, "hash"); + + auto res = this->parse_timer(this->sent_messages.at(0)); + + EXPECT_EQ(res.second, 10u); +} + +TEST_F(monitor_test, test_concurrent_timers) +{ + this->current_time = 10; + this->monitor->start_timer("A"); + this->current_time = 20; + this->monitor->start_timer("B"); + + this->current_time = 100; + this->monitor->finish_timer(bzn::statistic::request_latency, "A"); + this->current_time = 1000; + this->monitor->finish_timer(bzn::statistic::request_latency, "B"); + + EXPECT_EQ(this->parse_timer(this->sent_messages.at(0)).second, 100u - 10u); + EXPECT_EQ(this->parse_timer(this->sent_messages.at(1)).second, 1000u - 20u); +} + +TEST_F(monitor_test, test_timer_with_duplicate_triggers) +{ + this->current_time = 10; + this->monitor->start_timer("A"); + this->current_time = 21; + this->monitor->start_timer("A"); + + this->current_time = 30; + this->monitor->finish_timer(bzn::statistic::request_latency, "A"); + this->current_time = 43; + this->monitor->finish_timer(bzn::statistic::request_latency, "A"); + + EXPECT_EQ(this->parse_timer(this->sent_messages.at(0)).second, 30u - 10u); + EXPECT_EQ(this->sent_messages.size(), 1u); +} + +TEST_F(monitor_test, test_no_endpoint) +{ + auto options2 = std::make_shared(); + EXPECT_CALL(*options2, get_monitor_endpoint(_)).WillRepeatedly(Return(std::nullopt)); + auto monitor2 = std::make_shared(options2, io_context, clock); + + monitor2->send_counter(bzn::statistic::message_sent); + monitor2->start_timer("a"); + monitor2->finish_timer(bzn::statistic::request_latency, "a"); + + EXPECT_EQ(this->sent_messages.size(), 0u); +} + +TEST_F(monitor_test, test_timers_cleanup) +{ + for(int i=0; i<12000; /*more than we remember at once*/ i++) + { + this->monitor->start_timer(std::to_string(i)); + } + + this->monitor->finish_timer(bzn::statistic::request_latency, std::to_string(0)); + + EXPECT_EQ(this->sent_messages.size(), 0u); +} + +TEST_F(monitor_test, test_send_fails) +{ + this->send_works = false; + monitor->send_counter(bzn::statistic::message_sent); + monitor->start_timer("a"); + monitor->finish_timer(bzn::statistic::request_latency, "a"); + + // just testing for no crash +} \ No newline at end of file diff --git a/node/node.cpp b/node/node.cpp index 0d69fb68..1b883baf 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -27,13 +27,14 @@ namespace node::node(std::shared_ptr io_context, std::shared_ptr websocket, std::shared_ptr chaos, - const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr crypto, std::shared_ptr options) + const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr crypto, std::shared_ptr options, std::shared_ptr monitor) : tcp_acceptor(io_context->make_unique_tcp_acceptor(ep)) , io_context(std::move(io_context)) , websocket(std::move(websocket)) , chaos(std::move(chaos)) , crypto(std::move(crypto)) , options(std::move(options)) + , monitor(std::move(monitor)) { } @@ -99,7 +100,8 @@ node::do_accept() , std::bind(&node::priv_protobuf_handler, self, std::placeholders::_1, std::placeholders::_2) , self->options->get_ws_idle_timeout() , [](){} - , self->crypto); + , self->crypto + , self->monitor); session->accept(std::move(ws)); @@ -164,7 +166,8 @@ node::find_session(const boost::asio::ip::tcp::endpoint& ep) , std::bind(&node::priv_protobuf_handler, shared_from_this(), std::placeholders::_1, std::placeholders::_2) , this->options->get_ws_idle_timeout() , std::bind(&node::priv_session_shutdown_handler, shared_from_this(), key) - , this->crypto); + , this->crypto + , this->monitor); session->open(this->websocket); sessions.insert_or_assign(key, session); } diff --git a/node/node.hpp b/node/node.hpp index 35b2d1ab..0edc285e 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -36,7 +37,7 @@ namespace bzn { public: node(std::shared_ptr io_context, std::shared_ptr websocket, std::shared_ptr chaos, - const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr crypto, std::shared_ptr options); + const boost::asio::ip::tcp::endpoint& ep, std::shared_ptr crypto, std::shared_ptr options, std::shared_ptr monitor); bool register_for_message(const bzn_envelope::PayloadCase type, bzn::protobuf_handler msg_handler) override; @@ -82,6 +83,7 @@ namespace bzn std::shared_ptr crypto; std::shared_ptr options; + std::shared_ptr monitor; }; } // bzn diff --git a/node/session.cpp b/node/session.cpp index be49fd4c..7f1a89fd 100644 --- a/node/session.cpp +++ b/node/session.cpp @@ -27,7 +27,8 @@ session::session( bzn::protobuf_handler proto_handler, std::chrono::milliseconds ws_idle_timeout, bzn::session_shutdown_handler shutdown_handler, - std::shared_ptr crypto + std::shared_ptr crypto, + std::shared_ptr monitor ) : session_id(session_id) , ep(std::move(ep)) @@ -39,6 +40,7 @@ session::session( , ws_idle_timeout(std::move(ws_idle_timeout)) , write_buffer(nullptr, 0) , crypto(std::move(crypto)) + , monitor(std::move(monitor)) { LOG(debug) << "creating session " << std::to_string(session_id); } @@ -97,6 +99,7 @@ session::open(std::shared_ptr ws_factory) return; } + self->monitor->send_counter(statistic::session_opened); self->do_read(); self->do_write(); }); @@ -121,6 +124,7 @@ session::accept(std::shared_ptr ws) return; } + self->monitor->send_counter(statistic::session_opened); self->do_read(); self->do_write(); } @@ -199,10 +203,16 @@ session::do_write() this->websocket->binary(true); this->write_buffer = boost::asio::buffer(*msg); this->websocket->async_write(this->write_buffer, - [self = shared_from_this(), msg](boost::system::error_code ec, auto /*bytes_transferred*/) + [self = shared_from_this(), msg](boost::system::error_code ec, auto bytes_transferred) { self->activity = true; + if (!ec) + { + self->monitor->send_counter(statistic::message_sent); + } + self->monitor->send_counter(statistic::message_sent_bytes, bytes_transferred); + if(ec) { // don't log close of websocket... diff --git a/node/session.hpp b/node/session.hpp index c438f9a0..8e853154 100644 --- a/node/session.hpp +++ b/node/session.hpp @@ -41,7 +41,8 @@ namespace bzn bzn::protobuf_handler proto_handler, std::chrono::milliseconds ws_idle_timeout, bzn::session_shutdown_handler shutdown_handler, - std::shared_ptr crypto); + std::shared_ptr crypto, + std::shared_ptr monitor); ~session(); @@ -88,6 +89,7 @@ namespace bzn boost::asio::mutable_buffers_1 write_buffer; std::shared_ptr crypto; + std::shared_ptr monitor; }; } // blz diff --git a/node/test/node_test.cpp b/node/test/node_test.cpp index 2b478603..c8ae3fb6 100644 --- a/node/test/node_test.cpp +++ b/node/test/node_test.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -47,8 +48,9 @@ class node_test2 : public Test std::shared_ptr mock_chaos = std::make_shared>(); std::shared_ptr options = std::make_shared(); std::shared_ptr crypto = std::shared_ptr(); + std::shared_ptr monitor = std::make_shared>(); bzn::smart_mock_io mock; - std::shared_ptr node = std::make_shared(mock.io_context, mock.websocket, mock_chaos, TEST_ENDPOINT, crypto, options); + std::shared_ptr node = std::make_shared(mock.io_context, mock.websocket, mock_chaos, TEST_ENDPOINT, crypto, options, monitor); bzn_envelope db_msg; uint callback_invoked = 0; @@ -75,10 +77,11 @@ namespace bzn auto mock_chaos = std::make_shared>(); auto options = std::shared_ptr(); auto crypto = std::shared_ptr(); + std::shared_ptr monitor = std::make_shared>(); EXPECT_THROW( bzn::node(io_context, nullptr, mock_chaos, - boost::asio::ip::tcp::endpoint{boost::asio::ip::address_v4::from_string("8.8.8.8"), 8080}, crypto, options), + boost::asio::ip::tcp::endpoint{boost::asio::ip::address_v4::from_string("8.8.8.8"), 8080}, crypto, options, monitor), std::exception ); } @@ -163,7 +166,8 @@ namespace bzn auto mock_io_context = std::make_shared>(); auto options = std::shared_ptr(); auto crypto = std::shared_ptr(); - auto node = std::make_shared(mock_io_context, nullptr, mock_chaos, TEST_ENDPOINT, crypto, options); + auto monitor = std::make_shared>(); + auto node = std::make_shared(mock_io_context, nullptr, mock_chaos, TEST_ENDPOINT, crypto, options, monitor); // test that nulls are rejected... ASSERT_FALSE(node->register_for_message(bzn_envelope::kDatabaseMsg, nullptr)); @@ -181,9 +185,10 @@ namespace bzn auto mock_io_context = std::make_shared>(); auto options = std::make_shared(); options->get_mutable_simple_options().set(bzn::option_names::CRYPTO_ENABLED_INCOMING, "true"); - auto crypto = std::make_shared(options); + auto monitor = std::make_shared>(); + auto crypto = std::make_shared(options, monitor); auto mock_session = std::make_shared(); - auto node = std::make_shared(mock_io_context, nullptr, mock_chaos, TEST_ENDPOINT, crypto, options); + auto node = std::make_shared(mock_io_context, nullptr, mock_chaos, TEST_ENDPOINT, crypto, options, monitor); // Add our test callback... unsigned int callback_execute = 0u; diff --git a/node/test/session_test.cpp b/node/test/session_test.cpp index e95c1f9f..2c090723 100644 --- a/node/test/session_test.cpp +++ b/node/test/session_test.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -36,6 +37,7 @@ class session_test : public Test public: std::shared_ptr io_context = std::make_shared(); std::shared_ptr mock_chaos = std::make_shared>(); + std::shared_ptr monitor = std::make_shared>(); bzn::asio::wait_handler timer_expiry; @@ -61,12 +63,13 @@ class session_test2 : public Test public: bzn::smart_mock_io mock; std::shared_ptr mock_chaos = std::make_shared>(); + std::shared_ptr monitor = std::make_shared>(); uint handler_called = 0; std::shared_ptr session; session_test2() { - session = std::make_shared(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, [](){}, nullptr); + session = std::make_shared(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [&](auto, auto){this->handler_called++;}, TEST_TIMEOUT, [](){}, nullptr, this->monitor); } void yield() @@ -93,7 +96,7 @@ namespace bzn EXPECT_CALL(*mock_websocket_stream, async_read(_,_)); - auto session = std::make_shared(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr); + auto session = std::make_shared(this->io_context, bzn::session_id(1), TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr, this->monitor); session->accept(mock_websocket_stream); accept_handler(boost::system::error_code{}); @@ -146,7 +149,7 @@ namespace bzn bzn::smart_mock_io mock; mock.tcp_connect_works = false; - auto session = std::make_shared(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr); + auto session = std::make_shared(mock.io_context, 0, TEST_ENDPOINT, this->mock_chaos, [](auto, auto){}, TEST_TIMEOUT, [](){}, nullptr, this->monitor); session->open(mock.websocket); this->yield(); diff --git a/pbft/pbft.cpp b/pbft/pbft.cpp index 11e2faac..276f538d 100644 --- a/pbft/pbft.cpp +++ b/pbft/pbft.cpp @@ -41,6 +41,7 @@ pbft::pbft( , std::shared_ptr failure_detector , std::shared_ptr crypto , std::shared_ptr operation_manager + , std::shared_ptr monitor ) : node(std::move(node)) , uuid(options->get_uuid()) @@ -53,6 +54,7 @@ pbft::pbft( , join_retry_timer(this->io_context->make_unique_steady_timer()) , crypto(std::move(crypto)) , operation_manager(std::move(operation_manager)) + , monitor(std::move(monitor)) { if (peers.empty()) { @@ -299,6 +301,8 @@ pbft::handle_request(const bzn_envelope& request_env, const std::shared_ptrmonitor->start_timer(hash); + if (!this->is_primary()) { this->forward_request_to_primary(request_env); @@ -1073,6 +1077,7 @@ pbft::handle_database_response_message(const bzn_envelope& msg, std::shared_ptr< session_it != this->sessions_waiting_on_forwarded_requests.end()) { session_it->second->send_message(std::make_shared(msg.SerializeAsString())); + this->monitor->finish_timer(bzn::statistic::request_latency, db_msg.header().request_hash()); return; } diff --git a/pbft/pbft.hpp b/pbft/pbft.hpp index 9d01339c..7bf1fea8 100644 --- a/pbft/pbft.hpp +++ b/pbft/pbft.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -68,6 +69,7 @@ namespace bzn , std::shared_ptr failure_detector , std::shared_ptr crypto , std::shared_ptr operation_manager + , std::shared_ptr monitor ); void start() override; @@ -272,6 +274,7 @@ namespace bzn std::shared_ptr saved_newview; std::shared_ptr operation_manager; + std::shared_ptr monitor; FRIEND_TEST(pbft_viewchange_test, pbft_with_invalid_view_drops_messages); FRIEND_TEST(pbft_viewchange_test, test_make_signed_envelope); diff --git a/pbft/test/pbft_join_leave_test.cpp b/pbft/test/pbft_join_leave_test.cpp index 33829bff..35c9c9f3 100644 --- a/pbft/test/pbft_join_leave_test.cpp +++ b/pbft/test/pbft_join_leave_test.cpp @@ -100,7 +100,8 @@ namespace bzn preprepare.set_sequence(1); preprepare.set_type(PBFT_MSG_PREPREPARE); preprepare.set_allocated_request(new bzn_envelope(*req)); - auto crypto = std::make_shared(std::make_shared()); + auto monitor = std::make_shared>(); + auto crypto = std::make_shared(std::make_shared(), monitor); auto expect_hash = crypto->hash(preprepare.request()); preprepare.set_request_hash(expect_hash); @@ -372,8 +373,9 @@ namespace bzn .WillOnce(Invoke([&](){return std::move(join_retry_timer2);})); EXPECT_CALL(*mock_options, get_uuid()).WillRepeatedly(Return("uuid2")); EXPECT_CALL(*mock_options, get_simple_options()).WillRepeatedly(ReturnRef(this->options->get_simple_options())); + auto monitor = std::make_shared>(); auto pbft2 = std::make_shared(mock_node2, mock_io_context2, TEST_PEER_LIST, mock_options, mock_service2, - this->mock_failure_detector, this->crypto, manager2); + this->mock_failure_detector, this->crypto, manager2, monitor); pbft2->set_audit_enabled(false); pbft2->start(); @@ -492,7 +494,7 @@ namespace bzn EXPECT_CALL(*mock_options3, get_uuid()).WillRepeatedly(Return("uuid3")); EXPECT_CALL(*mock_options3, get_simple_options()).WillRepeatedly(ReturnRef(this->options->get_simple_options())); auto pbft3 = std::make_shared(mock_node3, mock_io_context3, TEST_PEER_LIST, mock_options3, mock_service3, - this->mock_failure_detector, this->crypto, manager3); + this->mock_failure_detector, this->crypto, manager3, monitor); pbft3->set_audit_enabled(false); pbft3->start(); @@ -671,6 +673,7 @@ namespace bzn TEST_F(pbft_join_leave_test, pbft_join_swarm_does_not_bail_on_good_peers_list) { this->options->get_mutable_simple_options().set("uuid", "current_uuid"); + auto monitor = std::make_shared>(); this->pbft = std::make_shared( this->mock_node , this->mock_io_context @@ -680,6 +683,7 @@ namespace bzn , this->mock_failure_detector , this->crypto , this->operation_manager + , monitor ); this->pbft->set_audit_enabled(false); EXPECT_NO_THROW({ @@ -691,6 +695,7 @@ namespace bzn TEST_F(pbft_join_leave_test, pbft_join_swarm_bails_on_bad_peers_list_with_127_0_0_1_and_same_listen_port) { this->options->get_mutable_simple_options().set("uuid", "current_uuid"); + auto monitor = std::make_shared>(); this->pbft = std::make_shared( this->mock_node , this->mock_io_context @@ -700,6 +705,7 @@ namespace bzn , this->mock_failure_detector , this->crypto , this->operation_manager + , monitor ); this->pbft->set_audit_enabled(false); EXPECT_THROW({this->pbft->start(); }, std::runtime_error); diff --git a/pbft/test/pbft_test_common.cpp b/pbft/test/pbft_test_common.cpp index 357588fe..ede398cf 100644 --- a/pbft/test/pbft_test_common.cpp +++ b/pbft/test/pbft_test_common.cpp @@ -137,6 +137,7 @@ namespace bzn::test , this->mock_failure_detector , this->crypto , this->operation_manager + , this->monitor ); this->pbft->set_audit_enabled(false); this->pbft->start(); diff --git a/pbft/test/pbft_test_common.hpp b/pbft/test/pbft_test_common.hpp index cc20a732..28025fb0 100644 --- a/pbft/test/pbft_test_common.hpp +++ b/pbft/test/pbft_test_common.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -83,7 +84,8 @@ namespace bzn::test std::make_shared(storage); std::shared_ptr options = std::make_shared(); - std::shared_ptr crypto = std::make_shared(options); + std::shared_ptr monitor = std::make_shared>(); + std::shared_ptr crypto = std::make_shared(options, monitor); std::shared_ptr pbft; diff --git a/pbft/test/pbft_viewchange_test.cpp b/pbft/test/pbft_viewchange_test.cpp index 5128d471..2caab5ad 100644 --- a/pbft/test/pbft_viewchange_test.cpp +++ b/pbft/test/pbft_viewchange_test.cpp @@ -352,7 +352,8 @@ namespace bzn EXPECT_CALL(*mock_options, get_uuid()).WillRepeatedly(Invoke([](){return "uuid2";})); auto manager2 = std::make_shared(); - auto pbft2 = std::make_shared(mock_node2, mock_io_context2, TEST_PEER_LIST, mock_options, mock_service2, this->mock_failure_detector, this->crypto, manager2); + auto monitor = std::make_shared>(); + auto pbft2 = std::make_shared(mock_node2, mock_io_context2, TEST_PEER_LIST, mock_options, mock_service2, this->mock_failure_detector, this->crypto, manager2, monitor); pbft2->set_audit_enabled(false); pbft2->start(); diff --git a/swarm/CMakeLists.txt b/swarm/CMakeLists.txt index ea6b39ce..374648dc 100644 --- a/swarm/CMakeLists.txt +++ b/swarm/CMakeLists.txt @@ -1,4 +1,4 @@ add_executable(swarm main.cpp) add_dependencies(swarm boost jsoncpp rocksdb) target_include_directories(swarm PRIVATE ${JSONCPP_INCLUDE_DIRS} ${ROCKSDB_INCLUDE_DIRS}) -target_link_libraries(swarm node pbft audit crud chaos options ethereum bootstrap storage crypto proto ${Protobuf_LIBRARIES} status ${ROCKSDB_LIBRARIES} ${Boost_LIBRARIES} ${JSONCPP_LIBRARIES} pthread) +target_link_libraries(swarm node pbft audit crud chaos options ethereum bootstrap storage crypto monitor proto ${Protobuf_LIBRARIES} status ${ROCKSDB_LIBRARIES} ${Boost_LIBRARIES} ${JSONCPP_LIBRARIES} pthread) diff --git a/swarm/main.cpp b/swarm/main.cpp index a5737fc5..0ec8ab8a 100644 --- a/swarm/main.cpp +++ b/swarm/main.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -247,10 +248,11 @@ main(int argc, const char* argv[]) }); // startup... - auto crypto = std::make_shared(options); + auto monitor = std::make_shared(options, io_context, std::make_shared()); + auto crypto = std::make_shared(options, monitor); auto chaos = std::make_shared(io_context, options); auto websocket = std::make_shared(); - auto node = std::make_shared(io_context, websocket, chaos, boost::asio::ip::tcp::endpoint{options->get_listener()}, crypto, options); + auto node = std::make_shared(io_context, websocket, chaos, boost::asio::ip::tcp::endpoint{options->get_listener()}, crypto, options, monitor); auto audit = std::make_shared(io_context, node, options->get_monitor_endpoint(io_context), options->get_uuid(), options->get_audit_mem_size()); std::shared_ptr status; @@ -280,7 +282,7 @@ main(int argc, const char* argv[]) auto pbft = std::make_shared(node, io_context, peers.get_peers(), options, std::make_shared(io_context, unstable_storage, crud, options->get_uuid()) - ,failure_detector , crypto, operation_manager); + ,failure_detector , crypto, operation_manager, monitor); pbft->set_audit_enabled(options->get_simple_options().get(bzn::option_names::AUDIT_ENABLED));