Skip to content
Closed

. #33976

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3060,26 +3060,12 @@ void CConnman::ThreadMessageHandler()
bool fMoreWork = false;

{
// Randomize the order in which we process messages from/to our peers.
// This prevents attacks in which an attacker exploits having multiple
// consecutive connections in the m_nodes list.
const NodesSnapshot snap{*this, /*shuffle=*/true};

for (CNode* pnode : snap.Nodes()) {
if (pnode->fDisconnect)
continue;

// Receive messages
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
m_msgproc->SendMessages(pnode);

if (flagInterruptMsgProc)
return;
}
// Delegate message processing to PeerManager, which now iterates over Peers
// instead of CNodes. This is Phase 2 of the net split refactoring.
fMoreWork = m_msgproc->ProcessAllPeers(flagInterruptMsgProc, [this](NodeId id) -> NodeInterface* {
LOCK(m_nodes_mutex);
return GetNode(id);
});
}

WAIT_LOCK(mutexMsgProc, lock);
Expand Down Expand Up @@ -3939,17 +3925,25 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
if (nBytesSent) RecordBytesSent(nBytesSent);
}

CNode* CConnman::GetNode(NodeId id) const
{
for (auto&& pnode : m_nodes) {
if (pnode->GetId() == id) {
return pnode;
}
}
return nullptr;
}

bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
{
CNode* found = nullptr;
LOCK(m_nodes_mutex);
for (auto&& pnode : m_nodes) {
if(pnode->GetId() == id) {
found = pnode;
break;
}
found = GetNode(id);
if (found && NodeFullyConnected(found)) {
return func(found);
}
return found != nullptr && NodeFullyConnected(found) && func(found);
return false;
}

CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
Expand Down
46 changes: 40 additions & 6 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ struct CNodeOptions
};

/** Information about a peer */
class CNode
class CNode : public NodeInterface
{
public:
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
Expand Down Expand Up @@ -953,7 +953,13 @@ class CNode

void CopyStats(CNodeStats& stats) EXCLUSIVE_LOCKS_REQUIRED(!m_subver_mutex, !m_addr_local_mutex, !cs_vSend, !cs_vRecv);

std::string ConnectionTypeAsString() const { return ::ConnectionTypeAsString(m_conn_type); }
// NodeInterface implementation
std::optional<std::pair<CNetMessage, bool>> PollMessage() override
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex);
const CAddress& GetAddress() const override { return addr; }
const std::string& GetAddrName() const override { return m_addr_name; }
std::string ConnectionTypeAsString() const override { return ::ConnectionTypeAsString(m_conn_type); }
void PushMessage(CSerializedNetMsg&& msg) override;

/**
* Helper function to optionally log the IP address.
Expand Down Expand Up @@ -1009,6 +1015,32 @@ class CNode
std::unique_ptr<i2p::sam::Session> m_i2p_sam_session GUARDED_BY(m_sock_mutex);
};

/**
* Interface for operations on a node that PeerManager needs.
* This abstracts away CNode implementation details.
*/
class NodeInterface
{
public:
virtual ~NodeInterface() = default;

/** Poll the next message from the processing queue of this connection. */
virtual std::optional<std::pair<CNetMessage, bool>> PollMessage()
EXCLUSIVE_LOCKS_REQUIRED(!m_msg_process_queue_mutex) = 0;

/** Get the node's address. */
virtual const CAddress& GetAddress() const = 0;

/** Get the node's address name. */
virtual const std::string& GetAddrName() const = 0;

/** Get the connection type as a string. */
virtual std::string ConnectionTypeAsString() const = 0;

/** Send a message to this node. */
virtual void PushMessage(CSerializedNetMsg&& msg) = 0;
};

/**
* Interface for message handling
*/
Expand All @@ -1033,19 +1065,19 @@ class NetEventsInterface
/**
* Process protocol messages received from a given node
*
* @param[in] pnode The node which we have received messages from.
* @param[in] node_id The node id which we have received messages from.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
virtual bool ProcessMessages(NodeId node_id, std::atomic<bool>& interrupt) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;

/**
* Send queued protocol messages to a given node.
*
* @param[in] pnode The node which we are sending messages to.
* @param[in] node_id The node id which we are sending messages to.
* @return True if there is more work to be done
*/
virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;
virtual bool SendMessages(NodeId node_id) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex) = 0;


protected:
Expand Down Expand Up @@ -1178,6 +1210,8 @@ class CConnman
RecursiveMutex& GetNodesMutex() const LOCK_RETURNED(m_nodes_mutex);

bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
/** Get CNode pointer from NodeId. Returns nullptr if not found. */
CNode* GetNode(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(m_nodes_mutex);

void PushMessage(CNode* pnode, CSerializedNetMsg&& msg) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex);

Expand Down
Loading