Skip to content

Commit

Permalink
Merge pull request #708 from denravonska/deadlock-fixes
Browse files Browse the repository at this point in the history
Deadlock fixes
  • Loading branch information
denravonska committed Nov 4, 2017
2 parents 826c8e5 + 0da4d51 commit ad739d4
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 155 deletions.
297 changes: 154 additions & 143 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6860,7 +6860,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,

// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom)
{
{
LOCK(cs_main);

TRY_LOCK(pfrom->cs_vRecvMsg, lockRecv);
if (!lockRecv)
return true;

//
// Message format
// (4) message start
Expand Down Expand Up @@ -6928,10 +6934,7 @@ bool ProcessMessages(CNode* pfrom)
bool fRet = false;
try
{
{
LOCK(cs_main);
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime);
}
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime);
if (fShutdown)
break;
}
Expand Down Expand Up @@ -7632,184 +7635,192 @@ void TrackRequests(CNode* pfrom,std::string sRequestType)

bool SendMessages(CNode* pto, bool fSendTrickle)
{
// Treat lock failures as send successes in case the caller disconnects
// the node based on the return value.
TRY_LOCK(cs_main, lockMain);
if (lockMain) {
// Don't send anything until we get their version message
if (pto->nVersion == 0)
return true;
if(!lockMain)
return true;

//
// Message: ping
//
bool pingSend = false;
if (pto->fPingQueued)
{
// RPC ping request by user
pingSend = true;
TRY_LOCK(pto->cs_vSend, lockSend);
if (!lockSend)
return true;

// Don't send anything until we get their version message
if (pto->nVersion == 0)
return true;

//
// Message: ping
//
bool pingSend = false;
if (pto->fPingQueued)
{
// RPC ping request by user
pingSend = true;
}
if (pto->nPingNonceSent == 0 && pto->nPingUsecStart + PING_INTERVAL * 1000000 < GetTimeMicros())
{
// Ping automatically sent as a latency probe & keepalive.
pingSend = true;
}
if (pingSend)
{
uint64_t nonce = 0;
while (nonce == 0) {
RAND_bytes((unsigned char*)&nonce, sizeof(nonce));
}
if (pto->nPingNonceSent == 0 && pto->nPingUsecStart + PING_INTERVAL * 1000000 < GetTimeMicros())
pto->fPingQueued = false;
pto->nPingUsecStart = GetTimeMicros();
if (pto->nVersion > BIP0031_VERSION)
{
// Ping automatically sent as a latency probe & keepalive.
pingSend = true;
}
if (pingSend)
pto->nPingNonceSent = nonce;
std::string acid = GetCommandNonce("ping");
pto->PushMessage("ping", nonce, acid);
} else
{
uint64_t nonce = 0;
while (nonce == 0) {
RAND_bytes((unsigned char*)&nonce, sizeof(nonce));
}
pto->fPingQueued = false;
pto->nPingUsecStart = GetTimeMicros();
if (pto->nVersion > BIP0031_VERSION)
{
pto->nPingNonceSent = nonce;
std::string acid = GetCommandNonce("ping");
pto->PushMessage("ping", nonce, acid);
} else
{
// Peer is too old to support ping command with nonce, pong will never arrive.
pto->nPingNonceSent = 0;
pto->PushMessage("ping");
}
// Peer is too old to support ping command with nonce, pong will never arrive.
pto->nPingNonceSent = 0;
pto->PushMessage("ping");
}
}

// Resend wallet transactions that haven't gotten in a block yet
ResendWalletTransactions();
// Resend wallet transactions that haven't gotten in a block yet
ResendWalletTransactions();

// Address refresh broadcast
static int64_t nLastRebroadcast;
if (!IsInitialBlockDownload() && ( GetAdjustedTime() - nLastRebroadcast > 24 * 60 * 60))
// Address refresh broadcast
static int64_t nLastRebroadcast;
if (!IsInitialBlockDownload() && ( GetAdjustedTime() - nLastRebroadcast > 24 * 60 * 60))
{
{
LOCK(cs_vNodes);
for (auto const& pnode : vNodes)
{
LOCK(cs_vNodes);
for (auto const& pnode : vNodes)
{
// Periodically clear setAddrKnown to allow refresh broadcasts
if (nLastRebroadcast)
pnode->setAddrKnown.clear();
// Periodically clear setAddrKnown to allow refresh broadcasts
if (nLastRebroadcast)
pnode->setAddrKnown.clear();

// Rebroadcast our address
if (!fNoListen)
{
CAddress addr = GetLocalAddress(&pnode->addr);
if (addr.IsRoutable())
pnode->PushAddress(addr);
}
// Rebroadcast our address
if (!fNoListen)
{
CAddress addr = GetLocalAddress(&pnode->addr);
if (addr.IsRoutable())
pnode->PushAddress(addr);
}
}
nLastRebroadcast = GetAdjustedTime();
}
nLastRebroadcast = GetAdjustedTime();
}

//
// Message: addr
//
if (fSendTrickle)
//
// Message: addr
//
if (fSendTrickle)
{
vector<CAddress> vAddr;
vAddr.reserve(pto->vAddrToSend.size());
for (auto const& addr : pto->vAddrToSend)
{
vector<CAddress> vAddr;
vAddr.reserve(pto->vAddrToSend.size());
for (auto const& addr : pto->vAddrToSend)
// returns true if wasn't already contained in the set
if (pto->setAddrKnown.insert(addr).second)
{
// returns true if wasn't already contained in the set
if (pto->setAddrKnown.insert(addr).second)
vAddr.push_back(addr);
// receiver rejects addr messages larger than 1000
if (vAddr.size() >= 1000)
{
vAddr.push_back(addr);
// receiver rejects addr messages larger than 1000
if (vAddr.size() >= 1000)
{
pto->PushMessage("gridaddr", vAddr);
vAddr.clear();
}
pto->PushMessage("gridaddr", vAddr);
vAddr.clear();
}
}
pto->vAddrToSend.clear();
if (!vAddr.empty())
pto->PushMessage("gridaddr", vAddr);
}
pto->vAddrToSend.clear();
if (!vAddr.empty())
pto->PushMessage("gridaddr", vAddr);
}


//
// Message: inventory
//
vector<CInv> vInv;
vector<CInv> vInvWait;
//
// Message: inventory
//
vector<CInv> vInv;
vector<CInv> vInvWait;
{
LOCK(pto->cs_inventory);
vInv.reserve(pto->vInventoryToSend.size());
vInvWait.reserve(pto->vInventoryToSend.size());
for (auto const& inv : pto->vInventoryToSend)
{
LOCK(pto->cs_inventory);
vInv.reserve(pto->vInventoryToSend.size());
vInvWait.reserve(pto->vInventoryToSend.size());
for (auto const& inv : pto->vInventoryToSend)
{
if (pto->setInventoryKnown.count(inv))
continue;
if (pto->setInventoryKnown.count(inv))
continue;

// trickle out tx inv to protect privacy
if (inv.type == MSG_TX && !fSendTrickle)
// trickle out tx inv to protect privacy
if (inv.type == MSG_TX && !fSendTrickle)
{
// 1/4 of tx invs blast to all immediately
static uint256 hashSalt;
if (hashSalt == 0)
hashSalt = GetRandHash();
uint256 hashRand = inv.hash ^ hashSalt;
hashRand = Hash(BEGIN(hashRand), END(hashRand));
bool fTrickleWait = ((hashRand & 3) != 0);

// always trickle our own transactions
if (!fTrickleWait)
{
// 1/4 of tx invs blast to all immediately
static uint256 hashSalt;
if (hashSalt == 0)
hashSalt = GetRandHash();
uint256 hashRand = inv.hash ^ hashSalt;
hashRand = Hash(BEGIN(hashRand), END(hashRand));
bool fTrickleWait = ((hashRand & 3) != 0);

// always trickle our own transactions
if (!fTrickleWait)
{
CWalletTx wtx;
if (GetTransaction(inv.hash, wtx))
if (wtx.fFromMe)
fTrickleWait = true;
}
CWalletTx wtx;
if (GetTransaction(inv.hash, wtx))
if (wtx.fFromMe)
fTrickleWait = true;
}

if (fTrickleWait)
{
vInvWait.push_back(inv);
continue;
}
if (fTrickleWait)
{
vInvWait.push_back(inv);
continue;
}
}

// returns true if wasn't already contained in the set
if (pto->setInventoryKnown.insert(inv).second)
// returns true if wasn't already contained in the set
if (pto->setInventoryKnown.insert(inv).second)
{
vInv.push_back(inv);
if (vInv.size() >= 1000)
{
vInv.push_back(inv);
if (vInv.size() >= 1000)
{
pto->PushMessage("inv", vInv);
vInv.clear();
}
pto->PushMessage("inv", vInv);
vInv.clear();
}
}
pto->vInventoryToSend = vInvWait;
}
if (!vInv.empty())
pto->PushMessage("inv", vInv);
pto->vInventoryToSend = vInvWait;
}
if (!vInv.empty())
pto->PushMessage("inv", vInv);


//
// Message: getdata
//
vector<CInv> vGetData;
int64_t nNow = GetAdjustedTime() * 1000000;
CTxDB txdb("r");
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
//
// Message: getdata
//
vector<CInv> vGetData;
int64_t nNow = GetAdjustedTime() * 1000000;
CTxDB txdb("r");
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
{
const CInv& inv = (*pto->mapAskFor.begin()).second;
if (!AlreadyHave(txdb, inv))
{
const CInv& inv = (*pto->mapAskFor.begin()).second;
if (!AlreadyHave(txdb, inv))
if (fDebugNet) printf("sending getdata: %s\n", inv.ToString().c_str());
vGetData.push_back(inv);
if (vGetData.size() >= 1000)
{
if (fDebugNet) printf("sending getdata: %s\n", inv.ToString().c_str());
vGetData.push_back(inv);
if (vGetData.size() >= 1000)
{
pto->PushMessage("getdata", vGetData);
vGetData.clear();
}
mapAlreadyAskedFor[inv] = nNow;
pto->PushMessage("getdata", vGetData);
vGetData.clear();
}
pto->mapAskFor.erase(pto->mapAskFor.begin());
mapAlreadyAskedFor[inv] = nNow;
}
if (!vGetData.empty())
pto->PushMessage("getdata", vGetData);
pto->mapAskFor.erase(pto->mapAskFor.begin());
}
if (!vGetData.empty())
pto->PushMessage("getdata", vGetData);

return true;
}

Expand Down
17 changes: 5 additions & 12 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1982,26 +1982,19 @@ void ThreadMessageHandler2(void* parg)
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
for (auto const& pnode : vNodesCopy)
{

if (pnode->fDisconnect)
continue;

//11-25-2015
// Receive messages
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
if (!ProcessMessages(pnode))
pnode->CloseSocketDisconnect();
}
if (!ProcessMessages(pnode))
pnode->CloseSocketDisconnect();

if (fShutdown)
return;

// Send messages
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
SendMessages(pnode, pnode == pnodeTrickle);
}
SendMessages(pnode, pnode == pnodeTrickle);
if (fShutdown)
return;
}
Expand Down

0 comments on commit ad739d4

Please sign in to comment.