Skip to content

Commit

Permalink
net: add have_next_message argument to Transport::GetBytesToSend()
Browse files Browse the repository at this point in the history
Before this commit, there are only two possibly outcomes for the "more" prediction
in Transport::GetBytesToSend():
* true: the transport itself has more to send, so the answer is certainly yes.
* false: the transport has nothing further to send, but if vSendMsg has more message(s)
         left, that still will result in more wire bytes after the next
         SetMessageToSend().

For the BIP324 v2 transport, there will arguably be a third state:
* definitely not: the transport has nothing further to send, but even if vSendMsg has
                  more messages left, they can't be sent (right now). This happens
                  before the handshake is complete.

To implement this, we move the entire decision logic to the Transport, by adding a
boolean to GetBytesToSend(), called have_next_message, which informs the transport
whether more messages are available. The return values are still true and false, but
they mean "definitely yes" and "definitely no", rather than "yes" and "maybe".
  • Loading branch information
sipa committed Sep 7, 2023
1 parent 8e0d979 commit c3fad1f
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 37 deletions.
43 changes: 28 additions & 15 deletions src/net.cpp
Expand Up @@ -867,20 +867,22 @@ bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
return true;
}

Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
Transport::BytesToSend V1Transport::GetBytesToSend(bool have_next_message) const noexcept
{
AssertLockNotHeld(m_send_mutex);
LOCK(m_send_mutex);
if (m_sending_header) {
return {Span{m_header_to_send}.subspan(m_bytes_sent),
// We have more to send after the header if the message has payload.
!m_message_to_send.data.empty(),
// We have more to send after the header if the message has payload, or if there
// is a next message after that.
have_next_message || !m_message_to_send.data.empty(),
m_message_to_send.m_type
};
} else {
return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
// We never have more to send after this message's payload.
false,
// We only have more to send after this message's payload if there is another
// message.
have_next_message,
m_message_to_send.m_type
};
}
Expand Down Expand Up @@ -916,6 +918,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
bool data_left{false}; //!< second return value (whether unsent data remains)
std::optional<bool> expected_more;

while (true) {
if (it != node.vSendMsg.end()) {
Expand All @@ -928,7 +931,12 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
++it;
}
}
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend(it != node.vSendMsg.end());
// We rely on the 'more' value returned by GetBytesToSend to correctly predict whether more
// bytes are still to be sent, to correctly set the MSG_MORE flag. As a sanity check,
// verify that the previously returned 'more' was correct.
if (expected_more.has_value()) Assume(!data.empty() == *expected_more);
expected_more = more;
data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
if (!data.empty()) {
Expand All @@ -941,9 +949,7 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
// We have more to send if either the transport itself has more, or if we have more
// messages to send.
if (more || it != node.vSendMsg.end()) {
if (more) {
flags |= MSG_MORE;
}
#endif
Expand Down Expand Up @@ -1323,9 +1329,10 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
{
LOCK(pnode->cs_vSend);
// Sending is possible if either there are bytes to send right now, or if there will be
// once a potential message from vSendMsg is handed to the transport.
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
select_send = !to_send.empty() || !pnode->vSendMsg.empty();
// once a potential message from vSendMsg is handed to the transport. GetBytesToSend
// determines both of these in a single call.
const auto& [to_send, more, _msg_type] = pnode->m_transport->GetBytesToSend(!pnode->vSendMsg.empty());
select_send = !to_send.empty() || more;
}
if (!select_recv && !select_send) continue;

Expand Down Expand Up @@ -3007,7 +3014,10 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
// Check if the transport still has unsent bytes, and indicate to it that we're about to
// give it a message to send.
const auto& [to_send, more, _msg_type] =
pnode->m_transport->GetBytesToSend(/*have_next_message=*/true);
const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};

// Update memory usage of send buffer.
Expand All @@ -3016,10 +3026,13 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
// Move message to vSendMsg queue.
pnode->vSendMsg.push_back(std::move(msg));

// If there was nothing to send before, attempt "optimistic write":
// If there was nothing to send before, and there is now (predicted by the "more" value
// returned by the GetBytesToSend call above), attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
if (queue_was_empty) {
// With a V1Transport, more will always be true here, because adding a message always
// results in sendable bytes there.
if (queue_was_empty && more) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}
Expand Down
37 changes: 29 additions & 8 deletions src/net.h
Expand Up @@ -308,19 +308,40 @@ class Transport {
const std::string& /*m_type*/
>;

/** Get bytes to send on the wire.
/** Get bytes to send on the wire, if any, along with other information about it.
*
* As a const function, it does not modify the transport's observable state, and is thus safe
* to be called multiple times.
*
* The bytes returned by this function act as a stream which can only be appended to. This
* means that with the exception of MarkBytesSent, operations on the transport can only append
* to what is being returned.
* @param[in] have_next_message If true, the "more" return value reports whether more will
* be sendable after a SetMessageToSend call. It is set by the caller when they know
* they have another message ready to send, and only care about what happens
* after that. The have_next_message argument only affects this "more" return value
* and nothing else.
*
* Note that m_type and to_send refer to data that is internal to the transport, and calling
* any non-const function on this object may invalidate them.
* Effectively, there are three possible outcomes about whether there are more bytes
* to send:
* - Yes: the transport itself has more bytes to send later. For example, for
* V1Transport this happens during the sending of the header of a
* message, when there is a non-empty payload that follows.
* - No: the transport itself has no more bytes to send, but will have bytes to
* send if handed a message through SetMessageToSend. In V1Transport this
* happens when sending the payload of a message.
* - Blocked: the transport itself has no more bytes to send, and is also incapable
* of sending anything more at all now, if it were handed another
* message to send.
*
* The boolean 'more' is true for Yes, false for Blocked, and have_next_message
* controls what is returned for No.
*
* @return a BytesToSend object. The to_send member returned acts as a stream which is only
* ever appended to. This means that with the exception of MarkBytesSent (which pops
* bytes off the front of later to_sends), operations on the transport can only append
* to what is being returned. Also note that m_type and to_send refer to data that is
* internal to the transport, and calling any non-const function on this object may
* invalidate them.
*/
virtual BytesToSend GetBytesToSend() const noexcept = 0;
virtual BytesToSend GetBytesToSend(bool have_next_message) const noexcept = 0;

/** Report how many bytes returned by the last GetBytesToSend() have been sent.
*
Expand Down Expand Up @@ -416,7 +437,7 @@ class V1Transport final : public Transport
CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);

bool SetMessageToSend(CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
BytesToSend GetBytesToSend() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
};
Expand Down
4 changes: 2 additions & 2 deletions src/test/denialofservice_tests.cpp
Expand Up @@ -86,7 +86,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)

{
LOCK(dummyNode1.cs_vSend);
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false);
BOOST_CHECK(!to_send.empty());
}
connman.FlushSendBuffer(dummyNode1);
Expand All @@ -97,7 +97,7 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend(false);
BOOST_CHECK(!to_send.empty());
}
// Wait 3 more minutes
Expand Down
34 changes: 24 additions & 10 deletions src/test/fuzz/p2p_transport_serialization.cpp
Expand Up @@ -92,7 +92,7 @@ FUZZ_TARGET(p2p_transport_serialization, .init = initialize_p2p_transport_serial
assert(queued);
std::optional<bool> known_more;
while (true) {
const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend();
const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend(false);
if (known_more) assert(!to_send.empty() == *known_more);
if (to_send.empty()) break;
send_transport.MarkBytesSent(to_send.size());
Expand Down Expand Up @@ -124,11 +124,13 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// Vectors with bytes last returned by GetBytesToSend() on transport[i].
std::array<std::vector<uint8_t>, 2> to_send;

// Last returned 'more' values (if still relevant) by transport[i]->GetBytesToSend().
std::array<std::optional<bool>, 2> last_more;
// Last returned 'more' values (if still relevant) by transport[i]->GetBytesToSend(), for
// both have_next_message false and true.
std::array<std::optional<bool>, 2> last_more, last_more_next;

// Whether more bytes to be sent are expected on transport[i].
std::array<std::optional<bool>, 2> expect_more;
// Whether more bytes to be sent are expected on transport[i], before and after
// SetMessageToSend().
std::array<std::optional<bool>, 2> expect_more, expect_more_next;

// Function to consume a message type.
auto msg_type_fn = [&]() {
Expand Down Expand Up @@ -177,18 +179,27 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa

// Wrapper around transport[i]->GetBytesToSend() that performs sanity checks.
auto bytes_to_send_fn = [&](int side) -> Transport::BytesToSend {
const auto& [bytes, more, msg_type] = transports[side]->GetBytesToSend();
// Invoke GetBytesToSend twice (for have_next_message = {false, true}). This function does
// not modify state (it's const), and only the "more" return value should differ between
// the calls.
const auto& [bytes, more_nonext, msg_type] = transports[side]->GetBytesToSend(false);
const auto& [bytes_next, more_next, msg_type_next] = transports[side]->GetBytesToSend(true);
// Compare with expected more.
if (expect_more[side].has_value()) assert(!bytes.empty() == *expect_more[side]);
// Verify consistency between the two results.
assert(bytes == bytes_next);
assert(msg_type == msg_type_next);
if (more_nonext) assert(more_next);
// Compare with previously reported output.
assert(to_send[side].size() <= bytes.size());
assert(to_send[side] == Span{bytes}.first(to_send[side].size()));
to_send[side].resize(bytes.size());
std::copy(bytes.begin(), bytes.end(), to_send[side].begin());
// Remember 'more' result.
last_more[side] = {more};
// Remember 'more' results.
last_more[side] = {more_nonext};
last_more_next[side] = {more_next};
// Return.
return {bytes, more, msg_type};
return {bytes, more_nonext, msg_type};
};

// Function to make side send a new message.
Expand All @@ -199,7 +210,8 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
CSerializedNetMsg msg = next_msg[side].Copy();
bool queued = transports[side]->SetMessageToSend(msg);
// Update expected more data.
expect_more[side] = std::nullopt;
expect_more[side] = expect_more_next[side];
expect_more_next[side] = std::nullopt;
// Verify consistency of GetBytesToSend after SetMessageToSend
bytes_to_send_fn(/*side=*/side);
if (queued) {
Expand All @@ -223,6 +235,7 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// If all to-be-sent bytes were sent, move last_more data to expect_more data.
if (send_now == bytes.size()) {
expect_more[side] = last_more[side];
expect_more_next[side] = last_more_next[side];
}
// Remove the bytes from the last reported to-be-sent vector.
assert(to_send[side].size() >= send_now);
Expand Down Expand Up @@ -251,6 +264,7 @@ void SimulationTest(Transport& initiator, Transport& responder, R& rng, FuzzedDa
// Clear cached expected 'more' information: if certainly no more data was to be sent
// before, receiving bytes makes this uncertain.
if (expect_more[!side] == false) expect_more[!side] = std::nullopt;
if (expect_more_next[!side] == false) expect_more_next[!side] = std::nullopt;
// Verify consistency of GetBytesToSend after ReceivedBytes
bytes_to_send_fn(/*side=*/!side);
bool progress = to_recv.size() < old_len;
Expand Down
4 changes: 2 additions & 2 deletions src/test/util/net.cpp
Expand Up @@ -78,7 +78,7 @@ void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
node.vSendMsg.clear();
node.m_send_memusage = 0;
while (true) {
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
if (to_send.empty()) break;
node.m_transport->MarkBytesSent(to_send.size());
}
Expand All @@ -90,7 +90,7 @@ bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) co
assert(queued);
bool complete{false};
while (true) {
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
if (to_send.empty()) break;
NodeReceiveMsgBytes(node, to_send, complete);
node.m_transport->MarkBytesSent(to_send.size());
Expand Down

0 comments on commit c3fad1f

Please sign in to comment.