diff --git a/src/net.cpp b/src/net.cpp index 7c120ff66833b..34773354ec885 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -4652,31 +4652,79 @@ void CNode::MarkReceivedMsgsForProcessing() { AssertLockNotHeld(m_msg_process_queue_mutex); - size_t nSizeAdded = 0; - for (const auto& msg : vRecvMsg) { + size_t nQuorumSizeAdded = 0; + size_t nNormalSizeAdded = 0; + std::list quorumMsgs; + std::list normalMsgs; + + // Classify messages into quorum-priority and normal queues + for (auto it = vRecvMsg.begin(); it != vRecvMsg.end();) { + auto& msg = *it; // vRecvMsg contains only completed CNetMessage // the single possible partially deserialized message are held by TransportDeserializer - nSizeAdded += msg.m_raw_message_size; + if (IsQuorumPriorityMessage(msg.m_type)) { + quorumMsgs.splice(quorumMsgs.end(), vRecvMsg, it++); + nQuorumSizeAdded += msg.m_raw_message_size; + } else { + normalMsgs.splice(normalMsgs.end(), vRecvMsg, it++); + nNormalSizeAdded += msg.m_raw_message_size; + } } LOCK(m_msg_process_queue_mutex); - m_msg_process_queue.splice(m_msg_process_queue.end(), vRecvMsg); - m_msg_process_queue_size += nSizeAdded; - fPauseRecv = m_msg_process_queue_size > m_recv_flood_size; + // Splice classified messages into appropriate queues + m_msg_quorum_queue.splice(m_msg_quorum_queue.end(), quorumMsgs); + m_msg_quorum_queue_size += nQuorumSizeAdded; + m_msg_process_queue.splice(m_msg_process_queue.end(), normalMsgs); + m_msg_process_queue_size += nNormalSizeAdded; + // Compute backpressure over combined size of both queues + fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size; } std::optional> CNode::PollMessage() { LOCK(m_msg_process_queue_mutex); + + // Ratio-based processing: process N quorum messages for every 1 normal message + // This ensures forward progress for both queues while strongly prioritizing quorum messages + // However, if normal queue is empty, process quorum messages in bursts (like old algorithm) + constexpr size_t QUORUM_TO_NORMAL_RATIO = 100; + + // Check if we should process normal queue for forward progress + // Only apply ratio when both queues have messages to allow burst processing when normal queue is empty + bool skip_quorum_processing = !m_msg_process_queue.empty() && + m_quorum_msg_count_since_normal >= QUORUM_TO_NORMAL_RATIO; + + // Prioritize quorum queue: pop from it first if non-empty and ratio not reached + // If normal queue is empty, process quorum messages without ratio limit (burst mode) + if (!m_msg_quorum_queue.empty() && !skip_quorum_processing) { + std::list msgs; + // Just take one message from quorum queue + msgs.splice(msgs.begin(), m_msg_quorum_queue, m_msg_quorum_queue.begin()); + m_msg_quorum_queue_size -= msgs.front().m_raw_message_size; + // Only increment counter if normal queue has messages (to track ratio) + // If normal queue is empty, don't increment so we can process bursts quickly + if (!m_msg_process_queue.empty()) { + ++m_quorum_msg_count_since_normal; + } + // Compute backpressure over combined size of both queues + fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size; + // Return true for 'more' if either queue has remaining messages + return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty()); + } + + // Process normal queue (either because quorum queue is empty or ratio reached) if (m_msg_process_queue.empty()) return std::nullopt; std::list msgs; // Just take one message msgs.splice(msgs.begin(), m_msg_process_queue, m_msg_process_queue.begin()); m_msg_process_queue_size -= msgs.front().m_raw_message_size; - fPauseRecv = m_msg_process_queue_size > m_recv_flood_size; + m_quorum_msg_count_since_normal = 0; // Reset counter after processing normal message + // Compute backpressure over combined size of both queues + fPauseRecv = (m_msg_quorum_queue_size + m_msg_process_queue_size) > m_recv_flood_size; - return std::make_pair(std::move(msgs.front()), !m_msg_process_queue.empty()); + return std::make_pair(std::move(msgs.front()), !m_msg_quorum_queue.empty() || !m_msg_process_queue.empty()); } bool CConnman::NodeFullyConnected(const CNode* pnode) diff --git a/src/net.h b/src/net.h index b2e457fef01d8..f49f93c45449c 100644 --- a/src/net.h +++ b/src/net.h @@ -1094,6 +1094,9 @@ class CNode Mutex m_msg_process_queue_mutex; std::list m_msg_process_queue GUARDED_BY(m_msg_process_queue_mutex); size_t m_msg_process_queue_size GUARDED_BY(m_msg_process_queue_mutex){0}; + std::list m_msg_quorum_queue GUARDED_BY(m_msg_process_queue_mutex); + size_t m_msg_quorum_queue_size GUARDED_BY(m_msg_process_queue_mutex){0}; + size_t m_quorum_msg_count_since_normal GUARDED_BY(m_msg_process_queue_mutex){0}; // Our address, as reported by the peer CService addrLocal GUARDED_BY(m_addr_local_mutex); @@ -2002,4 +2005,24 @@ class CExplicitNetCleanup static void callCleanup(); }; +// Helper function to determine if a message type should be prioritized in the quorum queue +inline bool IsQuorumPriorityMessage(const std::string& msg_type) +{ + // LLMQ signing/data messages + if (msg_type == NetMsgType::QSIGSHARE || + msg_type == NetMsgType::QBSIGSHARES || + msg_type == NetMsgType::QSIGSHARESINV || + msg_type == NetMsgType::QGETSIGSHARES || + msg_type == NetMsgType::QSIGSESANN || + msg_type == NetMsgType::QSIGREC) { + return true; + } + // High-level lock messages (ChainLocks, InstantSend locks) + if (msg_type == NetMsgType::CLSIG || + msg_type == NetMsgType::ISDLOCK) { + return true; + } + return false; +} + #endif // BITCOIN_NET_H