Skip to content

Commit

Permalink
p2p: Keep cache of recent Dandelion transactions for stem expiry
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoFalke committed Aug 21, 2018
1 parent bbe290e commit 8b533b2
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 2 deletions.
17 changes: 17 additions & 0 deletions src/net.h
Expand Up @@ -36,6 +36,8 @@

class CScheduler;
class CNode;
class CTransaction;
using CTransactionRef = std::shared_ptr<const CTransaction>;

/** Default for -dandelion stem percentage */
static constexpr int64_t DEFAULT_DANDELION_STEM_PERCENTAGE = 90;
Expand Down Expand Up @@ -69,6 +71,8 @@ static const bool DEFAULT_UPNP = USE_UPNP;
#else
static const bool DEFAULT_UPNP = false;
#endif
/** The maximum size of the per-peer dandelion cache in Byte */
static constexpr size_t MAX_DANDELION_CACHE_SZ = 1000 * 1000;
/** The maximum number of entries in mapAskFor */
static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ;
/** The maximum number of entries in setAskFor (larger due to getdata latency)*/
Expand Down Expand Up @@ -715,6 +719,19 @@ class CNode

std::atomic<bool> m_accept_dandelion{false}; //!< If this peer accepts dandelion txs

using DandelionCache = std::map</* witness hash */ uint256, std::pair<CTransactionRef, /* expiry */ int64_t>>;
/** The dandelion cache
* Contains transactions that are
* - not in our mempool
* - not mined
* - not expired
*/
DandelionCache m_cache_dandelion;
/** The total size of all txs in the cache */
size_t m_cache_dandelion_size{0};
/** To check if a tx in the cache might have expired */
std::priority_queue<int64_t, std::vector<int64_t>, std::greater<int64_t>> m_cache_expiry;

// inventory based relay
CRollingBloomFilter filterInventoryKnown;
// Set of transaction ids we still have to announce.
Expand Down
159 changes: 157 additions & 2 deletions src/net_processing.cpp
Expand Up @@ -39,6 +39,10 @@
static constexpr int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60;
/** Minimum time between orphan transactions expire time checks in seconds */
static constexpr int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60;
/** Keep Dandelion txs for at least this long in the cache before expiry */
static constexpr int64_t EXPIRE_DANDELION_CACHE = 10;
/** Per-peer random additional delay for Dandelion cache expiry*/
static constexpr int64_t EXPIRE_DANDELION_CACHE_AVG_ADD = 20;
/** Headers download timeout expressed in microseconds
* Timeout = base + per_header * (expected number of headers) */
static constexpr int64_t HEADERS_DOWNLOAD_TIMEOUT_BASE = 15 * 60 * 1000000; // 15 minutes
Expand Down Expand Up @@ -1063,12 +1067,63 @@ bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
static void RelayTransaction(const CTransaction& tx, CConnman* connman)
{
CInv inv(MSG_TX, tx.GetHash());
connman->ForEachNode([&inv](CNode* pnode)
{
connman->ForEachNode([&inv](CNode* pnode) {
pnode->PushInventory(inv);
pnode->m_cache_dandelion.erase(inv.hash);
});
}

static void FluffTransaction(CTxMemPool& tx_pool, const CNode* from, const CTransactionRef& ptx, CConnman* connman)
{
LogPrint(BCLog::DANDELION, "fluff\n");
CValidationState state;
if (!AcceptToMemoryPool(tx_pool, state, ptx, /* pfMissingInputs */ nullptr, /* plTxnReplaced */ nullptr, /* bypass_limits */ false, /* nAbsurdFee */ 0, /* test_accept */ false)) {
// Should never happen, since we checked for validity
LogPrint(BCLog::DANDELION, "Dandelion transaction fluff failed due to %s\n", FormatStateMessage(state));
return;
}
tx_pool.check(pcoinsTip.get());
RelayTransaction(*ptx, connman);
LogPrint(BCLog::MEMPOOL, "AcceptToMemoryPool: peer=%d: accepted %s (poolsz %u txn, %u kB)\n",
from->GetId(),
ptx->GetHash().ToString(),
tx_pool.size(), tx_pool.DynamicMemoryUsage() / 1000);
}

static void RelayDandelionTransaction(CTxMemPool& tx_pool, CNode* node_from, const CTransactionRef& ptx, const CNetMsgMaker& msg_maker, CConnman* connman)
{
bool fluff = GetRand(100 * 100) / 100. >= CNode::m_dandelion_stem_pct_threshold;
if (fluff) return FluffTransaction(tx_pool, node_from, ptx, connman);

CNode* dest = node_from->GetDandelionDestination(connman);
if (!dest) {
// Just fall back to full fluff at this point
return FluffTransaction(tx_pool, node_from, ptx, connman);
}

// Dandelion relay (at least one hop)
const int64_t now_micros = GetTimeMicros();
const int64_t expiry = now_micros + EXPIRE_DANDELION_CACHE * 1000 * 1000 + PoissonNextSend(now_micros, EXPIRE_DANDELION_CACHE_AVG_ADD);
node_from->m_cache_dandelion.emplace(std::make_pair(ptx->GetWitnessHash(), std::make_pair(ptx, expiry)));
node_from->m_cache_dandelion_size += ptx->GetTotalSize();
node_from->m_cache_expiry.push(expiry);

static constexpr int DANDELION_SEND_FLAGS{0}; // Always send as witness tx

// Check if the next peer would fluff (assume same probability)
bool force_fluff = GetRand(100 * 100) / 100. >= CNode::m_dandelion_stem_pct_threshold;

if (force_fluff || !dest->m_accept_dandelion) {
// The next peer is going to transition into fluff-phase
LogPrint(BCLog::DANDELION, "one-hop relay to peer=%d\n", dest->GetId());
connman->PushMessage(dest, msg_maker.Make(DANDELION_SEND_FLAGS, NetMsgType::TX, *ptx));
return;
}

LogPrint(BCLog::DANDELION, "dandelion relay to peer=%d\n", dest->GetId());
connman->PushMessage(dest, msg_maker.Make(DANDELION_SEND_FLAGS, NetMsgType::TX_DANDELION, *ptx));
}

static void RelayAddress(const CAddress& addr, bool fReachable, CConnman* connman)
{
unsigned int nRelayNodes = fReachable ? 2 : 1; // limited relaying of addresses outside our network(s)
Expand Down Expand Up @@ -2197,6 +2252,46 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
}


else if (strCommand == NetMsgType::TX_DANDELION) {
if (!CNode::IsDandelionEnabled()) {
LogPrint(BCLog::NET, "Dandelion transaction sent in violation of protocol peer=%d\n", pfrom->GetId());
return true;
}

CTransactionRef tx;
vRecv >> tx;

LogPrint(BCLog::DANDELION, "Dandelion: peer=%d: %s ", pfrom->GetId(), tx->GetHash().ToString()); /* Continued */

if (pfrom->m_cache_dandelion_size > MAX_DANDELION_CACHE_SZ) {
LogPrint(BCLog::DANDELION, "full cache (size: %d)\n", pfrom->m_cache_dandelion_size);
return true;
}

LOCK(cs_main);
if (AlreadyHave(CInv{MSG_TX, tx->GetHash()})) {
LogPrint(BCLog::DANDELION, "already have\n");
return true;
}

CValidationState state;
bool missing_inputs;
if (AcceptToMemoryPool(mempool, state, tx, &missing_inputs, nullptr /* lRemovedTxn */, false /* bypass_limits */, 0 /* nAbsurdFee */, true /* test_accept */)) {
RelayDandelionTransaction(mempool, pfrom, tx, msgMaker, connman);
return true;
}
if (missing_inputs) {
LogPrint(BCLog::DANDELION, "missing inputs\n");
return true;
}

LogPrint(BCLog::DANDELION, "invalid %s\n", FormatStateMessage(state));
int penalty = 0;
if (state.IsInvalid(penalty) && penalty) Misbehaving(pfrom->GetId(), penalty);
return true;
}


else if (strCommand == NetMsgType::TX)
{
// Stop processing the transaction early if
Expand Down Expand Up @@ -2977,8 +3072,68 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman, bool en
return false;
}

/** Check if any dandelion txs expired. If so, relay them. */
static void CheckDandelionExpiry(CNode* from, CTxMemPool& tx_pool, CConnman* connman)
{
if (from->m_cache_expiry.empty()) return;

const int64_t now_millis = GetTimeMillis();

// Constant time lookup:
if (from->m_cache_expiry.top() >= now_millis) return;

// Otherwise walk the whole cache to look for the expired tx(s)
do {
from->m_cache_expiry.pop();
} while (from->m_cache_expiry.top() < now_millis);

for (auto it = from->m_cache_dandelion.cbegin(); it != from->m_cache_dandelion.cend(); ++it) {
if (it->second.second >= now_millis) continue;

CTransactionRef tx = it->second.first;
from->m_cache_dandelion.erase(it);
from->m_cache_dandelion_size -= tx->GetTotalSize();

LogPrint(BCLog::DANDELION, "Dandelion: peer=%d: expired %s (cachesz %u txn, %u kB) ", /* Continued */
from->GetId(),
tx->GetHash().ToString(),
from->m_cache_dandelion.size(), from->m_cache_dandelion_size);

LOCK(cs_main);
if (AlreadyHave(CInv{MSG_TX, tx->GetHash()})) {
LogPrint(BCLog::DANDELION, "already have\n");
continue;
}
bool missing_inputs;
CValidationState state;
if (AcceptToMemoryPool(tx_pool, state, std::move(tx), &missing_inputs, /* plTxnReplaced */ nullptr, /* bypass_limits */ false, /* nAbsurdFee */ 0, /* test_accept */ true)) {
LogPrint(BCLog::DANDELION, "accepted to mempool\n");
tx_pool.check(pcoinsTip.get());
RelayTransaction(*tx, connman);
LogPrint(BCLog::MEMPOOL, "AcceptToMemoryPool: peer=%d: accepted %s (poolsz %u txn, %u kB)\n",
from->GetId(),
tx->GetHash().ToString(),
tx_pool.size(), tx_pool.DynamicMemoryUsage() / 1000);
continue;
}
if (missing_inputs) {
LogPrint(BCLog::DANDELION, "missing inputs\n");
continue;
}

LogPrint(BCLog::DANDELION, "invalid %s\n", FormatStateMessage(state));
int penalty = 0;
if (state.IsInvalid(penalty) && penalty) Misbehaving(from->GetId(), penalty);
continue;
}
}


bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc)
{
// Expire Dandelion cache
CheckDandelionExpiry(pfrom, ::mempool, connman);

const CChainParams& chainparams = Params();
//
// Message format
Expand Down
1 change: 1 addition & 0 deletions src/rpc/rawtransaction.cpp
Expand Up @@ -1170,6 +1170,7 @@ static UniValue sendrawtransaction(const JSONRPCRequest& request)
g_connman->ForEachNode([&inv](CNode* pnode)
{
pnode->PushInventory(inv);
pnode->m_cache_dandelion.erase(inv.hash);
});

return hashTx.GetHex();
Expand Down

0 comments on commit 8b533b2

Please sign in to comment.