Skip to content

Commit

Permalink
Add Ping and use it for keep alive
Browse files Browse the repository at this point in the history
  • Loading branch information
Tranz5 committed Aug 23, 2014
1 parent dd189ad commit ebafde9
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 104 deletions.
1 change: 1 addition & 0 deletions src/bitcoinrpc.cpp
Expand Up @@ -198,6 +198,7 @@ static const CRPCCommand vRPCCommands[] =
{ "getbestblockhash", &getbestblockhash, true, false, false },
{ "getconnectioncount", &getconnectioncount, true, false, false },
{ "getpeerinfo", &getpeerinfo, true, false, false },
{ "ping", &ping, true, false, false },
{ "addnode", &addnode, true, true, false },
{ "getaddednodeinfo", &getaddednodeinfo, true, true, false },
{ "getdifficulty", &getdifficulty, true, false, false },
Expand Down
1 change: 1 addition & 0 deletions src/bitcoinrpc.h
Expand Up @@ -137,6 +137,7 @@ extern void EnsureWalletIsUnlocked(CWallet* pWallet = NULL);

extern json_spirit::Value getconnectioncount(CWallet* pWallet, const json_spirit::Array& params, bool fHelp); // in rpcnet.cpp
extern json_spirit::Value getpeerinfo(CWallet* pWallet, const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value ping(CWallet* pWallet, const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value addnode(CWallet* pWallet, const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value getaddednodeinfo(CWallet* pWallet, const json_spirit::Array& params, bool fHelp);
extern json_spirit::Value dumpwallet(CWallet* pWallet, const json_spirit::Array& params, bool fHelp);
Expand Down
91 changes: 81 additions & 10 deletions src/main.cpp
Expand Up @@ -3039,7 +3039,7 @@ bool static AlreadyHave(CTxDB& txdb, const CInv& inv)
// a large 4-byte int at any alignment.
unsigned char pchMessageStart[4] = { 0xe4, 0xe8, 0xe9, 0xe5 };

bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64 nTimeReceived)
{
static map<CService, CPubKey> mapReuseKey;
RandAddSeedPerfmon();
Expand Down Expand Up @@ -3637,7 +3637,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
tracker.fn(tracker.param1, vRecv);
}


else if (strCommand == "ping")
{
if (pfrom->nVersion > BIP0031_VERSION)
Expand All @@ -3659,6 +3658,61 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
}
}

else if (strCommand == "pong")
{
int64 pingUsecEnd = nTimeReceived;
uint64 nonce = 0;
size_t nAvail = vRecv.in_avail();
bool bPingFinished = false;
std::string sProblem;

if (nAvail >= sizeof(nonce)) {
vRecv >> nonce;

// Only process pong message if there is an outstanding ping (old ping without nonce should never pong)
if (pfrom->nPingNonceSent != 0) {
if (nonce == pfrom->nPingNonceSent) {
// Matching pong received, this ping is no longer outstanding
bPingFinished = true;
int64 pingUsecTime = pingUsecEnd - pfrom->nPingUsecStart;
if (pingUsecTime > 0) {
// Successful ping time measurement, replace previous
pfrom->nPingUsecTime = pingUsecTime;
} else {
// This should never happen
sProblem = "Timing mishap";
}
} else {
// Nonce mismatches are normal when pings are overlapping
sProblem = "Nonce mismatch";
if (nonce == 0) {
// This is most likely a bug in another implementation somewhere, cancel this ping
bPingFinished = true;
sProblem = "Nonce zero";
}
}
} else {
sProblem = "Unsolicited pong without ping";
}
} else {
// This is most likely a bug in another implementation somewhere, cancel this ping
bPingFinished = true;
sProblem = "Short payload";
}

if (!(sProblem.empty())) {
printf("pong %s %s: %s, %"PRI64x" expected, %"PRI64x" received, %zu bytes\n"
, pfrom->addr.ToString().c_str()
, pfrom->strSubVer.c_str()
, sProblem.c_str()
, pfrom->nPingNonceSent
, nonce
, nAvail);
}
if (bPingFinished) {
pfrom->nPingNonceSent = 0;
}
}

else if (strCommand == "alert")
{
Expand Down Expand Up @@ -3690,7 +3744,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
}
}


else
{
// Ignore unknown commands for extensibility
Expand Down Expand Up @@ -3783,7 +3836,7 @@ bool ProcessMessages(CNode* pfrom)
{
{
LOCK(cs_main);
fRet = ProcessMessage(pfrom, strCommand, vRecv);
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime);
}
if (fShutdown)
break;
Expand Down Expand Up @@ -3830,15 +3883,33 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
// Don't send anything until we get their version message
if (pto->nVersion == 0)
return true;

// Keep-alive ping. We send a nonce of zero because we don't use it anywhere
// right now.
if (pto->nLastSend && GetTime() - pto->nLastSend > 30 * 60 && pto->vSendMsg.empty()) {
//
// Message: ping
//
bool pingSend = false;
if (pto->fPingQueued) {
// RPC ping request by user
pingSend = true;
}
if (pto->nPingNonceSent == 0 && pto->nPingUsecStart + PING_INTERVAL * 1000000 < GetTimeMicros()) {
// Ping automatically sent as a latency probe & keepalive.
pingSend = true;
}
if (pingSend) {
uint64 nonce = 0;
if (pto->nVersion > BIP0031_VERSION)
while (nonce == 0) {
RAND_bytes((unsigned char*)&nonce, sizeof(nonce));
}
pto->fPingQueued = false;
pto->nPingUsecStart = GetTimeMicros();
if (pto->nVersion > BIP0031_VERSION) {
pto->nPingNonceSent = nonce;
pto->PushMessage("ping", nonce);
else
} else {
// Peer is too old to support ping command with nonce, pong will never arrive.
pto->nPingNonceSent = 0;
pto->PushMessage("ping");
}
}

// Resend wallet transactions that haven't gotten in a block yet
Expand Down
36 changes: 29 additions & 7 deletions src/net.cpp
Expand Up @@ -625,6 +625,21 @@ void CNode::copyStats(CNodeStats &stats)
X(nSendBytes);
X(nRecvBytes);
X(nBlocksRequested);

// It is common for nodes with good ping times to suddenly become lagged,
// due to a new block arriving or other large transfer.
// Merely reporting pingtime might fool the caller into thinking the node was still responsive,
// since pingtime does not update until the ping is complete, which might take a while.
// So, if a ping is taking an unusually long time in flight,
// the caller can immediately detect that this is happening.
int64 nPingUsecWait = 0;
if ((0 != nPingNonceSent) && (0 != nPingUsecStart)) {
nPingUsecWait = GetTimeMicros() - nPingUsecStart;
}

// Raw ping time is in microseconds, but show it to user as whole seconds (Bitcoin users should be well used to small numbers with many decimal places by now :)
stats.dPingTime = (((double)nPingUsecTime) / 1e6);
stats.dPingWait = (((double)nPingUsecWait) / 1e6);
}
#undef X

Expand Down Expand Up @@ -652,6 +667,9 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)

pch += handled;
nBytes -= handled;

if (msg.complete())
msg.nTime = GetTimeMicros();
}

return true;
Expand Down Expand Up @@ -1046,23 +1064,27 @@ void ThreadSocketHandler2(void* parg)
//
// Inactivity checking
//
if (pnode->vSendMsg.empty())
pnode->nLastSendEmpty = GetTime();
if (GetTime() - pnode->nTimeConnected > 60)
int64 nTime = GetTime();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
{
printf("socket no message in first 60 seconds, %d %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0);
pnode->fDisconnect = true;
}
else if (GetTime() - pnode->nLastSend > 90*60 && GetTime() - pnode->nLastSendEmpty > 90*60)
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
{
printf("socket sending timeout: %"PRI64d"s\n", nTime - pnode->nLastSend);
pnode->fDisconnect = true;
}
else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
{
printf("socket not sending\n");
printf("socket receive timeout: %"PRI64d"s\n", nTime - pnode->nLastRecv);
pnode->fDisconnect = true;
}
else if (GetTime() - pnode->nLastRecv > 90*60)
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
{
printf("socket inactivity timeout\n");
printf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
pnode->fDisconnect = true;
}
}
Expand Down
26 changes: 24 additions & 2 deletions src/net.h
Expand Up @@ -10,6 +10,7 @@
#include <boost/foreach.hpp>
#include <openssl/rand.h>


#ifndef WIN32
#include <arpa/inet.h>
#endif
Expand All @@ -25,6 +26,10 @@ class CBlockIndex;
extern int nBestHeight;


/** Time between pings automatically sent out for latency probing and keepalive (in seconds). */
static const int PING_INTERVAL = 2 * 60;
/** Time after which to disconnect, after waiting for a ping response (or inactivity). */
static const int TIMEOUT_INTERVAL = 20 * 60;

inline unsigned int ReceiveFloodSize() { return 1000*GetArg("-maxreceivebuffer", 5*1000); }
inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 1*1000); }
Expand Down Expand Up @@ -150,6 +155,8 @@ class CNodeStats
uint64 nSendBytes;
uint64 nRecvBytes;
uint64 nBlocksRequested;
double dPingTime;
double dPingWait;
};


Expand All @@ -165,11 +172,14 @@ class CNetMessage {
CDataStream vRecv; // received message data
unsigned int nDataPos;

int64 nTime; // time (in microseconds) of message receipt.

CNetMessage(int nTypeIn, int nVersionIn) : hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn) {
hdrbuf.resize(24);
in_data = false;
nHdrPos = 0;
nDataPos = 0;
nTime = 0;
}

bool complete() const
Expand Down Expand Up @@ -212,7 +222,6 @@ class CNode

int64 nLastSend;
int64 nLastRecv;
int64 nLastSendEmpty;
int64 nTimeConnected;
uint64 nBlocksRequested;
uint64 nRecvBytes;
Expand Down Expand Up @@ -259,14 +268,23 @@ class CNode
CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor;

// Ping time measurement:
// The pong reply we're expecting, or 0 if no pong expected.
uint64 nPingNonceSent;
// Time (in usec) the last ping was sent, or 0 if no ping was ever sent.
int64 nPingUsecStart;
// Last measured round-trip time.
int64 nPingUsecTime;
// Whether a ping is requested.
bool fPingQueued;

CNode(SOCKET hSocketIn, CAddress addrIn, std::string addrNameIn = "", bool fInboundIn=false) : ssSend(SER_NETWORK, INIT_PROTO_VERSION), setAddrKnown(5000)
{
nServices = 0;
hSocket = hSocketIn;
nRecvVersion = INIT_PROTO_VERSION;
nLastSend = 0;
nLastRecv = 0;
nLastSendEmpty = GetTime();
nTimeConnected = GetTime();
nSendBytes = 0;
nRecvBytes = 0;
Expand All @@ -292,6 +310,10 @@ class CNode
nMisbehavior = 0;
hashCheckpointKnown = 0;
setInventoryKnown.max_size(SendBufferSize() / 1000);
nPingNonceSent = 0;
nPingUsecStart = 0;
nPingUsecTime = 0;
fPingQueued = false;

// Be shy and don't send version until we hear
if (hSocket != INVALID_SOCKET && !fInbound)
Expand Down

0 comments on commit ebafde9

Please sign in to comment.