Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 56 additions & 8 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4652,31 +4652,79 @@ void CNode::MarkReceivedMsgsForProcessing()
{
AssertLockNotHeld(m_msg_process_queue_mutex);

size_t nSizeAdded = 0;
for (const auto& msg : vRecvMsg) {
size_t nQuorumSizeAdded = 0;
Copy link
Collaborator

@knst knst Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/nQuorumSizeAdded/quorum_size_added/`

Code-style recommends to use snake_case for local variables

size_t nNormalSizeAdded = 0;
std::list<CNetMessage> quorumMsgs;
std::list<CNetMessage> normalMsgs;

// Classify messages into quorum-priority and normal queues
for (auto it = vRecvMsg.begin(); it != vRecvMsg.end();) {
auto& msg = *it;
// vRecvMsg contains only completed CNetMessage
// the single possible partially deserialized message are held by TransportDeserializer
nSizeAdded += msg.m_raw_message_size;
if (IsQuorumPriorityMessage(msg.m_type)) {
quorumMsgs.splice(quorumMsgs.end(), vRecvMsg, it++);
nQuorumSizeAdded += msg.m_raw_message_size;
} else {
normalMsgs.splice(normalMsgs.end(), vRecvMsg, it++);
nNormalSizeAdded += msg.m_raw_message_size;
}
}

LOCK(m_msg_process_queue_mutex);
m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg);
m_msg_process_queue_size += nSizeAdded;
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
// Splice classified messages into appropriate queues
m_msg_quorum_queue.splice(m_msg_quorum_queue.end(), quorumMsgs);
m_msg_quorum_queue_size += nQuorumSizeAdded;
m_msg_process_queue.splice(m_msg_process_queue.end(), normalMsgs);
m_msg_process_queue_size += nNormalSizeAdded;
// Compute backpressure over combined size of both queues
fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
}

std::optional<std::pair<CNetMessage, bool>> CNode::PollMessage()
{
LOCK(m_msg_process_queue_mutex);

// Ratio-based processing: process N quorum messages for every 1 normal message
// This ensures forward progress for both queues while strongly prioritizing quorum messages
// However, if normal queue is empty, process quorum messages in bursts (like old algorithm)
constexpr size_t QUORUM_TO_NORMAL_RATIO = 100;

// Check if we should process normal queue for forward progress
// Only apply ratio when both queues have messages to allow burst processing when normal queue is empty
bool skip_quorum_processing = !m_msg_process_queue.empty() &&
m_quorum_msg_count_since_normal >= QUORUM_TO_NORMAL_RATIO;

// Prioritize quorum queue: pop from it first if non-empty and ratio not reached
// If normal queue is empty, process quorum messages without ratio limit (burst mode)
if (!m_msg_quorum_queue.empty() && !skip_quorum_processing) {
std::list<CNetMessage> msgs;
// Just take one message from quorum queue
msgs.splice(msgs.begin(), m_msg_quorum_queue, m_msg_quorum_queue.begin());
m_msg_quorum_queue_size -= msgs.front().m_raw_message_size;
// Only increment counter if normal queue has messages (to track ratio)
// If normal queue is empty, don't increment so we can process bursts quickly
if (!m_msg_process_queue.empty()) {
++m_quorum_msg_count_since_normal;
}
// Compute backpressure over combined size of both queues
fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;
// Return true for 'more' if either queue has remaining messages
return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty());
}

// Process normal queue (either because quorum queue is empty or ratio reached)
if (m_msg_process_queue.empty()) return std::nullopt;

std::list<CNetMessage> msgs;
// Just take one message
msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin());
m_msg_process_queue_size -= msgs.front().m_raw_message_size;
fPauseRecv = m_msg_process_queue_size > m_recv_flood_size;
m_quorum_msg_count_since_normal = 0; // Reset counter after processing normal message
// Compute backpressure over combined size of both queues
fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size;

return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty());
return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty());
}

bool CConnman::NodeFullyConnected(const CNode* pnode)
Expand Down
23 changes: 23 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,9 @@ class CNode
Mutex m_msg_process_queue_mutex;
std::list<CNetMessage> m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex);
size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
std::list<CNetMessage> m_msg_quorum_queue GUARDED_BY(m_msg_process_queue_mutex);
size_t m_msg_quorum_queue_size GUARDED_BY(m_msg_process_queue_mutex){0};
size_t m_quorum_msg_count_since_normal GUARDED_BY(m_msg_process_queue_mutex){0};

// Our address, as reported by the peer
CService addrLocal GUARDED_BY(m_addr_local_mutex);
Expand Down Expand Up @@ -2002,4 +2005,24 @@ class CExplicitNetCleanup
static void callCleanup();
};

// Helper function to determine if a message type should be prioritized in the quorum queue
inline bool IsQuorumPriorityMessage(const std::string& msg_type)
{
// LLMQ signing/data messages
if (msg_type == NetMsgType::QSIGSHARE ||
msg_type == NetMsgType::QBSIGSHARES ||
msg_type == NetMsgType::QSIGSHARESINV ||
msg_type == NetMsgType::QGETSIGSHARES ||
msg_type == NetMsgType::QSIGSESANN ||
msg_type == NetMsgType::QSIGREC) {
return true;
}
// High-level lock messages (ChainLocks, InstantSend locks)
if (msg_type == NetMsgType::CLSIG ||
msg_type == NetMsgType::ISDLOCK) {
return true;
}
return false;
}

#endif // BITCOIN_NET_H
Loading