Skip to content

Commit

Permalink
net: move message serialization from PushMessage to SocketSendData
Browse files Browse the repository at this point in the history
This furthers transport abstraction by removing the assumption that a message
can always immediately be converted to wire bytes. This assumption does not hold
for the v2 transport proposed by BIP324, as no messages can be sent before the
handshake completes.

This is not a pure refactor, and has the following effects even for the current
v1 transport:

* Checksum calculation now happens in SocketSendData rather than PushMessage.
  For non-optimistic-send messages, that means this computation now happens in
  the network thread rather than the message handler thread (generally a good
  thing, as the message handler thread is more of a computational bottleneck).
* Checksum calculation now happens while holding the cs_vSend lock. This is
  technically unnecessary for the v1 transport, as messages are encoded
  independent from one another, but is untenable for the v2 transport anyway.
* Statistics updates about per-message sent bytes now happen when those bytes
  are actually handed to the OS, rather than at PushMessage time.
  • Loading branch information
sipa committed Jul 31, 2023
1 parent 1937a5f commit 68e48a0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 40 deletions.
60 changes: 27 additions & 33 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -897,37 +897,39 @@ size_t CConnman::SocketSendData(CNode& node) const
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;

while (it != node.vSendMsg.end()) {
const auto& data = *it;
assert(data.size() > node.nSendOffset);
while (true) {
bool progress = false;
if (it != node.vSendMsg.end() && node.m_transport->DoneSendingMessage()) {
// If possible, move one message from the send queue to the transport.
node.m_send_memusage -= it->GetMemoryUsage();
node.m_transport->SetMessageToSend(std::move(*it));
++it;
progress = true;
}
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
int nBytes = 0;
{
if (!data.empty()) {
LOCK(node.m_sock_mutex);
if (!node.m_sock) {
break;
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
if (it + 1 != node.vSendMsg.end()) {
if (more || it != node.vSendMsg.end()) {
flags |= MSG_MORE;
}
#endif
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
}
if (nBytes > 0) {
node.m_last_send = GetTime<std::chrono::seconds>();
node.nSendBytes += nBytes;
node.nSendOffset += nBytes;
node.m_transport->MarkBytesSent(nBytes);
node.AccountForSentBytes(msg_type, nBytes);
nSentSize += nBytes;
if (node.nSendOffset == data.size()) {
node.nSendOffset = 0;
node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data);
it++;
} else {
// could not send full message; stop sending more
break;
}
} else {
progress = true;
}
if (nBytes < (ssize_t)data.size()) {
if (nBytes < 0) {
// error
int nErr = WSAGetLastError();
Expand All @@ -936,15 +938,16 @@ size_t CConnman::SocketSendData(CNode& node) const
node.CloseSocketDisconnect();
}
}
// couldn't send anything at all
// could not send full buffer (or nothing at all); stop sending more
break;
}
// If no progress was made (no bytes sent, no message moved to transport), stop.
if (!progress) break;
}

node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;

if (it == node.vSendMsg.end()) {
assert(node.nSendOffset == 0);
assert(node.m_send_memusage == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
Expand Down Expand Up @@ -1298,7 +1301,9 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
// This relies on optimistic send to make sure the transport always has a message to
// send if there are any.
select_send = pnode->m_transport->HaveBytesToSend();
}

LOCK(pnode->m_sock_mutex);
Expand Down Expand Up @@ -2931,23 +2936,12 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
bool optimisticSend(pnode->vSendMsg.empty());

assert(pnode->m_transport->DoneSendingMessage());
pnode->m_transport->SetMessageToSend(std::move(msg));
bool optimisticSend{pnode->vSendMsg.empty() && pnode->m_transport->DoneSendingMessage()};

while (true) {
const auto& [bytes, more, msg_type] = pnode->m_transport->GetBytesToSend();
if (bytes.empty()) break;
pnode->AccountForSentBytes(msg_type, bytes.size());
pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back());
pnode->m_transport->MarkBytesSent(bytes.size());
}
pnode->m_send_memusage += msg.GetMemoryUsage();
pnode->vSendMsg.push_back(std::move(msg));
if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;

assert(pnode->m_transport->DoneSendingMessage());

// If write queue empty, attempt "optimistic write"
if (optimisticSend) nBytesSent = SocketSendData(*pnode);
}
Expand Down
9 changes: 4 additions & 5 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,12 @@ class CNode
*/
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);

/** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the
* deque overhead). */
/** Sum of GetMemoryUsage of all vSendMsg entries. */
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
/** Offset inside the first vSendMsg already sent */
size_t nSendOffset GUARDED_BY(cs_vSend){0};
/** Total number of bytes sent on the wire to this peer. */
uint64_t nSendBytes GUARDED_BY(cs_vSend){0};
std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend);
/** Messages still to be fed to m_transport->SetMessageToSend. */
std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
Mutex cs_vSend;
Mutex m_sock_mutex;
Mutex cs_vRecv;
Expand Down
11 changes: 9 additions & 2 deletions src/test/denialofservice_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,15 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)

{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
BOOST_CHECK(!dummyNode1.vSendMsg.empty() || dummyNode1.m_transport->HaveBytesToSend());
// Clear messages in vSendMsg.
dummyNode1.vSendMsg.clear();
dummyNode1.m_send_memusage = 0;
// Clear data from m_transport.
while (dummyNode1.m_transport->HaveBytesToSend()) {
const auto& [data, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
dummyNode1.m_transport->MarkBytesSent(data.size());
}
}

int64_t nStartTime = GetTime();
Expand All @@ -97,7 +104,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
BOOST_CHECK(!dummyNode1.vSendMsg.empty() || dummyNode1.m_transport->HaveBytesToSend());
}
// Wait 3 more minutes
SetMockTime(nStartTime+24*60);
Expand Down
6 changes: 6 additions & 0 deletions src/test/util/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by

bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
{
/* Flush out any unsent bytes from previous messages. */
while (node.m_transport->HaveBytesToSend()) {
const auto& [data, _more, _msg_type] = node.m_transport->GetBytesToSend();
node.m_transport->MarkBytesSent(data.size());
}

assert(node.m_transport->DoneSendingMessage());
node.m_transport->SetMessageToSend(std::move(ser_msg));

Expand Down

0 comments on commit 68e48a0

Please sign in to comment.