Skip to content

Commit

Permalink
debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
coranos committed May 28, 2023
1 parent bb59d44 commit 1f864f5
Show file tree
Hide file tree
Showing 17 changed files with 1,562 additions and 468 deletions.
63 changes: 63 additions & 0 deletions nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#include <nano/node/common.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/channel.hpp>
#include <nano/node/transport/transport.hpp>

#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/asio/ip/address_v6.hpp>
#include <boost/format.hpp>

nano::transport::channel::channel (nano::node & node_a) :
node{ node_a }
{
set_network_version (node_a.network_params.network.protocol_version);
}

void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{
auto buffer (message_a.to_shared_const_buffer ());
auto detail = nano::to_stat_detail (message_a.header.type);
auto is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter);
auto should_pass (node.outbound_limiter.should_pass (buffer.size (), to_bandwidth_limit_type (traffic_type)));
if (!is_droppable_by_limiter || should_pass)
{
send_buffer (buffer, callback_a, drop_policy_a, traffic_type);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
}
else
{
if (callback_a)
{
node.background ([callback_a] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}

node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out);
if (node.config.logging.network_packet_logging ())
{
node.logger.always_log (boost::str (boost::format ("%1% of size %2% dropped") % nano::to_string (detail) % buffer.size ()));
}
}
}

void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint)
{
nano::lock_guard<nano::mutex> lock{ channel_mutex };
peering_endpoint = endpoint;
}

nano::endpoint nano::transport::channel::get_peering_endpoint () const
{
nano::unique_lock<nano::mutex> lock{ channel_mutex };
if (peering_endpoint)
{
return *peering_endpoint;
}
else
{
lock.unlock ();
return get_endpoint ();
}
}
186 changes: 186 additions & 0 deletions nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
#pragma once

#include <nano/lib/locks.hpp>
#include <nano/lib/stats.hpp>
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/common.hpp>
#include <nano/node/messages.hpp>
#include <nano/node/transport/socket.hpp>

#include <boost/asio/ip/network_v6.hpp>

namespace nano::transport
{
enum class transport_type : uint8_t
{
undefined = 0,
tcp = 1,
loopback = 2,
fake = 3
};

class channel
{
public:
explicit channel (nano::node &);
virtual ~channel () = default;

virtual std::size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;

void send (nano::message & message_a,
std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr,
nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic);

// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
virtual void send_buffer (nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic)
= 0;

virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
virtual nano::transport::transport_type get_type () const = 0;

virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic)
{
return false;
}
virtual bool alive () const
{
return true;
}

std::chrono::steady_clock::time_point get_last_bootstrap_attempt () const
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
return last_bootstrap_attempt;
}

void set_last_bootstrap_attempt (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
last_bootstrap_attempt = time_a;
}

std::chrono::steady_clock::time_point get_last_packet_received () const
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
return last_packet_received;
}

void set_last_packet_received (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
last_packet_received = time_a;
}

std::chrono::steady_clock::time_point get_last_packet_sent () const
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
return last_packet_sent;
}

void set_last_packet_sent (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
last_packet_sent = time_a;
}

boost::optional<nano::account> get_node_id_optional () const
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
return node_id;
}

nano::account get_node_id () const
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
if (node_id.is_initialized ())
{
return node_id.get ();
}
else
{
return 0;
}
}

void set_node_id (nano::account node_id_a)
{
nano::lock_guard<nano::mutex> lk (channel_mutex);
node_id = node_id_a;
}

uint8_t get_network_version () const
{
return network_version;
}

void set_network_version (uint8_t network_version_a)
{
network_version = network_version_a;
}

nano::endpoint get_peering_endpoint () const;
void set_peering_endpoint (nano::endpoint endpoint);

mutable nano::mutex channel_mutex;

private:
std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::now () };
boost::optional<nano::account> node_id{ boost::none };
std::atomic<uint8_t> network_version{ 0 };
std::optional<nano::endpoint> peering_endpoint{};

protected:
nano::node & node;
};
}

namespace std
{
template <>
struct hash<::nano::transport::channel>
{
std::size_t operator() (::nano::transport::channel const & channel_a) const
{
return channel_a.hash_code ();
}
};
template <>
struct equal_to<std::reference_wrapper<::nano::transport::channel const>>
{
bool operator() (std::reference_wrapper<::nano::transport::channel const> const & lhs, std::reference_wrapper<::nano::transport::channel const> const & rhs) const
{
return lhs.get () == rhs.get ();
}
};
}

namespace boost
{
template <>
struct hash<::nano::transport::channel>
{
std::size_t operator() (::nano::transport::channel const & channel_a) const
{
std::hash<::nano::transport::channel> hash;
return hash (channel_a);
}
};
template <>
struct hash<std::reference_wrapper<::nano::transport::channel const>>
{
std::size_t operator() (std::reference_wrapper<::nano::transport::channel const> const & channel_a) const
{
std::hash<::nano::transport::channel> hash;
return hash (channel_a.get ());
}
};
}
6 changes: 3 additions & 3 deletions nano/node/transport/fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ nano::transport::fake::channel::channel (nano::node & node) :

/**
* The send function behaves like a null device, it throws the data away and returns success.
*/
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
*/
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{
//auto bytes = buffer_a.to_bytes ();
// auto bytes = buffer_a.to_bytes ();
auto size = buffer_a.size ();
if (callback_a)
{
Expand Down
11 changes: 5 additions & 6 deletions nano/node/transport/fake.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <nano/node/transport/channel.hpp>
#include <nano/node/transport/transport.hpp>

namespace nano
Expand All @@ -19,13 +20,11 @@ namespace transport
std::string to_string () const override;
std::size_t hash_code () const override;

// clang-format off
void send_buffer (
nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::buffer_drop_policy = nano::buffer_drop_policy::limiter
) override;
// clang-format on
nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic) override;

bool operator== (nano::transport::channel const &) const override;
bool operator== (nano::transport::fake::channel const & other_a) const;
Expand Down
44 changes: 32 additions & 12 deletions nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/message_deserializer.hpp>

#include <boost/format.hpp>

Expand Down Expand Up @@ -51,25 +53,43 @@ class message_visitor_inbound : public nano::message_visitor
* Send the buffer to the peer and call the callback function when done. The call never fails.
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
*/
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);
std::size_t offset{ 0 };
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (buffer_v.size () >= (offset + size_a));
data_a->resize (size_a);
auto const copy_start = buffer_v.begin () + offset;
std::copy (copy_start, copy_start + size_a, data_a->data ());
offset += size_a;
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size_a);
};

// create an inbound message visitor class to handle incoming messages because that's what the message parser expects
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
auto const message_deserializer = std::make_shared<nano::transport::message_deserializer> (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer, buffer_read_fn);
message_deserializer->read (
[this] (boost::system::error_code ec_a, std::unique_ptr<nano::message> message_a) {
if (ec_a || !message_a)
{
return;
}

nano::message_parser parser{ destination.network.publish_filter, destination.block_uniquer, destination.vote_uniquer, visitor, destination.work, destination.network_params.network };
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);

// parse the message and action any work that needs to be done on that object via the visitor object
auto bytes = buffer_a.to_bytes ();
auto size = bytes.size ();
parser.deserialize_buffer (bytes.data (), size);
// process message
{
node.stats.inc (nano::stat::type::message, nano::to_stat_detail (message_a->header.type), nano::stat::dir::in);

// create an inbound message visitor class to handle incoming messages
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
message_a->visit (visitor);
}
});

if (callback_a)
{
node.background ([callback_a, size] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
node.background ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
});
}
}
Expand Down
8 changes: 5 additions & 3 deletions nano/node/transport/inproc.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <nano/node/transport/channel.hpp>
#include <nano/node/transport/transport.hpp>

namespace nano
Expand All @@ -8,7 +9,7 @@ namespace transport
{
/**
* In-process transport channel. Mostly useful for unit tests
**/
**/
namespace inproc
{
class channel final : public nano::transport::channel
Expand All @@ -17,9 +18,10 @@ namespace transport
explicit channel (nano::node & node, nano::node & destination);
std::size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;

// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
//
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;

std::string to_string () const override;
bool operator== (nano::transport::inproc::channel const & other_a) const
{
Expand Down
Loading

0 comments on commit 1f864f5

Please sign in to comment.