From 3f746d5471ec9cee3997af9ebac42bd241dc2ad5 Mon Sep 17 00:00:00 2001 From: Tranz5 Date: Tue, 22 Jul 2014 19:36:05 -0400 Subject: [PATCH] Use per-message send buffer, rather than per connection This can be combined with 69948a6f589f378662e9ee08608f4e9c16079134 --- src/main.cpp | 13 +++++---- src/net.cpp | 64 ++++++++++++++++++++++++++--------------- src/net.h | 76 ++++++++++++++++++++----------------------------- src/protocol.h | 3 +- src/serialize.h | 8 +++++- 5 files changed, 89 insertions(+), 75 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 5605bbb..9416259 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3136,7 +3136,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) // Change version pfrom->PushMessage("verack"); - pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); if (!pfrom->fInbound) { @@ -3747,9 +3747,9 @@ bool ProcessMessages(CNode* pfrom) bool fOk = true; std::deque::iterator it = pfrom->vRecvMsg.begin(); - while (it != pfrom->vRecvMsg.end()) { + while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) { // Don't bother if send buffer is too full to respond anyway - if (pfrom->vSend.size() >= SendBufferSize()) + if (pfrom->nSendSize >= SendBufferSize()) break; @@ -3839,7 +3839,10 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + // In case the connection got shut down, its receive buffer was wiped + if (!pfrom->fDisconnect) + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it); + return fOk; } @@ -3854,7 +3857,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle) // Keep-alive ping. We send a nonce of zero because we don't use it anywhere // right now. - if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) { + if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) { uint64 nonce = 0; if (pto->nVersion > BIP0031_VERSION) pto->PushMessage("ping", nonce); diff --git a/src/net.cpp b/src/net.cpp index 246970d..b55a280 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -716,26 +716,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) // requires LOCK(cs_vSend) void SocketSendData(CNode *pnode) { - CDataStream& vSend = pnode->vSend; - if (vSend.empty()) - return; - - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); - } - } + std::deque::iterator it = pnode->vSendMsg.begin(); + + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; + } + } + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); + } + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } void ThreadSocketHandler(void* parg) @@ -777,7 +794,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -867,13 +884,14 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { // do not read, if draining write queue - if (!pnode->vSend.empty()) + if (!pnode->vSendMsg.empty()) FD_SET(pnode->hSocket, &fdsetSend); else FD_SET(pnode->hSocket, &fdsetRecv); FD_SET(pnode->hSocket, &fdsetError); hSocketMax = max(hSocketMax, pnode->hSocket); have_fds = true; + } } } } @@ -1033,7 +1051,7 @@ void ThreadSocketHandler2(void* parg) // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { diff --git a/src/net.h b/src/net.h index 5fa8733..ed0efc3 100644 --- a/src/net.h +++ b/src/net.h @@ -201,7 +201,10 @@ class CNode // socket uint64 nServices; SOCKET hSocket; - CDataStream vSend; + CDataStream ssSend; + size_t nSendSize; // total size of all vSendMsg entries + size_t nSendOffset; // offset inside the first vSendMsg already sent + std::deque vSendMsg; CCriticalSection cs_vSend; std::deque vRecvMsg; @@ -215,8 +218,6 @@ class CNode uint64 nBlocksRequested; uint64 nRecvBytes; uint64 nSendBytes; - int nHeaderStart; - unsigned int nMessageStart; CAddress addr; std::string addrName; CService addrLocal; @@ -259,7 +260,7 @@ class CNode CCriticalSection cs_inventory; std::multimap mapAskFor; - CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION) + CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION) { nServices = 0; hSocket = hSocketIn; @@ -271,8 +272,6 @@ class CNode nSendBytes = 0; nRecvBytes = 0; nBlocksRequested = 0; - nHeaderStart = -1; - nMessageStart = -1; addr = addrIn; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; @@ -284,6 +283,8 @@ class CNode fSuccessfullyConnected = false; fDisconnect = false; nRefCount = 0; + nSendSize = 0; + nSendOffset = 0; hashContinue = 0; pindexLastGetBlocksBegin = 0; hashLastGetBlocksEnd = 0; @@ -409,22 +410,16 @@ class CNode void BeginMessage(const char* pszCommand) { ENTER_CRITICAL_SECTION(cs_vSend); - if (nHeaderStart != -1) - AbortMessage(); - nHeaderStart = vSend.size(); - vSend << CMessageHeader(pszCommand, 0); - nMessageStart = vSend.size(); + assert(ssSend.size() == 0); + ssSend << CMessageHeader(pszCommand, 0); if (fDebug) printf("sending: %s ", pszCommand); } void AbortMessage() { - if (nHeaderStart < 0) - return; - vSend.resize(nHeaderStart); - nHeaderStart = -1; - nMessageStart = -1; + ssSend.clear(); + LEAVE_CRITICAL_SECTION(cs_vSend); if (fDebug) @@ -440,44 +435,35 @@ class CNode return; } - if (nHeaderStart < 0) + if (ssSend.size() == 0) return; // Set the size - unsigned int nSize = vSend.size() - nMessageStart; - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize)); + unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; + memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize)); // Set the checksum - uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end()); + uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); - assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); - memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum)); + assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum)); + memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum)); if (fDebug) { printf("(%d bytes)\n", nSize); } + std::deque::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); + ssSend.GetAndClear(*it); + nSendSize += (*it).size(); + // If write queue empty, attempt "optimistic write" - if (nHeaderStart == 0) + if (it == vSendMsg.begin()) SocketSendData(this); - nHeaderStart = -1; - nMessageStart = -1; LEAVE_CRITICAL_SECTION(cs_vSend); } - void EndMessageAbortIfEmpty() - { - if (nHeaderStart < 0) - return; - int nSize = vSend.size() - nMessageStart; - if (nSize > 0) - EndMessage(); - else - AbortMessage(); - } - void PushVersion(); @@ -503,7 +489,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1; + ssSend << a1; EndMessage(); } catch (...) @@ -519,7 +505,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2; + ssSend << a1 << a2; EndMessage(); } catch (...) @@ -535,7 +521,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3; + ssSend << a1 << a2 << a3; EndMessage(); } catch (...) @@ -551,7 +537,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4; + ssSend << a1 << a2 << a3 << a4; EndMessage(); } catch (...) @@ -567,7 +553,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5; + ssSend << a1 << a2 << a3 << a4 << a5; EndMessage(); } catch (...) @@ -583,7 +569,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6; + ssSend << a1 << a2 << a3 << a4 << a5 << a6; EndMessage(); } catch (...) @@ -599,7 +585,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7; EndMessage(); } catch (...) @@ -615,7 +601,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8; EndMessage(); } catch (...) @@ -631,7 +617,7 @@ class CNode try { BeginMessage(pszCommand); - vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; + ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9; EndMessage(); } catch (...) diff --git a/src/protocol.h b/src/protocol.h index 6491da8..08dd840 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -56,7 +56,8 @@ class CMessageHeader CHECKSUM_SIZE=sizeof(int), MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE, - CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE + CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE, + HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE }; char pchMessageStart[MESSAGE_START_SIZE]; char pchCommand[COMMAND_SIZE]; diff --git a/src/serialize.h b/src/serialize.h index 270275e..43ab63f 100644 --- a/src/serialize.h +++ b/src/serialize.h @@ -706,6 +706,7 @@ struct ser_streamplaceholder +typedef std::vector > CSerializeData; /** Double ended buffer combining vector and stream-like interfaces. * @@ -715,7 +716,7 @@ struct ser_streamplaceholder class CDataStream { protected: - typedef std::vector > vector_type; + typedef CSerializeData vector_type; vector_type vch; unsigned int nReadPos; short state; @@ -997,6 +998,11 @@ class CDataStream ::Unserialize(*this, obj, nType, nVersion); return (*this); } + + void GetAndClear(CSerializeData &data) { + vch.swap(data); + CSerializeData().swap(vch); + } };