Permalink
Browse files

Further updates to CNetMessage processing

These could be combined with the previous commit
  • Loading branch information...
Tranz5 committed Jul 22, 2014
1 parent ec2a4ac commit ba85423ba59801e1ba6d9a4a8130cc7a7a8d059a
Showing with 30 additions and 21 deletions.
  1. +15 −14 src/main.cpp
  2. +10 −2 src/net.cpp
  3. +5 −5 src/net.h
View
@@ -3733,8 +3733,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom)
{
if (pfrom->vRecvMsg.empty())
return true;
// if (fDebug)
// printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size());
@@ -3746,30 +3744,36 @@ bool ProcessMessages(CNode* pfrom)
// (4) checksum
// (x) data
//
bool fOk = true;
unsigned int nMsgPos = 0;
for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++)
{
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
if (pfrom->vSend.size() >= SendBufferSize())
break;
// get next message; end, if an incomplete message is found
CNetMessage& msg = pfrom->vRecvMsg[nMsgPos];
// get next message
CNetMessage& msg = *it;
//if (fDebug)
// printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n",
// msg.hdr.nMessageSize, msg.vRecv.size(),
// msg.complete() ? "Y" : "N");
// end, if an incomplete message is found
if (!msg.complete())
break;
// at this point, any failure means we can delete the current message
it++;
// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) {
printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n");
return false;
fOk = false;
break;
}
// Read header
@@ -3806,7 +3810,7 @@ bool ProcessMessages(CNode* pfrom)
fRet = ProcessMessage(pfrom, strCommand, vRecv);
}
if (fShutdown)
return true;
break;
}
catch (std::ios_base::failure& e)
{
@@ -3835,11 +3839,8 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
}
// remove processed messages; one incomplete message may remain
if (nMsgPos > 0)
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(),
pfrom->vRecvMsg.begin() + nMsgPos);
return true;
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
return fOk;
}
View
@@ -534,8 +534,13 @@ void CNode::CloseSocketDisconnect()
printf("disconnecting node %s\n", addrName.c_str());
closesocket(hSocket);
hSocket = INVALID_SOCKET;
vRecvMsg.clear();
// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
}
}
void CNode::Cleanup()
@@ -634,7 +639,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.size() == 0 ||
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion));
@@ -1686,6 +1691,9 @@ void ThreadMessageHandler2(void* parg)
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect)
continue;
// Receive messages
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
View
@@ -203,7 +203,7 @@ class CNode
CDataStream vSend;
CCriticalSection cs_vSend;
std::vector<CNetMessage> vRecvMsg;
std::deque<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
int nRecvVersion;
@@ -322,8 +322,8 @@ class CNode
unsigned int GetTotalRecvSize()
{
unsigned int total = 0;
for (unsigned int i = 0; i < vRecvMsg.size(); i++)
total += vRecvMsg[i].vRecv.size();
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
total += msg.vRecv.size() + 24;
return total;
}
@@ -334,8 +334,8 @@ class CNode
void SetRecvVersion(int nVersionIn)
{
nRecvVersion = nVersionIn;
for (unsigned int i = 0; i < vRecvMsg.size(); i++)
vRecvMsg[i].SetVersion(nVersionIn);
BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
msg.SetVersion(nVersionIn);
}
CNode* AddRef()

0 comments on commit ba85423

Please sign in to comment.