Commit 1fa6411

Merge #28165: net: transport abstraction

8a3b6f3 refactor: make Transport::ReceivedBytes just return success/fail (Pieter Wuille)
bb4aab9 net: move message conversion to wire bytes from PushMessage to SocketSendData (Pieter Wuille)
a1a1060 net: measure send buffer fullness based on memory usage (Pieter Wuille)
009ff8d fuzz: add bidirectional fragmented transport test (Pieter Wuille)
fb2c5ed net: make V1Transport implicitly use current chainparams (Pieter Wuille)
0de48fe net: abstract sending side of transport serialization further (Pieter Wuille)
649a83c refactor: rename Transport class receive functions (Pieter Wuille)
27f9ba2 net: add V1Transport lock protecting receive state (Pieter Wuille)
93594e4 refactor: merge transport serializer and deserializer into Transport class (Pieter Wuille)

Pull request description:

This PR furthers the P2P message serialization/deserialization abstraction introduced in #16202 and #16562, in preparation for introducing the BIP324 v2 transport (making this part of #27634). However, nothing in this PR is BIP324-specific, and it contains a number of independently useful improvements.

The overall idea is to have a single object in every `CNode` (called `m_transport`) that is responsible for converting sent messages to wire bytes, and for converting received wire bytes back to messages, while having as little as possible knowledge about this conversion process in higher-level net code. To accomplish that, there is an abstract `Transport` class with (currently) a single `V1Transport` implementation.

Structurally, the above is accomplished by:
* Merging the `TransportDeserializer` and `TransportSerializer` classes into a single `Transport` class, which encompasses both the sending and receiving side. For `V1Transport` these two sides are entirely separate, but this assumption doesn't hold for the BIP324 transport where e.g. the sending encryption key depends on the DH key negotiation data received from the other side. Merging the two means a future `V2Transport` can handle all this interaction without callers needing to be aware.
* Removing the assumption that each message is sent using a computed header followed by (unmodified) data bytes. To achieve that, the sending side of `Transport` mirrors what the receiver side does: callers can set a message to be sent, then ask what bytes must be sent out, and then transition to the next message.
* Adding internal locks to protect the sending and receiving state of the `V1Transport` implementation. I believe these aren't strictly needed (opinions welcome) as there is no real way to use `Transport` objects in a multi-threaded fashion without some form of external synchronization (e.g. "get next bytes to send" isn't meaningful to call from multiple threads at the same time without a mechanism to control the order they'll actually get sent). Still, I feel it's cleaner to make the object responsible for its own consistency (as we definitely do not want the entire object to be under a single external GUARDED_BY, as that'd prevent simultaneous sending and receiving).
* Moving the conversion of messages to bytes on the sending side from `PushMessage` to `SocketSendData`, which is needed to deal with the fact that a transport may not immediately be able to send messages.

This PR is not a refactor, though some commits are. Among the semantic changes are:
* Changing the send buffer pushback mechanism to trigger based on the memory usage of the buffer rather than the amount of bytes to be sent. This is both closer to the desired behavior, and makes the buffering independent from transport details (which is why it's included here).
* When optimistic send is not applicable, the V1 message checksum calculation now runs in the net thread rather than the message handling thread. I believe that's generally an improvement, as the message handling thread is far more computationally bottlenecked already.
* The checksum calculation now runs under the `CNode::cs_vSend` lock, which does mean no two checksum calculations for messages sent to the same node can run in parallel, even if running in separate threads. Despite that limitation, having the checksum for non-optimistic sends moved to the net thread is still an improvement, I believe.
* Statistics for per-message-type sent bytes are now updated when the bytes are actually handed to the OS rather than in `PushMessage`. This is because the actual serialized sizes aren't known until they've gone through the transport object.

A fuzz test of the entire `V1Transport` is included. More elaborate rationale for each of the changes can be found in the commit messages.

ACKs for top commit:
theStack: re-ACK 8a3b6f3
vasild: ACK 8a3b6f3
dergoegge: Code review ACK 8a3b6f3

Tree-SHA512: 26e9a6df47f1dd3e3f3edb4874edf365728e5a8bbc9d0d4d71fb6000cb2dfde5574902c47ffcf825af6743922f2ff9d31a5a38942a196f4ca6669122e15e42e4
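
The sending-side interface described in the pull request description (set a message, ask which bytes must go out, report how many were sent) can be illustrated with a small standalone sketch. Everything below is hypothetical: `ToyTransport`, its 1-byte length-prefix framing, and the `Drain` helper are stand-ins chosen for brevity, not the V1 wire format and not Bitcoin Core code. Only the three method names mirror the ones in this commit.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical toy transport: a 1-byte length prefix followed by the payload.
// It is NOT the V1 wire format; it only mirrors the three-call sending interface.
class ToyTransport
{
    std::vector<uint8_t> m_header;  // header of the message currently being sent
    std::vector<uint8_t> m_payload; // payload of the message currently being sent
    bool m_sending_header{false};
    size_t m_bytes_sent{0};

public:
    // Accept a new message only once the previous one has been sent completely.
    bool SetMessageToSend(std::vector<uint8_t>& payload)
    {
        if (m_sending_header || m_bytes_sent < m_payload.size()) return false;
        assert(payload.size() < 256); // toy framing limit (assumption of this sketch)
        m_header = {static_cast<uint8_t>(payload.size())};
        m_payload = std::move(payload);
        m_sending_header = true;
        m_bytes_sent = 0;
        return true;
    }

    // Return a (pointer, length) view of the bytes that should go out next.
    std::pair<const uint8_t*, size_t> GetBytesToSend() const
    {
        const auto& buf = m_sending_header ? m_header : m_payload;
        return {buf.data() + m_bytes_sent, buf.size() - m_bytes_sent};
    }

    // Report that the first n bytes returned by GetBytesToSend() were sent.
    void MarkBytesSent(size_t n)
    {
        assert(n <= GetBytesToSend().second);
        m_bytes_sent += n;
        if (m_sending_header && m_bytes_sent == m_header.size()) {
            // Header done: switch to the payload.
            m_sending_header = false;
            m_bytes_sent = 0;
        } else if (!m_sending_header && m_bytes_sent == m_payload.size()) {
            // Payload done: release it so a new message can be set.
            m_payload.clear();
            m_bytes_sent = 0;
        }
    }
};

// Drain the transport the way a socket accepting at most `chunk` bytes per call would.
std::vector<uint8_t> Drain(ToyTransport& t, size_t chunk)
{
    std::vector<uint8_t> wire;
    while (true) {
        auto [ptr, len] = t.GetBytesToSend();
        if (len == 0) break;
        const size_t n = len < chunk ? len : chunk;
        wire.insert(wire.end(), ptr, ptr + n);
        t.MarkBytesSent(n);
    }
    return wire;
}
```

The point of the design is that the caller never needs to know where the header ends and the payload begins: it only loops over "get bytes, send some, mark sent", which is what allows a different transport to change the framing without touching the caller.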
2 parents 083316c + 8a3b6f3 commit 1fa6411

8 files changed, +578 -138 lines changed

src/init.cpp (+1 -1)

@@ -490,7 +490,7 @@ void SetupServerArgs(ArgsManager& argsman)
     argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
     argsman.AddArg("-maxconnections=<n>", strprintf("Maintain at most <n> connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
     argsman.AddArg("-maxreceivebuffer=<n>", strprintf("Maximum per-connection receive buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
-    argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
+    argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection memory usage for the send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
     argsman.AddArg("-maxtimeadjustment", strprintf("Maximum allowed median peer time offset adjustment. Local perspective of time may be influenced by outbound peers forward or backward by this amount (default: %u seconds).", DEFAULT_MAX_TIME_ADJUSTMENT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
     argsman.AddArg("-maxuploadtarget=<n>", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
     argsman.AddArg("-onion=<ip:port>", "Use separate SOCKS5 proxy to reach peers via Tor onion services, set -noonion to disable (default: -proxy)", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
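
The `-maxsendbuffer` help text now speaks of memory usage rather than bytes to be sent. A rough standalone sketch of what that difference means is given below. `QueuedMsg` and `SendQueue` are hypothetical names, and `data.capacity()` is only a crude approximation chosen for this example; it is not a reimplementation of Bitcoin Core's `memusage::DynamicUsage`. The pause rule is also simplified compared to the code in this commit, which additionally counts the message held by the transport.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a queued network message.
struct QueuedMsg {
    std::string type;
    std::vector<uint8_t> data;

    // Rough estimate: the object itself plus the heap block owned by `data`.
    // capacity() is used instead of size(), since the allocation is what occupies memory.
    size_t MemoryUsage() const { return sizeof(*this) + data.capacity(); }
};

// Hypothetical send queue that pauses the producer based on memory usage.
struct SendQueue {
    std::deque<QueuedMsg> msgs;
    size_t memusage{0};
    size_t max_memusage;
    bool paused{false};

    explicit SendQueue(size_t limit) : max_memusage{limit} {}

    void Push(QueuedMsg&& msg)
    {
        msgs.push_back(std::move(msg));
        // Account for the element as stored, so PopFront() subtracts the same amount.
        memusage += msgs.back().MemoryUsage();
        paused = memusage > max_memusage;
    }

    void PopFront()
    {
        assert(!msgs.empty());
        memusage -= msgs.front().MemoryUsage();
        msgs.pop_front();
        paused = memusage > max_memusage;
    }
};
```

With this kind of accounting, a message whose vector holds one byte but has 2000 bytes reserved counts as roughly 2000 bytes towards the limit, whereas a size-based counter would have seen a single byte.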

src/net.cpp (+146 -52)

@@ -19,6 +19,7 @@
 #include <crypto/sha256.h>
 #include <i2p.h>
 #include <logging.h>
+#include <memusage.h>
 #include <net_permissions.h>
 #include <netaddress.h>
 #include <netbase.h>
@@ -116,6 +117,14 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mute
 static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {};
 std::string strSubVersion;
 
+size_t CSerializedNetMsg::GetMemoryUsage() const noexcept
+{
+    // Don't count the dynamic memory used for the m_type string, by assuming it fits in the
+    // "small string" optimization area (which stores data inside the object itself, up to some
+    // size; 15 bytes in modern libstdc++).
+    return sizeof(*this) + memusage::DynamicUsage(data);
+}
+
 void CConnman::AddAddrFetch(const std::string& strDest)
 {
     LOCK(m_addr_fetches_mutex);
@@ -681,16 +690,15 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
     nRecvBytes += msg_bytes.size();
     while (msg_bytes.size() > 0) {
         // absorb network data
-        int handled = m_deserializer->Read(msg_bytes);
-        if (handled < 0) {
-            // Serious header problem, disconnect from the peer.
+        if (!m_transport->ReceivedBytes(msg_bytes)) {
+            // Serious transport problem, disconnect from the peer.
             return false;
         }
 
-        if (m_deserializer->Complete()) {
+        if (m_transport->ReceivedMessageComplete()) {
             // decompose a transport agnostic CNetMessage from the deserializer
             bool reject_message{false};
-            CNetMessage msg = m_deserializer->GetMessage(time, reject_message);
+            CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
             if (reject_message) {
                 // Message deserialization failed. Drop the message but don't disconnect the peer.
                 // store the size of the corrupt message
@@ -717,8 +725,18 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
     return true;
 }
 
-int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
+V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept :
+    m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn)
 {
+    assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes));
+    std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes);
+    LOCK(m_recv_mutex);
+    Reset();
+}
+
+int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
+{
+    AssertLockHeld(m_recv_mutex);
     // copy data to temporary parsing buffer
     unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos;
     unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
@@ -740,7 +758,7 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
     }
 
     // Check start string, network magic
-    if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
+    if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) {
         LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id);
         return -1;
     }
@@ -757,8 +775,9 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
     return nCopy;
 }
 
-int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
+int V1Transport::readData(Span<const uint8_t> msg_bytes)
 {
+    AssertLockHeld(m_recv_mutex);
     unsigned int nRemaining = hdr.nMessageSize - nDataPos;
     unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
 
@@ -774,19 +793,22 @@ int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
     return nCopy;
 }
 
-const uint256& V1TransportDeserializer::GetMessageHash() const
+const uint256& V1Transport::GetMessageHash() const
 {
-    assert(Complete());
+    AssertLockHeld(m_recv_mutex);
+    assert(CompleteInternal());
     if (data_hash.IsNull())
         hasher.Finalize(data_hash);
     return data_hash;
 }
 
-CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message)
+CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message)
 {
+    AssertLockNotHeld(m_recv_mutex);
     // Initialize out parameter
     reject_message = false;
     // decompose a single CNetMessage from the TransportDeserializer
+    LOCK(m_recv_mutex);
     CNetMessage msg(std::move(vRecv));
 
     // store message type string, time, and sizes
@@ -819,53 +841,122 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds
     return msg;
 }
 
-void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const
+bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
 {
+    AssertLockNotHeld(m_send_mutex);
+    // Determine whether a new message can be set.
+    LOCK(m_send_mutex);
+    if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
+
     // create dbl-sha256 checksum
     uint256 hash = Hash(msg.data);
 
     // create header
-    CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size());
+    CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size());
     memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
 
     // serialize header
-    header.reserve(CMessageHeader::HEADER_SIZE);
-    CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
+    m_header_to_send.clear();
+    CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
+
+    // update state
+    m_message_to_send = std::move(msg);
+    m_sending_header = true;
+    m_bytes_sent = 0;
+    return true;
+}
+
+Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
+{
+    AssertLockNotHeld(m_send_mutex);
+    LOCK(m_send_mutex);
+    if (m_sending_header) {
+        return {Span{m_header_to_send}.subspan(m_bytes_sent),
+                // We have more to send after the header if the message has payload.
+                !m_message_to_send.data.empty(),
+                m_message_to_send.m_type
+               };
+    } else {
+        return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
+                // We never have more to send after this message's payload.
+                false,
+                m_message_to_send.m_type
+               };
+    }
+}
+
+void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
+{
+    AssertLockNotHeld(m_send_mutex);
+    LOCK(m_send_mutex);
+    m_bytes_sent += bytes_sent;
+    if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
+        // We're done sending a message's header. Switch to sending its data bytes.
+        m_sending_header = false;
+        m_bytes_sent = 0;
+    } else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
+        // We're done sending a message's data. Wipe the data vector to reduce memory consumption.
+        m_message_to_send.data.clear();
+        m_message_to_send.data.shrink_to_fit();
+        m_bytes_sent = 0;
+    }
+}
+
+size_t V1Transport::GetSendMemoryUsage() const noexcept
+{
+    AssertLockNotHeld(m_send_mutex);
+    LOCK(m_send_mutex);
+    // Don't count sending-side fields besides m_message_to_send, as they're all small and bounded.
+    return m_message_to_send.GetMemoryUsage();
 }
 
 std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
 {
     auto it = node.vSendMsg.begin();
     size_t nSentSize = 0;
-
-    while (it != node.vSendMsg.end()) {
-        const auto& data = *it;
-        assert(data.size() > node.nSendOffset);
+    bool data_left{false}; //!< second return value (whether unsent data remains)
+
+    while (true) {
+        if (it != node.vSendMsg.end()) {
+            // If possible, move one message from the send queue to the transport. This fails when
+            // there is an existing message still being sent.
+            size_t memusage = it->GetMemoryUsage();
+            if (node.m_transport->SetMessageToSend(*it)) {
+                // Update memory usage of send buffer (as *it will be deleted).
+                node.m_send_memusage -= memusage;
+                ++it;
+            }
+        }
+        const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
+        data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
         int nBytes = 0;
-        {
+        if (!data.empty()) {
             LOCK(node.m_sock_mutex);
+            // There is no socket in case we've already disconnected, or in test cases without
+            // real connections. In these cases, we bail out immediately and just leave things
+            // in the send queue and transport.
             if (!node.m_sock) {
                 break;
             }
             int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
 #ifdef MSG_MORE
-            if (it + 1 != node.vSendMsg.end()) {
+            // We have more to send if either the transport itself has more, or if we have more
+            // messages to send.
+            if (more || it != node.vSendMsg.end()) {
                 flags |= MSG_MORE;
             }
 #endif
-            nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
+            nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
         }
         if (nBytes > 0) {
             node.m_last_send = GetTime<std::chrono::seconds>();
             node.nSendBytes += nBytes;
-            node.nSendOffset += nBytes;
+            // Notify transport that bytes have been processed.
+            node.m_transport->MarkBytesSent(nBytes);
+            // Update statistics per message type.
+            node.AccountForSentBytes(msg_type, nBytes);
             nSentSize += nBytes;
-            if (node.nSendOffset == data.size()) {
-                node.nSendOffset = 0;
-                node.nSendSize -= data.size();
-                node.fPauseSend = node.nSendSize > nSendBufferMaxSize;
-                it++;
-            } else {
+            if ((size_t)nBytes != data.size()) {
                 // could not send full message; stop sending more
                 break;
             }
@@ -878,17 +969,17 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
                     node.CloseSocketDisconnect();
                 }
             }
-            // couldn't send anything at all
             break;
         }
     }
 
+    node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
+
     if (it == node.vSendMsg.end()) {
-        assert(node.nSendOffset == 0);
-        assert(node.nSendSize == 0);
+        assert(node.m_send_memusage == 0);
     }
     node.vSendMsg.erase(node.vSendMsg.begin(), it);
-    return {nSentSize, !node.vSendMsg.empty()};
+    return {nSentSize, data_left};
 }
 
 /** Try to find a connection to evict when the node is full.
@@ -1227,7 +1318,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
 
     for (CNode* pnode : nodes) {
         bool select_recv = !pnode->fPauseRecv;
-        bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
+        bool select_send;
+        {
+            LOCK(pnode->cs_vSend);
+            // Sending is possible if either there are bytes to send right now, or if there will be
+            // once a potential message from vSendMsg is handed to the transport.
+            const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+            select_send = !to_send.empty() || !pnode->vSendMsg.empty();
+        }
         if (!select_recv && !select_send) continue;
 
         LOCK(pnode->m_sock_mutex);
@@ -2822,8 +2920,7 @@ CNode::CNode(NodeId idIn,
              ConnectionType conn_type_in,
              bool inbound_onion,
              CNodeOptions&& node_opts)
-    : m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))},
-      m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())},
+    : m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)},
       m_permission_flags{node_opts.permission_flags},
       m_sock{sock},
       m_connected{GetTime<std::chrono::seconds>()},
@@ -2906,27 +3003,24 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
         msg.data.data()
     );
 
-    // make sure we use the appropriate network transport format
-    std::vector<unsigned char> serializedHeader;
-    pnode->m_serializer->prepareForTransport(msg, serializedHeader);
-    size_t nTotalSize = nMessageSize + serializedHeader.size();
-
     size_t nBytesSent = 0;
     {
         LOCK(pnode->cs_vSend);
-        bool optimisticSend(pnode->vSendMsg.empty());
+        const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
+        const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
 
-        //log total amount of bytes per message type
-        pnode->AccountForSentBytes(msg.m_type, nTotalSize);
-        pnode->nSendSize += nTotalSize;
+        // Update memory usage of send buffer.
+        pnode->m_send_memusage += msg.GetMemoryUsage();
+        if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
+        // Move message to vSendMsg queue.
+        pnode->vSendMsg.push_back(std::move(msg));
 
-        if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
-        pnode->vSendMsg.push_back(std::move(serializedHeader));
-        if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
-
-        // If write queue empty, attempt "optimistic write"
-        bool data_left;
-        if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
+        // If there was nothing to send before, attempt "optimistic write":
+        // because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
+        // doing a send, try sending from the calling thread if the queue was empty before.
+        if (queue_was_empty) {
+            std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
+        }
     }
     if (nBytesSent) RecordBytesSent(nBytesSent);
 }
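
The receive loop changed in `CNode::ReceiveMsgBytes` above (feed bytes, check for a complete message, extract it, repeat) can be sketched in isolation. The sketch assumes, as the `while (msg_bytes.size() > 0)` loop suggests, that `ReceivedBytes` consumes a prefix of the caller's buffer and returns `false` only on a fatal framing error. `ToyReceiver`, its 1-byte length-prefix framing, `MAX_PAYLOAD`, and the `Feed` helper are all hypothetical and unrelated to the real V1 header format.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical toy receiver: a 1-byte length prefix, then the payload. Not the V1 format.
class ToyReceiver
{
    static constexpr size_t MAX_PAYLOAD{200}; // arbitrary limit for this sketch
    bool m_have_len{false};
    size_t m_len{0};
    std::vector<uint8_t> m_buf;

public:
    bool ReceivedMessageComplete() const { return m_have_len && m_buf.size() == m_len; }

    // Consume bytes from the front of [ptr, ptr + len), advancing ptr and shrinking len.
    // Stops once a message is complete. Returns false on a framing error.
    bool ReceivedBytes(const uint8_t*& ptr, size_t& len)
    {
        while (len > 0 && !ReceivedMessageComplete()) {
            if (!m_have_len) {
                m_len = *ptr;
                m_have_len = true;
                ++ptr;
                --len;
                if (m_len > MAX_PAYLOAD) return false;
            } else {
                const size_t n = std::min(len, m_len - m_buf.size());
                m_buf.insert(m_buf.end(), ptr, ptr + n);
                ptr += n;
                len -= n;
            }
        }
        return true;
    }

    // Hand out the completed message and reset for the next one.
    std::vector<uint8_t> GetReceivedMessage()
    {
        assert(ReceivedMessageComplete());
        std::vector<uint8_t> out = std::move(m_buf);
        m_buf.clear();
        m_have_len = false;
        m_len = 0;
        return out;
    }
};

// Caller-side loop, shaped like the one in the diff: returns false if the peer should be dropped.
bool Feed(ToyReceiver& rx, const std::vector<uint8_t>& bytes, std::vector<std::vector<uint8_t>>& out)
{
    const uint8_t* ptr = bytes.data();
    size_t len = bytes.size();
    while (len > 0) {
        if (!rx.ReceivedBytes(ptr, len)) return false;
        if (rx.ReceivedMessageComplete()) out.push_back(rx.GetReceivedMessage());
    }
    return true;
}
```

Because the receiver keeps its own partial state, a message split across several socket reads is reassembled without the caller tracking offsets, and one read containing several messages yields all of them.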
