Skip to content

Commit

Permalink
[mempool] mempool tracks wallet txns & ensures successful initial bro…
Browse files Browse the repository at this point in the history
…adcast

Since the newly implemented functionality only rebroadcasts txns at
the top of the mempool, its possible to hit a case where if a txn was
submitted locally & not immediately relayed, it would take a long time
to be included in the rebroadcast set & ever succesfully be initially
broadcast.

To prevent this case, the mempool keeps track of `setUnbroadcastTxIDs`,
and deems the txn relay successful when an associated GETDATA message is
received. On the rebroadcast timer, txns from this set are added to the
txns being relayed.
  • Loading branch information
amitiuttarwar committed Aug 23, 2019
1 parent be2878c commit ba33693
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 1 deletion.
10 changes: 10 additions & 0 deletions src/net_processing.cpp
Expand Up @@ -1535,6 +1535,12 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
if (mi != mapRelay.end()) {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second));
push = true;

// Once the first peer requests GETDATA for a txn, we deem initial broadcast a success
auto num = mempool.setUnbroadcastTxIDs.erase(inv.hash);
if (num) {
LogPrint(BCLog::NET, "Removed %i from setUnbroadcastTxIDs \n", inv.hash.GetHex());
}
} else if (pfrom->timeLastMempoolReq) {
auto txinfo = mempool.info(inv.hash);
// To protect privacy, do not answer getdata using the mempool when
Expand Down Expand Up @@ -3823,6 +3829,10 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
}

pto->setInventoryTxToSend.insert(setRebroadcastTxs.begin(), setRebroadcastTxs.end());

// also ensure inclusion of wallet txns that haven't been successfully broadcast yet
// since set elements are unique, this will be a no-op if the txns are already in setInventoryTxToSend
pto->setInventoryTxToSend.insert(mempool.setUnbroadcastTxIDs.begin(), mempool.setUnbroadcastTxIDs.end());
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/node/transaction.cpp
Expand Up @@ -80,6 +80,9 @@ TransactionError BroadcastTransaction(const CTransactionRef tx, std::string& err
}

if (relay) {
// the mempool explicitly keeps track of wallet txns to ensure successful initial broadcast
mempool.setUnbroadcastTxIDs.insert(hashTx);

RelayTransaction(hashTx, *g_connman);
}

Expand Down
4 changes: 4 additions & 0 deletions src/txmempool.h
Expand Up @@ -537,6 +537,10 @@ class CTxMemPool
const setEntries & GetMemPoolParents(txiter entry) const EXCLUSIVE_LOCKS_REQUIRED(cs);
const setEntries & GetMemPoolChildren(txiter entry) const EXCLUSIVE_LOCKS_REQUIRED(cs);
uint64_t CalculateDescendantMaximum(txiter entry) const EXCLUSIVE_LOCKS_REQUIRED(cs);

// track wallet transactions to ensure they are successfully broadcast
std::set<uint256> setUnbroadcastTxIDs;

private:
typedef std::map<txiter, setEntries, CompareIteratorByHash> cacheMap;

Expand Down
106 changes: 106 additions & 0 deletions test/functional/mempool_wallet_transactions.py
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
# Copyright (c) 2009-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Ensure that wallet transactions get successfully broadcast to at least one peer.
"""

from collections import defaultdict
from test_framework.mininode import P2PInterface, mininode_lock
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_greater_than,
wait_until,
create_lots_of_big_transactions,
create_confirmed_utxos,
gen_return_txouts,
)
import time

class P2PStoreTxInvs(P2PInterface):
def __init__(self):
super().__init__()
self.tx_invs_received = defaultdict(int)

def on_inv(self, message):
# Store how many times invs have been received for each tx.
for i in message.inv:
if i.type == 1:
# save txid
self.tx_invs_received[i.hash] += 1

def get_invs(self):
with mininode_lock:
return list(self.tx_invs_received.keys())

# Constant from txmempool.h
MAX_REBROADCAST_WEIGHT = 3000000

class MempoolWalletTransactionsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.extra_args = [["-whitelist=127.0.0.1", "-acceptnonstdtxn=1"]] * self.num_nodes

def compare_txns_to_invs(self, txn_hshs, invs):
tx_ids = [int(txhsh, 16) for txhsh in txn_hshs]

assert_equal(len(tx_ids), len(invs))
assert_equal(tx_ids.sort(), invs.sort())

def run_test(self):
self.log.info("test that mempool will ensure initial broadcast of wallet txns")

node = self.nodes[0]

# generate top of mempool txns
min_relay_fee = node.getnetworkinfo()["relayfee"]
txouts = gen_return_txouts()
utxo_count = 90
utxos = create_confirmed_utxos(min_relay_fee, node, utxo_count)
base_fee = min_relay_fee*100 # our transactions are smaller than 100kb

txids = create_lots_of_big_transactions(node, txouts, utxos, 90, 3*base_fee)

# check fee rate of these txns for comparison
txid = txids[0]
entry = node.getmempoolentry(txid)
high_fee_rate = entry['fee'] / entry['vsize']

# confirm txns are more than max rebroadcast amount
assert_greater_than(node.getmempoolinfo()['bytes'], MAX_REBROADCAST_WEIGHT)

# generate a wallet txn that will be broadcast to nobody
us0 = create_confirmed_utxos(min_relay_fee, node, 1).pop()
inputs = [{ "txid" : us0["txid"], "vout" : us0["vout"]}]
outputs = {node.getnewaddress() : 0.0001}
tx = node.createrawtransaction(inputs, outputs)
node.settxfee(min_relay_fee) # specifically fund this tx with low fee
txF = node.fundrawtransaction(tx)
txFS = node.signrawtransactionwithwallet(txF['hex'])
wallettxid = node.sendrawtransaction(txFS['hex']) # txhsh in hex

# ensure the wallet txn has a low fee rate & thus won't be
# rebroadcast due to top-of-mempool rule
walletentry = node.getmempoolentry(wallettxid)
low_fee_rate = walletentry['fee'] / walletentry['vsize']
assert_greater_than(high_fee_rate, low_fee_rate)

# add p2p connection
conn = node.add_p2p_connection(P2PStoreTxInvs())

# bump mocktime of node1 so rebroadcast is triggered
mocktime = int(time.time()) + 300 * 60 # hit rebroadcast interval
node.setmocktime(mocktime)

# `nNextInvSend` delay on `setInventoryTxToSend
wait_until(lambda: conn.get_invs(), timeout=30)

# verify the wallet txn inv was sent due to mempool tracking
wallettxinv = int(wallettxid, 16)
assert_equal(wallettxinv in conn.get_invs(), True)

if __name__ == '__main__':
MempoolWalletTransactionsTest().main()

1 change: 1 addition & 0 deletions test/functional/test_runner.py
Expand Up @@ -136,6 +136,7 @@
'interface_rpc.py',
'rpc_psbt.py',
'rpc_users.py',
'mempool_wallet_transactions.py',
'feature_proxy.py',
'rpc_signrawtransaction.py',
'wallet_groups.py',
Expand Down
1 change: 0 additions & 1 deletion test/functional/wallet_resendwallettransactions.py
Expand Up @@ -6,7 +6,6 @@
from collections import defaultdict
import time

from test_framework.blocktools import create_coinbase
from test_framework.mininode import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import wait_until
Expand Down

0 comments on commit ba33693

Please sign in to comment.