Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 122 additions & 27 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,16 @@ uint32_t nBlockSequenceId = 1;
// Sources of received blocks, to be able to send them reject messages or ban
// them, if processing happens afterwards. Protected by cs_main.
map<uint256, NodeId> mapBlockSource;

// Blocks that are in flight, and that are in the queue to be downloaded.
// Protected by cs_main.
struct QueuedBlock {
uint256 hash;
int64_t nTime; // Time of "getdata" request in microseconds.
int nQueuedBefore; // Number of blocks in flight at the time of request.
};
map<uint256, pair<NodeId, list<QueuedBlock>::iterator> > mapBlocksInFlight;
map<uint256, pair<NodeId, list<uint256>::iterator> > mapBlocksToDownload;
}

//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -195,10 +205,20 @@ struct CNodeState {
std::string name;
// List of asynchronously-determined block rejections to notify this peer about.
std::vector<CBlockReject> rejects;
list<QueuedBlock> vBlocksInFlight;
int nBlocksInFlight;
list<uint256> vBlocksToDownload;
int nBlocksToDownload;
int64_t nLastBlockReceive;
int64_t nLastBlockProcess;

CNodeState() {
nMisbehavior = 0;
fShouldBan = false;
nBlocksToDownload = 0;
nBlocksInFlight = 0;
nLastBlockReceive = 0;
nLastBlockProcess = 0;
}
};

Expand Down Expand Up @@ -227,8 +247,71 @@ void InitializeNode(NodeId nodeid, const CNode *pnode) {

void FinalizeNode(NodeId nodeid) {
LOCK(cs_main);
CNodeState *state = State(nodeid);

BOOST_FOREACH(const QueuedBlock& entry, state->vBlocksInFlight)
mapBlocksInFlight.erase(entry.hash);
BOOST_FOREACH(const uint256& hash, state->vBlocksToDownload)
mapBlocksToDownload.erase(hash);

mapNodeState.erase(nodeid);
}

// Requires cs_main.
void MarkBlockAsReceived(const uint256 &hash, NodeId nodeFrom = -1) {
map<uint256, pair<NodeId, list<uint256>::iterator> >::iterator itToDownload = mapBlocksToDownload.find(hash);
if (itToDownload != mapBlocksToDownload.end()) {
CNodeState *state = State(itToDownload->second.first);
state->vBlocksToDownload.erase(itToDownload->second.second);
state->nBlocksToDownload--;
mapBlocksToDownload.erase(itToDownload);
}

map<uint256, pair<NodeId, list<QueuedBlock>::iterator> >::iterator itInFlight = mapBlocksInFlight.find(hash);
if (itInFlight != mapBlocksInFlight.end()) {
CNodeState *state = State(itInFlight->second.first);
state->vBlocksInFlight.erase(itInFlight->second.second);
state->nBlocksInFlight--;
if (itInFlight->second.first == nodeFrom)
state->nLastBlockReceive = GetTimeMicros();
mapBlocksInFlight.erase(itInFlight);
}

}

// Requires cs_main.
bool AddBlockToQueue(NodeId nodeid, const uint256 &hash) {
if (mapBlocksToDownload.count(hash) || mapBlocksInFlight.count(hash))
return false;

CNodeState *state = State(nodeid);
if (state == NULL)
return false;

list<uint256>::iterator it = state->vBlocksToDownload.insert(state->vBlocksToDownload.end(), hash);
state->nBlocksToDownload++;
if (state->nBlocksToDownload > 5000)
Misbehaving(nodeid, 10);
mapBlocksToDownload[hash] = std::make_pair(nodeid, it);
return true;
}

// Requires cs_main.
void MarkBlockAsInFlight(NodeId nodeid, const uint256 &hash) {
CNodeState *state = State(nodeid);
assert(state != NULL);

// Make sure it's not listed somewhere already.
MarkBlockAsReceived(hash);
Copy link
Contributor

@rebroad rebroad Oct 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really don't like the use of functions that clearly don't do what their name suggests they do.

What parts of this function are needed? And what would the function be called if it did what it is needing to do here?


QueuedBlock newentry = {hash, GetTimeMicros(), state->nBlocksInFlight};
if (state->nBlocksInFlight == 0)
state->nLastBlockReceive = newentry.nTime; // Reset when a first request is sent.
list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(), newentry);
state->nBlocksInFlight++;
mapBlocksInFlight[hash] = std::make_pair(nodeid, it);
}

}

bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) {
Expand Down Expand Up @@ -1299,6 +1382,7 @@ void CheckForkWarningConditionsOnNewFork(CBlockIndex* pindexNewForkTip)
CheckForkWarningConditions();
}

// Requires cs_main.
void Misbehaving(NodeId pnode, int howmuch)
{
if (howmuch == 0)
Expand Down Expand Up @@ -2021,7 +2105,6 @@ bool AddToBlockIndex(CBlock& block, CValidationState& state, const CDiskBlockPos
pindexNew->nSequenceId = nBlockSequenceId++;
}
assert(pindexNew);
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.insert(make_pair(hash, pindexNew)).first;
pindexNew->phashBlock = &((*mi).first);
map<uint256, CBlockIndex*>::iterator miPrev = mapBlockIndex.find(block.hashPrevBlock);
Expand Down Expand Up @@ -2367,11 +2450,8 @@ bool ProcessBlock(CValidationState &state, CNode* pfrom, CBlock* pblock, CDiskBl
return state.Invalid(error("ProcessBlock() : already have block (orphan) %s", hash.ToString()), 0, "duplicate");

// Preliminary checks
if (!CheckBlock(*pblock, state)) {
if (state.CorruptionPossible())
mapAlreadyAskedFor.erase(CInv(MSG_BLOCK, hash));
if (!CheckBlock(*pblock, state))
return error("ProcessBlock() : CheckBlock FAILED");
}

CBlockIndex* pcheckpoint = Checkpoints::GetLastCheckpoint(mapBlockIndex);
if (pcheckpoint && pblock->hashPrevBlock != (chainActive.Tip() ? chainActive.Tip()->GetBlockHash() : uint256(0)))
Expand Down Expand Up @@ -3223,7 +3303,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return true;
}


State(pfrom->GetId())->nLastBlockProcess = GetTimeMicros();



Expand Down Expand Up @@ -3426,15 +3506,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
return error("message inv size() = %"PRIszu"", vInv.size());
}

// find last block in inv vector
unsigned int nLastBlock = (unsigned int)(-1);
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
if (vInv[vInv.size() - 1 - nInv].type == MSG_BLOCK) {
nLastBlock = vInv.size() - 1 - nInv;
break;
}
}

LOCK(cs_main);

for (unsigned int nInv = 0; nInv < vInv.size(); nInv++)
Expand All @@ -3448,17 +3519,14 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LogPrint("net", " got inventory: %s %s\n", inv.ToString(), fAlreadyHave ? "have" : "new");

if (!fAlreadyHave) {
if (!fImporting && !fReindex)
pfrom->AskFor(inv);
if (!fImporting && !fReindex) {
if (inv.type == MSG_BLOCK)
AddBlockToQueue(pfrom->GetId(), inv.hash);
else
pfrom->AskFor(inv);
}
} else if (inv.type == MSG_BLOCK && mapOrphanBlocks.count(inv.hash)) {
PushGetBlocks(pfrom, chainActive.Tip(), GetOrphanRoot(inv.hash));
} else if (nInv == nLastBlock) {
// In case we are on a very long side-chain, it is possible that we already have
// the last block in an inv bundle sent in response to getblocks. Try to detect
// this situation and push another getblocks to continue.
PushGetBlocks(pfrom, mapBlockIndex[inv.hash], uint256(0));
if (fDebug)
LogPrintf("force request: %s\n", inv.ToString());
}

// Track requests for our stuff
Expand Down Expand Up @@ -3665,6 +3733,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
LOCK(cs_main);
// Remember who we got this block from.
mapBlockSource[inv.hash] = pfrom->GetId();
MarkBlockAsReceived(inv.hash, pfrom->GetId());

CValidationState state;
ProcessBlock(state, pfrom, &block);
Expand Down Expand Up @@ -4192,12 +4261,38 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage("inv", vInv);


// Detect stalled peers. Require that blocks are in flight, we haven't
// received a (requested) block in one minute, and that all blocks are
// in flight for over two minutes, since we first had a chance to
// process an incoming block.
int64_t nNow = GetTimeMicros();
if (!pto->fDisconnect && state.nBlocksInFlight &&
state.nLastBlockReceive < state.nLastBlockProcess - BLOCK_DOWNLOAD_TIMEOUT*1000000 &&
state.vBlocksInFlight.front().nTime < state.nLastBlockProcess - 2*BLOCK_DOWNLOAD_TIMEOUT*1000000) {
LogPrintf("Peer %s is stalling block download, disconnecting\n", state.name.c_str());
pto->fDisconnect = true;
}

//
// Message: getdata
// Message: getdata (blocks)
//
vector<CInv> vGetData;
int64_t nNow = GetTime() * 1000000;
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
while (!pto->fDisconnect && state.nBlocksToDownload && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
uint256 hash = state.vBlocksToDownload.front();
vGetData.push_back(CInv(MSG_BLOCK, hash));
MarkBlockAsInFlight(pto->GetId(), hash);
LogPrint("net", "Requesting block %s from %s\n", hash.ToString().c_str(), state.name.c_str());
if (vGetData.size() >= 1000)
{
pto->PushMessage("getdata", vGetData);
vGetData.clear();
}
}

//
// Message: getdata (non-blocks)
//
while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
{
const CInv& inv = (*pto->mapAskFor.begin()).second;
if (!AlreadyHave(inv))
Expand Down
8 changes: 8 additions & 0 deletions src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ static const int COINBASE_MATURITY = 100;
static const unsigned int LOCKTIME_THRESHOLD = 500000000; // Tue Nov 5 00:53:20 1985 UTC
/** Maximum number of script-checking threads allowed */
static const int MAX_SCRIPTCHECK_THREADS = 16;
/** Number of blocks that can be requested at any given time from a single peer. */
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 128;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic number... why so high? In the later stages of block download I'd have thought a number close to 1 would be appropriate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a further optimization, yes.

/** Timeout in seconds before considering a block download peer unresponsive. */
static const unsigned int BLOCK_DOWNLOAD_TIMEOUT = 60;

#ifdef USE_UPNP
static const int fHaveUPnP = true;
#else
Expand Down Expand Up @@ -182,6 +187,9 @@ bool VerifySignature(const CCoins& txFrom, const CTransaction& txTo, unsigned in
bool AbortNode(const std::string &msg);
/** Get statistics from node state */
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
/** Increase a node's misbehavior score. */
void Misbehaving(NodeId nodeid, int howmuch);


/** (try to) add transaction to memory pool **/
bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransaction &tx, bool fLimitFree,
Expand Down
2 changes: 1 addition & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ class CNode
LogPrint("net", "askfor %s %"PRId64" (%s)\n", inv.ToString().c_str(), nRequestTime, DateTimeStrFormat("%H:%M:%S", nRequestTime/1000000).c_str());

// Make sure not to reuse time indexes to keep things in the same order
int64_t nNow = (GetTime() - 1) * 1000000;
int64_t nNow = GetTimeMicros() - 1000000;
static int64_t nLastTime;
++nLastTime;
nNow = std::max(nNow, nLastTime);
Expand Down
1 change: 0 additions & 1 deletion src/test/DoS_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// Tests this internal-to-main.cpp method:
extern bool AddOrphanTx(const CTransaction& tx);
extern unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans);
extern void Misbehaving(NodeId nodeid, int howmuch);
extern std::map<uint256, CTransaction> mapOrphanTransactions;
extern std::map<uint256, std::set<uint256> > mapOrphanTransactionsByPrev;

Expand Down