Skip to content

Commit 432a62c

Browse files
committed
net: reconnect with V1Transport under certain conditions
When an outbound v2 connection is disconnected without receiving anything, but at least 24 bytes of our pubkey were sent out (enough to constitute an invalid v1 header), add them to a queue of reconnections to be tried. The reconnections are in a queue rather than performed immediately, because we should not block the socket handler thread with connection creation (a blocking operation that can take multiple seconds).
1 parent 4d265d0 commit 432a62c

File tree

2 files changed

+119
-4
lines changed

2 files changed

+119
-4
lines changed

Diff for: src/net.cpp

+79
Original file line numberDiff line numberDiff line change
@@ -1546,13 +1546,33 @@ void V2Transport::MarkBytesSent(size_t bytes_sent) noexcept
15461546

15471547
m_send_pos += bytes_sent;
15481548
Assume(m_send_pos <= m_send_buffer.size());
1549+
if (m_send_pos >= CMessageHeader::HEADER_SIZE) {
1550+
m_sent_v1_header_worth = true;
1551+
}
15491552
// Wipe the buffer when everything is sent.
15501553
if (m_send_pos == m_send_buffer.size()) {
15511554
m_send_pos = 0;
15521555
ClearShrink(m_send_buffer);
15531556
}
15541557
}
15551558

1559+
bool V2Transport::ShouldReconnectV1() const noexcept
1560+
{
1561+
AssertLockNotHeld(m_send_mutex);
1562+
AssertLockNotHeld(m_recv_mutex);
1563+
// Only outgoing connections need reconnection.
1564+
if (!m_initiating) return false;
1565+
1566+
LOCK(m_recv_mutex);
1567+
// We only reconnect in the very first state and when the receive buffer is empty. Together
1568+
// these conditions imply nothing has been received so far.
1569+
if (m_recv_state != RecvState::KEY) return false;
1570+
if (!m_recv_buffer.empty()) return false;
1571+
// Check if we've sent enough for the other side to disconnect us (if it was V1).
1572+
LOCK(m_send_mutex);
1573+
return m_sent_v1_header_worth;
1574+
}
1575+
15561576
size_t V2Transport::GetSendMemoryUsage() const noexcept
15571577
{
15581578
AssertLockNotHeld(m_send_mutex);
@@ -1868,6 +1888,13 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
18681888

18691889
void CConnman::DisconnectNodes()
18701890
{
1891+
AssertLockNotHeld(m_nodes_mutex);
1892+
AssertLockNotHeld(m_reconnections_mutex);
1893+
1894+
// Use a temporary variable to accumulate desired reconnections, so we don't need
1895+
// m_reconnections_mutex while holding m_nodes_mutex.
1896+
decltype(m_reconnections) reconnections_to_add;
1897+
18711898
{
18721899
LOCK(m_nodes_mutex);
18731900

@@ -1890,6 +1917,19 @@ void CConnman::DisconnectNodes()
18901917
// remove from m_nodes
18911918
m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
18921919

1920+
// Add to reconnection list if appropriate. We don't reconnect right here, because
1921+
// the creation of a connection is a blocking operation (up to several seconds),
1922+
// and we don't want to hold up the socket handler thread for that long.
1923+
if (pnode->m_transport->ShouldReconnectV1()) {
1924+
reconnections_to_add.push_back({
1925+
.addr_connect = pnode->addr,
1926+
.grant = std::move(pnode->grantOutbound),
1927+
.destination = pnode->m_dest,
1928+
.conn_type = pnode->m_conn_type,
1929+
.use_v2transport = false});
1930+
LogPrint(BCLog::NET, "retrying with v1 transport protocol for peer=%d\n", pnode->GetId());
1931+
}
1932+
18931933
// release outbound grant (if any)
18941934
pnode->grantOutbound.Release();
18951935

@@ -1917,6 +1957,11 @@ void CConnman::DisconnectNodes()
19171957
}
19181958
}
19191959
}
1960+
{
1961+
// Move entries from reconnections_to_add to m_reconnections.
1962+
LOCK(m_reconnections_mutex);
1963+
m_reconnections.splice(m_reconnections.end(), std::move(reconnections_to_add));
1964+
}
19201965
}
19211966

19221967
void CConnman::NotifyNumConnectionsChanged()
@@ -2389,6 +2434,7 @@ bool CConnman::MaybePickPreferredNetwork(std::optional<Network>& network)
23892434
void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
23902435
{
23912436
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
2437+
AssertLockNotHeld(m_reconnections_mutex);
23922438
FastRandomContext rng;
23932439
// Connect to specific addresses
23942440
if (!connect.empty())
@@ -2432,6 +2478,8 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect)
24322478
if (!interruptNet.sleep_for(std::chrono::milliseconds(500)))
24332479
return;
24342480

2481+
PerformReconnections();
2482+
24352483
CSemaphoreGrant grant(*semOutbound);
24362484
if (interruptNet)
24372485
return;
@@ -2778,6 +2826,7 @@ std::vector<AddedNodeInfo> CConnman::GetAddedNodeInfo() const
27782826
void CConnman::ThreadOpenAddedConnections()
27792827
{
27802828
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
2829+
AssertLockNotHeld(m_reconnections_mutex);
27812830
while (true)
27822831
{
27832832
CSemaphoreGrant grant(*semAddnode);
@@ -2800,6 +2849,8 @@ void CConnman::ThreadOpenAddedConnections()
28002849
// Retry every 60 seconds if a connection was attempted, otherwise two seconds
28012850
if (!interruptNet.sleep_for(std::chrono::seconds(tried ? 60 : 2)))
28022851
return;
2852+
// See if any reconnections are desired.
2853+
PerformReconnections();
28032854
}
28042855
}
28052856

@@ -3613,6 +3664,7 @@ CNode::CNode(NodeId idIn,
36133664
addr{addrIn},
36143665
addrBind{addrBindIn},
36153666
m_addr_name{addrNameIn.empty() ? addr.ToStringAddrPort() : addrNameIn},
3667+
m_dest(addrNameIn),
36163668
m_inbound_onion{inbound_onion},
36173669
m_prefer_evict{node_opts.prefer_evict},
36183670
nKeyedNetGroup{nKeyedNetGroupIn},
@@ -3743,6 +3795,33 @@ uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& address) const
37433795
return GetDeterministicRandomizer(RANDOMIZER_ID_NETGROUP).Write(vchNetGroup).Finalize();
37443796
}
37453797

3798+
void CConnman::PerformReconnections()
3799+
{
3800+
AssertLockNotHeld(m_reconnections_mutex);
3801+
AssertLockNotHeld(m_unused_i2p_sessions_mutex);
3802+
while (true) {
3803+
// Move first element of m_reconnections to todo (avoiding an allocation inside the lock).
3804+
decltype(m_reconnections) todo;
3805+
{
3806+
LOCK(m_reconnections_mutex);
3807+
if (m_reconnections.empty()) break;
3808+
todo.splice(todo.end(), m_reconnections, m_reconnections.begin());
3809+
}
3810+
3811+
auto& item = *todo.begin();
3812+
OpenNetworkConnection(item.addr_connect,
3813+
// We only reconnect if the first attempt to connect succeeded at
3814+
// connection time, but then failed after the CNode object was
3815+
// created. Since we already know connecting is possible, do not
3816+
// count failure to reconnect.
3817+
/*fCountFailure=*/false,
3818+
std::move(item.grant),
3819+
item.destination.empty() ? nullptr : item.destination.c_str(),
3820+
item.conn_type,
3821+
item.use_v2transport);
3822+
}
3823+
}
3824+
37463825
// Dump binary message to file, with timestamp.
37473826
static void CaptureMessageToFile(const CAddress& addr,
37483827
const std::string& msg_type,

Diff for: src/net.h

+40-4
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,11 @@ class Transport {
361361

362362
/** Return the memory usage of this transport attributable to buffered data to send. */
363363
virtual size_t GetSendMemoryUsage() const noexcept = 0;
364+
365+
// 3. Miscellaneous functions.
366+
367+
/** Whether upon disconnections, a reconnect with V1 is warranted. */
368+
virtual bool ShouldReconnectV1() const noexcept = 0;
364369
};
365370

366371
class V1Transport final : public Transport
@@ -440,6 +445,7 @@ class V1Transport final : public Transport
440445
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
441446
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
442447
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
448+
bool ShouldReconnectV1() const noexcept override { return false; }
443449
};
444450

445451
class V2Transport final : public Transport
@@ -608,6 +614,8 @@ class V2Transport final : public Transport
608614
std::string m_send_type GUARDED_BY(m_send_mutex);
609615
/** Current sender state. */
610616
SendState m_send_state GUARDED_BY(m_send_mutex);
617+
/** Whether we've sent at least 24 bytes (which would trigger disconnect for V1 peers). */
618+
bool m_sent_v1_header_worth GUARDED_BY(m_send_mutex) {false};
611619

612620
/** Change the receive state. */
613621
void SetReceiveState(RecvState recv_state) noexcept EXCLUSIVE_LOCKS_REQUIRED(m_recv_mutex);
@@ -653,6 +661,9 @@ class V2Transport final : public Transport
653661
BytesToSend GetBytesToSend(bool have_next_message) const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
654662
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
655663
size_t GetSendMemoryUsage() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
664+
665+
// Miscellaneous functions.
666+
bool ShouldReconnectV1() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex, !m_send_mutex);
656667
};
657668

658669
struct CNodeOptions
@@ -706,6 +717,8 @@ class CNode
706717
// Bind address of our side of the connection
707718
const CAddress addrBind;
708719
const std::string m_addr_name;
720+
/** The pszDest argument provided to ConnectNode(). Only used for reconnections. */
721+
const std::string m_dest;
709722
//! Whether this peer is an inbound onion, i.e. connected via our Tor onion service.
710723
const bool m_inbound_onion;
711724
std::atomic<int> nVersion{0};
@@ -1253,10 +1266,10 @@ class CConnman
12531266
bool Bind(const CService& addr, unsigned int flags, NetPermissionFlags permissions);
12541267
bool InitBinds(const Options& options);
12551268

1256-
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex);
1269+
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
12571270
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
12581271
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
1259-
void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex);
1272+
void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex);
12601273
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
12611274
void ThreadI2PAcceptIncoming();
12621275
void AcceptConnection(const ListenSocket& hListenSocket);
@@ -1274,7 +1287,7 @@ class CConnman
12741287
const CAddress& addr_bind,
12751288
const CAddress& addr);
12761289

1277-
void DisconnectNodes();
1290+
void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_nodes_mutex);
12781291
void NotifyNumConnectionsChanged();
12791292
/** Return true if the peer is inactive and should be disconnected. */
12801293
bool InactivityCheck(const CNode& node) const;
@@ -1306,7 +1319,7 @@ class CConnman
13061319
*/
13071320
void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock);
13081321

1309-
void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
1322+
void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc, !m_nodes_mutex, !m_reconnections_mutex);
13101323
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
13111324

13121325
uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;
@@ -1537,6 +1550,29 @@ class CConnman
15371550
*/
15381551
std::queue<std::unique_ptr<i2p::sam::Session>> m_unused_i2p_sessions GUARDED_BY(m_unused_i2p_sessions_mutex);
15391552

1553+
/**
1554+
* Mutex protecting m_reconnections.
1555+
*/
1556+
Mutex m_reconnections_mutex;
1557+
1558+
/** Struct for entries in m_reconnections. */
1559+
struct ReconnectionInfo
1560+
{
1561+
CAddress addr_connect;
1562+
CSemaphoreGrant grant;
1563+
std::string destination;
1564+
ConnectionType conn_type;
1565+
bool use_v2transport;
1566+
};
1567+
1568+
/**
1569+
* List of reconnections we have to make.
1570+
*/
1571+
std::list<ReconnectionInfo> m_reconnections GUARDED_BY(m_reconnections_mutex);
1572+
1573+
/** Attempt reconnections, if m_reconnections non-empty. */
1574+
void PerformReconnections() EXCLUSIVE_LOCKS_REQUIRED(!m_reconnections_mutex, !m_unused_i2p_sessions_mutex);
1575+
15401576
/**
15411577
* Cap on the size of `m_unused_i2p_sessions`, to ensure it does not
15421578
* unexpectedly use too much memory.

0 commit comments

Comments
 (0)