Skip to content

Commit

Permalink
Merge #25768: wallet: Properly rebroadcast unconfirmed transaction ch…
Browse files Browse the repository at this point in the history
…ains

3405f3e test: Test that an unconfirmed not-in-mempool chain is rebroadcast (Andrew Chow)
10d91c5 wallet: Deduplicate Resend and ReacceptWalletTransactions (Andrew Chow)

Pull request description:

  Currently `ResendWalletTransactions` (used for normal rebroadcasts) will attempt to rebroadcast all of the transactions in the wallet in the order they are stored in `mapWallet`. This ends up being random as `mapWallet` is a `std::unordered_map`. However `ReacceptWalletTransactions` (used for adding to the mempool on loading) first sorts the txs by wallet insertion order, then submits them. The result is that `ResendWalletTranactions` will fail to rebroadcast child transactions if their txids happen to be lexicographically less than their parent's txid. This PR resolves this issue by combining `ReacceptWalletTransactions` and `ResendWalletTransactions` into a new `ResubmitWalletTransactions` so that the iteration code and basic checks are shared.

  A test has also been added that checks that such transaction chains are rebroadcast correctly.

ACKs for top commit:
  naumenkogs:
    utACK 3405f3e
  1440000bytes:
    reACK 3405f3e
  furszy:
    Late code review ACK 3405f3e
  stickies-v:
    ACK 3405f3e

Tree-SHA512: 1240d9690ecc2ae8d476286b79e2386f537a90c41dd2b8b8a5a9c2a917aa3af85d6aee019fbbb05e772985a2b197e2788305586d9d5dac78ccba1ee5aa31d77a
  • Loading branch information
glozow committed Sep 5, 2022
2 parents e864f2e + 3405f3e commit 5291933
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 64 deletions.
8 changes: 4 additions & 4 deletions src/wallet/rpc/backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ RPCHelpMan importaddress()
RescanWallet(*pwallet, reserver);
{
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions();
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
}
}

Expand Down Expand Up @@ -476,7 +476,7 @@ RPCHelpMan importpubkey()
RescanWallet(*pwallet, reserver);
{
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions();
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
}
}

Expand Down Expand Up @@ -1397,7 +1397,7 @@ RPCHelpMan importmulti()
int64_t scannedTime = pwallet->RescanFromTime(nLowestTimestamp, reserver, true /* update */);
{
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions();
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
}

if (pwallet->IsAbortingRescan()) {
Expand Down Expand Up @@ -1691,7 +1691,7 @@ RPCHelpMan importdescriptors()
int64_t scanned_time = pwallet->RescanFromTime(lowest_timestamp, reserver, true /* update */);
{
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions();
pwallet->ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);
}

if (pwallet->IsAbortingRescan()) {
Expand Down
7 changes: 7 additions & 0 deletions src/wallet/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,13 @@ class CWalletTx
CWalletTx(CWalletTx const &) = delete;
void operator=(CWalletTx const &x) = delete;
};

struct WalletTxOrderComparator {
bool operator()(const CWalletTx* a, const CWalletTx* b) const
{
return a->nOrderPos < b->nOrderPos;
}
};
} // namespace wallet

#endif // BITCOIN_WALLET_TRANSACTION_H
90 changes: 44 additions & 46 deletions src/wallet/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1857,34 +1857,6 @@ CWallet::ScanResult CWallet::ScanForWalletTransactions(const uint256& start_bloc
return result;
}

void CWallet::ReacceptWalletTransactions()
{
// If transactions aren't being broadcasted, don't let them into local mempool either
if (!fBroadcastTransactions)
return;
std::map<int64_t, CWalletTx*> mapSorted;

// Sort pending wallet transactions based on their initial wallet insertion order
for (std::pair<const uint256, CWalletTx>& item : mapWallet) {
const uint256& wtxid = item.first;
CWalletTx& wtx = item.second;
assert(wtx.GetHash() == wtxid);

int nDepth = GetTxDepthInMainChain(wtx);

if (!wtx.IsCoinBase() && (nDepth == 0 && !wtx.isAbandoned())) {
mapSorted.insert(std::make_pair(wtx.nOrderPos, &wtx));
}
}

// Try to add wallet transactions to memory pool
for (const std::pair<const int64_t, CWalletTx*>& item : mapSorted) {
CWalletTx& wtx = *(item.second);
std::string unused_err_string;
SubmitTxMemoryPoolAndRelay(wtx, unused_err_string, false);
}
}

bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, std::string& err_string, bool relay) const
{
AssertLockHeld(cs_wallet);
Expand Down Expand Up @@ -1925,43 +1897,69 @@ std::set<uint256> CWallet::GetTxConflicts(const CWalletTx& wtx) const
return result;
}

// Rebroadcast transactions from the wallet. We do this on a random timer
// to slightly obfuscate which transactions come from our wallet.
// Resubmit transactions from the wallet to the mempool, optionally asking the
// mempool to relay them. On startup, we will do this for all unconfirmed
// transactions but will not ask the mempool to relay them. We do this on startup
// to ensure that our own mempool is aware of our transactions, and to also
// initialize nNextResend so that the actual rebroadcast is scheduled. There
// is a privacy side effect here as not broadcasting on startup also means that we won't
// inform the world of our wallet's state, particularly if the wallet (or node) is not
// yet synced.
//
// Otherwise this function is called periodically in order to relay our unconfirmed txs.
// We do this on a random timer to slightly obfuscate which transactions
// come from our wallet.
//
// Ideally, we'd only resend transactions that we think should have been
// TODO: Ideally, we'd only resend transactions that we think should have been
// mined in the most recent block. Any transaction that wasn't in the top
// blockweight of transactions in the mempool shouldn't have been mined,
// and so is probably just sitting in the mempool waiting to be confirmed.
// Rebroadcasting does nothing to speed up confirmation and only damages
// privacy.
void CWallet::ResendWalletTransactions()
//
// The `force` option results in all unconfirmed transactions being submitted to
// the mempool. This does not necessarily result in those transactions being relayed,
// that depends on the `relay` option. Periodic rebroadcast uses the pattern
// relay=true force=false (also the default values), while loading into the mempool
// (on start, or after import) uses relay=false force=true.
void CWallet::ResubmitWalletTransactions(bool relay, bool force)
{
// Don't attempt to resubmit if the wallet is configured to not broadcast,
// even if forcing.
if (!fBroadcastTransactions) return;

// During reindex, importing and IBD, old wallet transactions become
// unconfirmed. Don't resend them as that would spam other nodes.
if (!chain().isReadyToBroadcast()) return;
// We only allow forcing mempool submission when not relaying to avoid this spam.
if (!force && relay && !chain().isReadyToBroadcast()) return;

// Do this infrequently and randomly to avoid giving away
// that these are our transactions.
if (GetTime() < nNextResend || !fBroadcastTransactions) return;
bool fFirst = (nNextResend == 0);
if (!force && GetTime() < nNextResend) return;
// resend 12-36 hours from now, ~1 day on average.
nNextResend = GetTime() + (12 * 60 * 60) + GetRand(24 * 60 * 60);
if (fFirst) return;

int submitted_tx_count = 0;

{ // cs_wallet scope
LOCK(cs_wallet);

// Relay transactions
for (std::pair<const uint256, CWalletTx>& item : mapWallet) {
CWalletTx& wtx = item.second;
// Attempt to rebroadcast all txes more than 5 minutes older than
// the last block. SubmitTxMemoryPoolAndRelay() will not rebroadcast
// any confirmed or conflicting txs.
if (wtx.nTimeReceived > m_best_block_time - 5 * 60) continue;
// First filter for the transactions we want to rebroadcast.
// We use a set with WalletTxOrderComparator so that rebroadcasting occurs in insertion order
std::set<CWalletTx*, WalletTxOrderComparator> to_submit;
for (auto& [txid, wtx] : mapWallet) {
// Only rebroadcast unconfirmed txs
if (!wtx.isUnconfirmed()) continue;

// attempt to rebroadcast all txes more than 5 minutes older than
// the last block, or all txs if forcing.
if (!force && wtx.nTimeReceived > m_best_block_time - 5 * 60) continue;
to_submit.insert(&wtx);
}
// Now try submitting the transactions to the memory pool and (optionally) relay them.
for (auto wtx : to_submit) {
std::string unused_err_string;
if (SubmitTxMemoryPoolAndRelay(wtx, unused_err_string, true)) ++submitted_tx_count;
if (SubmitTxMemoryPoolAndRelay(*wtx, unused_err_string, relay)) ++submitted_tx_count;
}
} // cs_wallet

Expand All @@ -1975,7 +1973,7 @@ void CWallet::ResendWalletTransactions()
void MaybeResendWalletTxs(WalletContext& context)
{
for (const std::shared_ptr<CWallet>& pwallet : GetWallets(context)) {
pwallet->ResendWalletTransactions();
pwallet->ResubmitWalletTransactions(/*relay=*/true, /*force=*/false);
}
}

Expand Down Expand Up @@ -3191,7 +3189,7 @@ void CWallet::postInitProcess()

// Add wallet transactions that aren't already in a block to mempool
// Do this here as mempool requires genesis block to be loaded
ReacceptWalletTransactions();
ResubmitWalletTransactions(/*relay=*/false, /*force=*/true);

// Update wallet transactions with current mempool transactions.
chain().requestMempoolTransactions(*this);
Expand Down
3 changes: 1 addition & 2 deletions src/wallet/wallet.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,7 @@ class CWallet final : public WalletStorage, public interfaces::Chain::Notificati
};
ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional<int> max_height, const WalletRescanReserver& reserver, bool fUpdate, const bool save_progress);
void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions();
void ResubmitWalletTransactions(bool relay, bool force);

OutputType TransactionChangeType(const std::optional<OutputType>& change_type, const std::vector<CRecipient>& vecSend) const;

Expand Down
2 changes: 1 addition & 1 deletion test/functional/mempool_expiry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
from datetime import timedelta

from test_framework.blocktools import COINBASE_MATURITY
from test_framework.messages import DEFAULT_MEMPOOL_EXPIRY_HOURS
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)
from test_framework.wallet import MiniWallet

DEFAULT_MEMPOOL_EXPIRY_HOURS = 336 # hours
CUSTOM_MEMPOOL_EXPIRY = 10 # hours


Expand Down
1 change: 1 addition & 0 deletions test/functional/test_framework/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
# Default setting for -datacarriersize. 80 bytes of data, +1 for OP_RETURN, +2 for the pushdata opcodes.
MAX_OP_RETURN_RELAY = 83

DEFAULT_MEMPOOL_EXPIRY_HOURS = 336 # hours

def sha256(s):
return hashlib.sha256(s).digest()
Expand Down
64 changes: 53 additions & 11 deletions test/functional/wallet_resendwallettransactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@
create_block,
create_coinbase,
)
from test_framework.messages import DEFAULT_MEMPOOL_EXPIRY_HOURS
from test_framework.p2p import P2PTxInvStore
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal

from test_framework.util import (
assert_equal,
assert_raises_rpc_error,
)

class ResendWalletTransactionsTest(BitcoinTestFramework):
def set_test_params(self):
Expand All @@ -27,13 +30,9 @@ def run_test(self):
peer_first = node.add_p2p_connection(P2PTxInvStore())

self.log.info("Create a new transaction and wait until it's broadcast")
txid = node.sendtoaddress(node.getnewaddress(), 1)

# Wallet rebroadcast is first scheduled 1 min sec after startup (see
# nNextResend in ResendWalletTransactions()). Tell scheduler to call
# MaybeResendWalletTxs now to initialize nNextResend before the first
# setmocktime call below.
node.mockscheduler(60)
parent_utxo, indep_utxo = node.listunspent()[:2]
addr = node.getnewaddress()
txid = node.send(outputs=[{addr: 1}], options={"inputs": [parent_utxo]})["txid"]

# Can take a few seconds due to transaction trickling
peer_first.wait_for_broadcast([txid])
Expand All @@ -51,7 +50,7 @@ def run_test(self):
block.solve()
node.submitblock(block.serialize().hex())

# Set correct m_best_block_time, which is used in ResendWalletTransactions
# Set correct m_best_block_time, which is used in ResubmitWalletTransactions
node.syncwithvalidationinterfacequeue()
now = int(time.time())

Expand All @@ -66,14 +65,57 @@ def run_test(self):
self.log.info("Bump time & check that transaction is rebroadcast")
# Transaction should be rebroadcast approximately 24 hours in the future,
# but can range from 12-36. So bump 36 hours to be sure.
with node.assert_debug_log(['ResendWalletTransactions: resubmit 1 unconfirmed transactions']):
with node.assert_debug_log(['resubmit 1 unconfirmed transactions']):
node.setmocktime(now + 36 * 60 * 60)
# Tell scheduler to call MaybeResendWalletTxs now.
node.mockscheduler(60)
# Give some time for trickle to occur
node.setmocktime(now + 36 * 60 * 60 + 600)
peer_second.wait_for_broadcast([txid])

self.log.info("Chain of unconfirmed not-in-mempool txs are rebroadcast")
# This tests that the node broadcasts the parent transaction before the child transaction.
# To test that scenario, we need a method to reliably get a child transaction placed
# in mapWallet positioned before the parent. We cannot predict the position in mapWallet,
# but we can observe it using listreceivedbyaddress and other related RPCs.
#
# So we will create the child transaction, use listreceivedbyaddress to see what the
# ordering of mapWallet is, if the child is not before the parent, we will create a new
# child (via bumpfee) and remove the old child (via removeprunedfunds) until we get the
# ordering of child before parent.
child_txid = node.send(outputs=[{addr: 0.5}], options={"inputs": [{"txid":txid, "vout":0}]})["txid"]
while True:
txids = node.listreceivedbyaddress(minconf=0, address_filter=addr)[0]["txids"]
if txids == [child_txid, txid]:
break
bumped = node.bumpfee(child_txid)
node.removeprunedfunds(child_txid)
child_txid = bumped["txid"]
entry_time = node.getmempoolentry(child_txid)["time"]

block_time = entry_time + 6 * 60
node.setmocktime(block_time)
block = create_block(int(node.getbestblockhash(), 16), create_coinbase(node.getblockcount() + 1), block_time)
block.solve()
node.submitblock(block.serialize().hex())
node.syncwithvalidationinterfacequeue()

# Evict these txs from the mempool
evict_time = block_time + 60 * 60 * DEFAULT_MEMPOOL_EXPIRY_HOURS + 5
node.setmocktime(evict_time)
indep_send = node.send(outputs=[{node.getnewaddress(): 1}], options={"inputs": [indep_utxo]})
node.syncwithvalidationinterfacequeue()
node.getmempoolentry(indep_send["txid"])
assert_raises_rpc_error(-5, "Transaction not in mempool", node.getmempoolentry, txid)
assert_raises_rpc_error(-5, "Transaction not in mempool", node.getmempoolentry, child_txid)

# Rebroadcast and check that parent and child are both in the mempool
with node.assert_debug_log(['resubmit 2 unconfirmed transactions']):
node.setmocktime(evict_time + 36 * 60 * 60) # 36 hrs is the upper limit of the resend timer
node.mockscheduler(60)
node.getmempoolentry(txid)
node.getmempoolentry(child_txid)


if __name__ == '__main__':
ResendWalletTransactionsTest().main()

0 comments on commit 5291933

Please sign in to comment.