Skip to content
Permalink
Browse files

Merge #9441: Net: Massive speedup. Net locks overhaul

e60360e net: remove cs_vRecvMsg (Cory Fields)
991955e net: add a flag to indicate when a node's send buffer is full (Cory Fields)
c6e8a9b net: add a flag to indicate when a node's process queue is full (Cory Fields)
4d712e3 net: add a new message queue for the message processor (Cory Fields)
c5a8b1b net: rework the way that the messagehandler sleeps (Cory Fields)
c72cc88 net: remove useless comments (Cory Fields)
ef7b5ec net: Add a simple function for waking the message handler (Cory Fields)
f5c36d1 net: record bytes written before notifying the message processor (Cory Fields)
60befa3 net: handle message accounting in ReceiveMsgBytes (Cory Fields)
56212e2 net: set message deserialization version when it's actually time to deserialize (Cory Fields)
0e973d9 net: remove redundant max sendbuffer size check (Cory Fields)
6042587 net: wait until the node is destroyed to delete its recv buffer (Cory Fields)
f6315e0 net: only disconnect if fDisconnect has been set (Cory Fields)
5b4a8ac net: make GetReceiveFloodSize public (Cory Fields)
e5bcd9c net: make vRecvMsg a list so that we can use splice() (Cory Fields)
53ad9a1 net: fix typo causing the wrong receive buffer size (Cory Fields)
  • Loading branch information...
sipa committed Jan 13, 2017
2 parents 02e5308 + e60360e commit 8b66bf74e2a349e71eaa183af81fa63eaee76ad2
Showing with 107 additions and 125 deletions.
  1. +57 −58 src/net.cpp
  2. +19 −19 src/net.h
  3. +30 −48 src/net_processing.cpp
  4. +1 −0 src/net_processing.h
@@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect()
LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}

// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
}

void CConnman::ClearBanned()
@@ -650,16 +645,18 @@ void CNode::copyStats(CNodeStats &stats)
}
#undef X

// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
int64_t nTimeMicros = GetTimeMicros();
nLastRecv = nTimeMicros / 1000000;
nRecvBytes += nBytes;
while (nBytes > 0) {

// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));

CNetMessage& msg = vRecvMsg.back();

@@ -691,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;

msg.nTime = GetTimeMicros();
msg.nTime = nTimeMicros;
complete = true;
}
}
@@ -764,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const


// requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode)
size_t CConnman::SocketSendData(CNode *pnode)
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -781,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1052,8 +1050,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1083,13 +1080,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv)
fDelete = true;
}
}
}
if (fDelete)
@@ -1149,15 +1142,10 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is no (complete) message in the receive buffer,
// or there is space left in the buffer, select() for receiving data.
// * (if neither of the above applies, there is certainly one message
// in the receiver buffer ready to be processed).
// Together, that means that at least one of the following is always possible,
// so we don't deadlock:
// * We send some data.
// * We wait for data to be received (and disconnect after timeout).
// * We process a message in the buffer (message handler thread).
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
@@ -1168,10 +1156,7 @@ void CConnman::ThreadSocketHandler()
}
}
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv && (
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
if (!pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
@@ -1230,8 +1215,6 @@ void CConnman::ThreadSocketHandler()
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
{
// typical socket buffer is 8K-64K
@@ -1242,11 +1225,23 @@ void CConnman::ThreadSocketHandler()
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
if(notify)
condMsgProc.notify_one();
pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
}
else if (nBytes == 0)
{
@@ -1280,8 +1275,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
size_t nBytes = SocketSendData(pnode);
if (nBytes)
if (nBytes) {
RecordBytesSent(nBytes);
}
}
}

@@ -1321,8 +1317,14 @@ void CConnman::ThreadSocketHandler()
}
}



void CConnman::WakeMessageHandler()
{
{
std::lock_guard<std::mutex> lock(mutexMsgProc);
fMsgProcWake = true;
}
condMsgProc.notify_one();
}



@@ -1858,30 +1860,16 @@ void CConnman::ThreadMessageHandler()
}
}

bool fSleep = true;
bool fMoreWork = false;

BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect)
continue;

// Receive messages
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
pnode->CloseSocketDisconnect();

if (pnode->nSendSize < GetSendBufferSize())
{
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
{
fSleep = false;
}
}
}
}
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;

@@ -1901,10 +1889,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}

if (fSleep) {
std::unique_lock<std::mutex> lock(mutexMsgProc);
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(mutexMsgProc);
if (!fMoreWork) {
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
fMsgProcWake = false;
}
}

@@ -2121,7 +2110,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nMaxFeeler = connOptions.nMaxFeeler;

nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
nReceiveFloodSize = connOptions.nSendBufferMaxSize;
nReceiveFloodSize = connOptions.nReceiveFloodSize;

nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -2182,6 +2171,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;

{
std::unique_lock<std::mutex> lock(mutexMsgProc);
fMsgProcWake = false;
}

// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));

@@ -2613,6 +2607,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
fPauseRecv = false;
fPauseSend = false;
nProcessQueueSize = 0;

BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
@@ -2692,6 +2689,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;

if (pnode->nSendSize > nSendBufferMaxSize)
pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));
@@ -327,6 +327,7 @@ class CConnman
/** Get a unique deterministic randomizer. */
CSipHasher GetDeterministicRandomizer(uint64_t id);

unsigned int GetReceiveFloodSize() const;
private:
struct ListenSocket {
SOCKET socket;
@@ -343,6 +344,8 @@ class CConnman
void ThreadSocketHandler();
void ThreadDNSAddressSeed();

void WakeMessageHandler();

uint64_t CalculateKeyedNetGroup(const CAddress& ad);

CNode* FindNode(const CNetAddr& ip);
@@ -358,6 +361,7 @@ class CConnman

NodeId GetNewNodeId();

size_t SocketSendData(CNode *pnode);
//!check is the banlist has unwritten changes
bool BannedSetIsDirty();
//!set the "dirty" flag for the banlist
@@ -368,8 +372,6 @@ class CConnman
void DumpData();
void DumpBanlist();

unsigned int GetReceiveFloodSize() const;

// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
@@ -428,6 +430,9 @@ class CConnman
/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1;

/** flag for waking the message processor. */
bool fMsgProcWake;

std::condition_variable condMsgProc;
std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc;
@@ -445,7 +450,6 @@ void Discover(boost::thread_group& threadGroup);
void MapPort(bool fUseUPnP);
unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
size_t SocketSendData(CNode *pnode);

struct CombinerAll
{
@@ -610,11 +614,13 @@ class CNode
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;

CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
size_t nProcessQueueSize;

std::deque<CInv> vRecvGetData;
std::deque<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
uint64_t nRecvBytes;
int nRecvVersion;
std::atomic<int> nRecvVersion;

int64_t nLastSend;
int64_t nLastRecv;
@@ -650,6 +656,8 @@ class CNode
const NodeId id;

const uint64_t nKeyedNetGroup;
std::atomic_bool fPauseRecv;
std::atomic_bool fPauseSend;
protected:

mapMsgCmdSize mapSendBytesPerMsgCmd;
@@ -723,6 +731,7 @@ class CNode
const ServiceFlags nLocalServices;
const int nMyStartingHeight;
int nSendVersion;
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
public:

NodeId GetId() const {
@@ -743,24 +752,15 @@ class CNode
return nRefCount;
}

// requires LOCK(cs_vRecvMsg)
unsigned int GetTotalRecvSize()
{
unsigned int total = 0;
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
total += msg.vRecv.size() + 24;
return total;
}

// requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);

// requires LOCK(cs_vRecvMsg)
void SetRecvVersion(int nVersionIn)
{
nRecvVersion = nVersionIn;
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
msg.SetVersion(nVersionIn);
}
int GetRecvVersion()
{
return nRecvVersion;
}
void SetSendVersion(int nVersionIn)
{

0 comments on commit 8b66bf7

Please sign in to comment.
You can’t perform that action at this time.