diff --git a/src/main.cpp b/src/main.cpp index d468339ce6..17e790b3c6 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -6860,7 +6860,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, // requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom) -{ +{ + LOCK(cs_main); + + TRY_LOCK(pfrom->cs_vRecvMsg, lockRecv); + if (!lockRecv) + return true; + // // Message format // (4) message start @@ -6928,10 +6934,7 @@ bool ProcessMessages(CNode* pfrom) bool fRet = false; try { - { - LOCK(cs_main); - fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime); - } + fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime); if (fShutdown) break; } @@ -7632,184 +7635,192 @@ void TrackRequests(CNode* pfrom,std::string sRequestType) bool SendMessages(CNode* pto, bool fSendTrickle) { + // Treat lock failures as send successes in case the caller disconnects + // the node based on the return value. TRY_LOCK(cs_main, lockMain); - if (lockMain) { - // Don't send anything until we get their version message - if (pto->nVersion == 0) - return true; + if(!lockMain) + return true; - // - // Message: ping - // - bool pingSend = false; - if (pto->fPingQueued) - { - // RPC ping request by user - pingSend = true; + TRY_LOCK(pto->cs_vSend, lockSend); + if (!lockSend) + return true; + + // Don't send anything until we get their version message + if (pto->nVersion == 0) + return true; + + // + // Message: ping + // + bool pingSend = false; + if (pto->fPingQueued) + { + // RPC ping request by user + pingSend = true; + } + if (pto->nPingNonceSent == 0 && pto->nPingUsecStart + PING_INTERVAL * 1000000 < GetTimeMicros()) + { + // Ping automatically sent as a latency probe & keepalive. + pingSend = true; + } + if (pingSend) + { + uint64_t nonce = 0; + while (nonce == 0) { + RAND_bytes((unsigned char*)&nonce, sizeof(nonce)); } - if (pto->nPingNonceSent == 0 && pto->nPingUsecStart + PING_INTERVAL * 1000000 < GetTimeMicros()) + pto->fPingQueued = false; + pto->nPingUsecStart = GetTimeMicros(); + if (pto->nVersion > BIP0031_VERSION) { - // Ping automatically sent as a latency probe & keepalive. - pingSend = true; - } - if (pingSend) + pto->nPingNonceSent = nonce; + std::string acid = GetCommandNonce("ping"); + pto->PushMessage("ping", nonce, acid); + } else { - uint64_t nonce = 0; - while (nonce == 0) { - RAND_bytes((unsigned char*)&nonce, sizeof(nonce)); - } - pto->fPingQueued = false; - pto->nPingUsecStart = GetTimeMicros(); - if (pto->nVersion > BIP0031_VERSION) - { - pto->nPingNonceSent = nonce; - std::string acid = GetCommandNonce("ping"); - pto->PushMessage("ping", nonce, acid); - } else - { - // Peer is too old to support ping command with nonce, pong will never arrive. - pto->nPingNonceSent = 0; - pto->PushMessage("ping"); - } + // Peer is too old to support ping command with nonce, pong will never arrive. + pto->nPingNonceSent = 0; + pto->PushMessage("ping"); } + } - // Resend wallet transactions that haven't gotten in a block yet - ResendWalletTransactions(); + // Resend wallet transactions that haven't gotten in a block yet + ResendWalletTransactions(); - // Address refresh broadcast - static int64_t nLastRebroadcast; - if (!IsInitialBlockDownload() && ( GetAdjustedTime() - nLastRebroadcast > 24 * 60 * 60)) + // Address refresh broadcast + static int64_t nLastRebroadcast; + if (!IsInitialBlockDownload() && ( GetAdjustedTime() - nLastRebroadcast > 24 * 60 * 60)) + { { + LOCK(cs_vNodes); + for (auto const& pnode : vNodes) { - LOCK(cs_vNodes); - for (auto const& pnode : vNodes) - { - // Periodically clear setAddrKnown to allow refresh broadcasts - if (nLastRebroadcast) - pnode->setAddrKnown.clear(); + // Periodically clear setAddrKnown to allow refresh broadcasts + if (nLastRebroadcast) + pnode->setAddrKnown.clear(); - // Rebroadcast our address - if (!fNoListen) - { - CAddress addr = GetLocalAddress(&pnode->addr); - if (addr.IsRoutable()) - pnode->PushAddress(addr); - } + // Rebroadcast our address + if (!fNoListen) + { + CAddress addr = GetLocalAddress(&pnode->addr); + if (addr.IsRoutable()) + pnode->PushAddress(addr); } } - nLastRebroadcast = GetAdjustedTime(); } + nLastRebroadcast = GetAdjustedTime(); + } - // - // Message: addr - // - if (fSendTrickle) + // + // Message: addr + // + if (fSendTrickle) + { + vector vAddr; + vAddr.reserve(pto->vAddrToSend.size()); + for (auto const& addr : pto->vAddrToSend) { - vector vAddr; - vAddr.reserve(pto->vAddrToSend.size()); - for (auto const& addr : pto->vAddrToSend) + // returns true if wasn't already contained in the set + if (pto->setAddrKnown.insert(addr).second) { - // returns true if wasn't already contained in the set - if (pto->setAddrKnown.insert(addr).second) + vAddr.push_back(addr); + // receiver rejects addr messages larger than 1000 + if (vAddr.size() >= 1000) { - vAddr.push_back(addr); - // receiver rejects addr messages larger than 1000 - if (vAddr.size() >= 1000) - { - pto->PushMessage("gridaddr", vAddr); - vAddr.clear(); - } + pto->PushMessage("gridaddr", vAddr); + vAddr.clear(); } } - pto->vAddrToSend.clear(); - if (!vAddr.empty()) - pto->PushMessage("gridaddr", vAddr); } + pto->vAddrToSend.clear(); + if (!vAddr.empty()) + pto->PushMessage("gridaddr", vAddr); + } - // - // Message: inventory - // - vector vInv; - vector vInvWait; + // + // Message: inventory + // + vector vInv; + vector vInvWait; + { + LOCK(pto->cs_inventory); + vInv.reserve(pto->vInventoryToSend.size()); + vInvWait.reserve(pto->vInventoryToSend.size()); + for (auto const& inv : pto->vInventoryToSend) { - LOCK(pto->cs_inventory); - vInv.reserve(pto->vInventoryToSend.size()); - vInvWait.reserve(pto->vInventoryToSend.size()); - for (auto const& inv : pto->vInventoryToSend) - { - if (pto->setInventoryKnown.count(inv)) - continue; + if (pto->setInventoryKnown.count(inv)) + continue; - // trickle out tx inv to protect privacy - if (inv.type == MSG_TX && !fSendTrickle) + // trickle out tx inv to protect privacy + if (inv.type == MSG_TX && !fSendTrickle) + { + // 1/4 of tx invs blast to all immediately + static uint256 hashSalt; + if (hashSalt == 0) + hashSalt = GetRandHash(); + uint256 hashRand = inv.hash ^ hashSalt; + hashRand = Hash(BEGIN(hashRand), END(hashRand)); + bool fTrickleWait = ((hashRand & 3) != 0); + + // always trickle our own transactions + if (!fTrickleWait) { - // 1/4 of tx invs blast to all immediately - static uint256 hashSalt; - if (hashSalt == 0) - hashSalt = GetRandHash(); - uint256 hashRand = inv.hash ^ hashSalt; - hashRand = Hash(BEGIN(hashRand), END(hashRand)); - bool fTrickleWait = ((hashRand & 3) != 0); - - // always trickle our own transactions - if (!fTrickleWait) - { - CWalletTx wtx; - if (GetTransaction(inv.hash, wtx)) - if (wtx.fFromMe) - fTrickleWait = true; - } + CWalletTx wtx; + if (GetTransaction(inv.hash, wtx)) + if (wtx.fFromMe) + fTrickleWait = true; + } - if (fTrickleWait) - { - vInvWait.push_back(inv); - continue; - } + if (fTrickleWait) + { + vInvWait.push_back(inv); + continue; } + } - // returns true if wasn't already contained in the set - if (pto->setInventoryKnown.insert(inv).second) + // returns true if wasn't already contained in the set + if (pto->setInventoryKnown.insert(inv).second) + { + vInv.push_back(inv); + if (vInv.size() >= 1000) { - vInv.push_back(inv); - if (vInv.size() >= 1000) - { - pto->PushMessage("inv", vInv); - vInv.clear(); - } + pto->PushMessage("inv", vInv); + vInv.clear(); } } - pto->vInventoryToSend = vInvWait; } - if (!vInv.empty()) - pto->PushMessage("inv", vInv); + pto->vInventoryToSend = vInvWait; + } + if (!vInv.empty()) + pto->PushMessage("inv", vInv); - // - // Message: getdata - // - vector vGetData; - int64_t nNow = GetAdjustedTime() * 1000000; - CTxDB txdb("r"); - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) + // + // Message: getdata + // + vector vGetData; + int64_t nNow = GetAdjustedTime() * 1000000; + CTxDB txdb("r"); + while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) + { + const CInv& inv = (*pto->mapAskFor.begin()).second; + if (!AlreadyHave(txdb, inv)) { - const CInv& inv = (*pto->mapAskFor.begin()).second; - if (!AlreadyHave(txdb, inv)) + if (fDebugNet) printf("sending getdata: %s\n", inv.ToString().c_str()); + vGetData.push_back(inv); + if (vGetData.size() >= 1000) { - if (fDebugNet) printf("sending getdata: %s\n", inv.ToString().c_str()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) - { - pto->PushMessage("getdata", vGetData); - vGetData.clear(); - } - mapAlreadyAskedFor[inv] = nNow; + pto->PushMessage("getdata", vGetData); + vGetData.clear(); } - pto->mapAskFor.erase(pto->mapAskFor.begin()); + mapAlreadyAskedFor[inv] = nNow; } - if (!vGetData.empty()) - pto->PushMessage("getdata", vGetData); + pto->mapAskFor.erase(pto->mapAskFor.begin()); } + if (!vGetData.empty()) + pto->PushMessage("getdata", vGetData); + return true; } diff --git a/src/net.cpp b/src/net.cpp index 9709bc77d2..7364f7252e 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1982,26 +1982,19 @@ void ThreadMessageHandler2(void* parg) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; for (auto const& pnode : vNodesCopy) { - if (pnode->fDisconnect) continue; + //11-25-2015 // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - if (!ProcessMessages(pnode)) - pnode->CloseSocketDisconnect(); - } + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); + if (fShutdown) return; // Send messages - { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) - SendMessages(pnode, pnode == pnodeTrickle); - } + SendMessages(pnode, pnode == pnodeTrickle); if (fShutdown) return; }