Skip to content

Commit

Permalink
Use per-message send buffer, rather than per connection
Browse files Browse the repository at this point in the history
This can be combined with 69948a6
  • Loading branch information
Tranz5 committed Jul 22, 2014
1 parent d548cba commit 3f746d5
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 75 deletions.
13 changes: 8 additions & 5 deletions src/main.cpp
Expand Up @@ -3136,7 +3136,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)

// Change version
pfrom->PushMessage("verack");
pfrom->vSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
pfrom->ssSend.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION));

if (!pfrom->fInbound)
{
Expand Down Expand Up @@ -3747,9 +3747,9 @@ bool ProcessMessages(CNode* pfrom)
bool fOk = true;

std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (it != pfrom->vRecvMsg.end()) {
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway
if (pfrom->vSend.size() >= SendBufferSize())
if (pfrom->nSendSize >= SendBufferSize())
break;


Expand Down Expand Up @@ -3839,7 +3839,10 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
}

pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
// In case the connection got shut down, its receive buffer was wiped
if (!pfrom->fDisconnect)
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);

return fOk;
}

Expand All @@ -3854,7 +3857,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle)

// Keep-alive ping. We send a nonce of zero because we don't use it anywhere
// right now.
if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSend.empty()) {
if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) {
uint64 nonce = 0;
if (pto->nVersion > BIP0031_VERSION)
pto->PushMessage("ping", nonce);
Expand Down
64 changes: 41 additions & 23 deletions src/net.cpp
Expand Up @@ -716,26 +716,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes)
// requires LOCK(cs_vSend)
void SocketSendData(CNode *pnode)
{
CDataStream& vSend = pnode->vSend;
if (vSend.empty())
return;

int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0)
{
vSend.erase(vSend.begin(), vSend.begin() + nBytes);
pnode->nLastSend = GetTime();
}
else if (nBytes < 0)
{
// error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
printf("socket send error %d\n", nErr);
pnode->CloseSocketDisconnect();
}
}
std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();

while (it != pnode->vSendMsg.end()) {
const CSerializeData &data = *it;
assert(data.size() > pnode->nSendOffset);
int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0) {
pnode->nLastSend = GetTime();
pnode->nSendOffset += nBytes;
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
it++;
} else {
// could not send full message; stop sending more
break;
}
} else {
if (nBytes < 0) {
// error
int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
{
printf("socket send error %d\n", nErr);
pnode->CloseSocketDisconnect();
}
}
// couldn't send anything at all
break;
}
}

if (it == pnode->vSendMsg.end()) {
assert(pnode->nSendOffset == 0);
assert(pnode->nSendSize == 0);
}
pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
}

void ThreadSocketHandler(void* parg)
Expand Down Expand Up @@ -777,7 +794,7 @@ void ThreadSocketHandler2(void* parg)
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty()))
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
Expand Down Expand Up @@ -867,13 +884,14 @@ void ThreadSocketHandler2(void* parg)
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) {
// do not read, if draining write queue
if (!pnode->vSend.empty())
if (!pnode->vSendMsg.empty())
FD_SET(pnode->hSocket, &fdsetSend);
else
FD_SET(pnode->hSocket, &fdsetRecv);
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = max(hSocketMax, pnode->hSocket);
have_fds = true;
}
}
}
}
Expand Down Expand Up @@ -1033,7 +1051,7 @@ void ThreadSocketHandler2(void* parg)
//
// Inactivity checking
//
if (pnode->vSend.empty())
if (pnode->vSendMsg.empty())
pnode->nLastSendEmpty = GetTime();
if (GetTime() - pnode->nTimeConnected > 60)
{
Expand Down
76 changes: 31 additions & 45 deletions src/net.h
Expand Up @@ -201,7 +201,10 @@ class CNode
// socket
uint64 nServices;
SOCKET hSocket;
CDataStream vSend;
CDataStream ssSend;
size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent
std::deque<CSerializeData> vSendMsg;
CCriticalSection cs_vSend;

std::deque<CNetMessage> vRecvMsg;
Expand All @@ -215,8 +218,6 @@ class CNode
uint64 nBlocksRequested;
uint64 nRecvBytes;
uint64 nSendBytes;
int nHeaderStart;
unsigned int nMessageStart;
CAddress addr;
std::string addrName;
CService addrLocal;
Expand Down Expand Up @@ -259,7 +260,7 @@ class CNode
CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor;

CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : vSend(SER_NETWORK, MIN_PROTO_VERSION)
CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, MIN_PROTO_VERSION)
{
nServices = 0;
hSocket = hSocketIn;
Expand All @@ -271,8 +272,6 @@ class CNode
nSendBytes = 0;
nRecvBytes = 0;
nBlocksRequested = 0;
nHeaderStart = -1;
nMessageStart = -1;
addr = addrIn;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
Expand All @@ -284,6 +283,8 @@ class CNode
fSuccessfullyConnected = false;
fDisconnect = false;
nRefCount = 0;
nSendSize = 0;
nSendOffset = 0;
hashContinue = 0;
pindexLastGetBlocksBegin = 0;
hashLastGetBlocksEnd = 0;
Expand Down Expand Up @@ -409,22 +410,16 @@ class CNode
void BeginMessage(const char* pszCommand)
{
ENTER_CRITICAL_SECTION(cs_vSend);
if (nHeaderStart != -1)
AbortMessage();
nHeaderStart = vSend.size();
vSend << CMessageHeader(pszCommand, 0);
nMessageStart = vSend.size();
assert(ssSend.size() == 0);
ssSend << CMessageHeader(pszCommand, 0);
if (fDebug)
printf("sending: %s ", pszCommand);
}

void AbortMessage()
{
if (nHeaderStart < 0)
return;
vSend.resize(nHeaderStart);
nHeaderStart = -1;
nMessageStart = -1;
ssSend.clear();

LEAVE_CRITICAL_SECTION(cs_vSend);

if (fDebug)
Expand All @@ -440,44 +435,35 @@ class CNode
return;
}

if (nHeaderStart < 0)
if (ssSend.size() == 0)
return;

// Set the size
unsigned int nSize = vSend.size() - nMessageStart;
memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::MESSAGE_SIZE_OFFSET, &nSize, sizeof(nSize));
unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE;
memcpy((char*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], &nSize, sizeof(nSize));

// Set the checksum
uint256 hash = Hash(vSend.begin() + nMessageStart, vSend.end());
uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
assert(nMessageStart - nHeaderStart >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
memcpy((char*)&vSend[nHeaderStart] + CMessageHeader::CHECKSUM_OFFSET, &nChecksum, sizeof(nChecksum));
assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum));

if (fDebug) {
printf("(%d bytes)\n", nSize);
}

std::deque<CSerializeData>::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData());
ssSend.GetAndClear(*it);
nSendSize += (*it).size();

// If write queue empty, attempt "optimistic write"
if (nHeaderStart == 0)
if (it == vSendMsg.begin())
SocketSendData(this);

nHeaderStart = -1;
nMessageStart = -1;
LEAVE_CRITICAL_SECTION(cs_vSend);
}

void EndMessageAbortIfEmpty()
{
if (nHeaderStart < 0)
return;
int nSize = vSend.size() - nMessageStart;
if (nSize > 0)
EndMessage();
else
AbortMessage();
}



void PushVersion();
Expand All @@ -503,7 +489,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1;
ssSend << a1;
EndMessage();
}
catch (...)
Expand All @@ -519,7 +505,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2;
ssSend << a1 << a2;
EndMessage();
}
catch (...)
Expand All @@ -535,7 +521,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3;
ssSend << a1 << a2 << a3;
EndMessage();
}
catch (...)
Expand All @@ -551,7 +537,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4;
ssSend << a1 << a2 << a3 << a4;
EndMessage();
}
catch (...)
Expand All @@ -567,7 +553,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5;
ssSend << a1 << a2 << a3 << a4 << a5;
EndMessage();
}
catch (...)
Expand All @@ -583,7 +569,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6;
ssSend << a1 << a2 << a3 << a4 << a5 << a6;
EndMessage();
}
catch (...)
Expand All @@ -599,7 +585,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7;
EndMessage();
}
catch (...)
Expand All @@ -615,7 +601,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8;
EndMessage();
}
catch (...)
Expand All @@ -631,7 +617,7 @@ class CNode
try
{
BeginMessage(pszCommand);
vSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
ssSend << a1 << a2 << a3 << a4 << a5 << a6 << a7 << a8 << a9;
EndMessage();
}
catch (...)
Expand Down
3 changes: 2 additions & 1 deletion src/protocol.h
Expand Up @@ -56,7 +56,8 @@ class CMessageHeader
CHECKSUM_SIZE=sizeof(int),

MESSAGE_SIZE_OFFSET=MESSAGE_START_SIZE+COMMAND_SIZE,
CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE
CHECKSUM_OFFSET=MESSAGE_SIZE_OFFSET+MESSAGE_SIZE_SIZE,
HEADER_SIZE=MESSAGE_START_SIZE+COMMAND_SIZE+MESSAGE_SIZE_SIZE+CHECKSUM_SIZE
};
char pchMessageStart[MESSAGE_START_SIZE];
char pchCommand[COMMAND_SIZE];
Expand Down
8 changes: 7 additions & 1 deletion src/serialize.h
Expand Up @@ -706,6 +706,7 @@ struct ser_streamplaceholder



typedef std::vector<char, zero_after_free_allocator<char> > CSerializeData;

/** Double ended buffer combining vector and stream-like interfaces.
*
Expand All @@ -715,7 +716,7 @@ struct ser_streamplaceholder
class CDataStream
{
protected:
typedef std::vector<char, zero_after_free_allocator<char> > vector_type;
typedef CSerializeData vector_type;
vector_type vch;
unsigned int nReadPos;
short state;
Expand Down Expand Up @@ -997,6 +998,11 @@ class CDataStream
::Unserialize(*this, obj, nType, nVersion);
return (*this);
}

void GetAndClear(CSerializeData &data) {
vch.swap(data);
CSerializeData().swap(vch);
}
};


Expand Down

0 comments on commit 3f746d5

Please sign in to comment.