Skip to content

Commit

Permalink
[mempool] Persist unbroadcast set to mempool.dat
Browse files Browse the repository at this point in the history
Ensure that the unbroadcast set will still be meaningful if the node is
restarted.
  • Loading branch information
amitiuttarwar committed Apr 23, 2020
1 parent 297a178 commit 50fc4df
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
18 changes: 17 additions & 1 deletion src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4970,6 +4970,7 @@ bool LoadMempool(CTxMemPool& pool)
int64_t expired = 0;
int64_t failed = 0;
int64_t already_there = 0;
int64_t unbroadcast = 0;
int64_t nNow = GetTime();

try {
Expand Down Expand Up @@ -5023,12 +5024,21 @@ bool LoadMempool(CTxMemPool& pool)
for (const auto& i : mapDeltas) {
pool.PrioritiseTransaction(i.first, i.second);
}

std::set<uint256> unbroadcast_txids;
file >> unbroadcast_txids;
unbroadcast = unbroadcast_txids.size();

for (const auto& txid : unbroadcast_txids) {
pool.AddUnbroadcastTx(txid);
}

} catch (const std::exception& e) {
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());
return false;
}

LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there\n", count, failed, expired, already_there);
LogPrintf("Imported mempool transactions from disk: %i succeeded, %i failed, %i expired, %i already there, %i waiting for initial broadcast\n", count, failed, expired, already_there, unbroadcast);
return true;
}

Expand All @@ -5038,6 +5048,7 @@ bool DumpMempool(const CTxMemPool& pool)

std::map<uint256, CAmount> mapDeltas;
std::vector<TxMempoolInfo> vinfo;
std::set<uint256> unbroadcast_txids;

static Mutex dump_mutex;
LOCK(dump_mutex);
Expand All @@ -5048,6 +5059,7 @@ bool DumpMempool(const CTxMemPool& pool)
mapDeltas[i.first] = i.second;
}
vinfo = pool.infoAll();
unbroadcast_txids = pool.GetUnbroadcastTxs();
}

int64_t mid = GetTimeMicros();
Expand All @@ -5072,6 +5084,10 @@ bool DumpMempool(const CTxMemPool& pool)
}

file << mapDeltas;

LogPrintf("Writing %d unbroadcast transactions to disk.\n", unbroadcast_txids.size());
file << unbroadcast_txids;

if (!FileCommit(file.Get()))
throw std::runtime_error("FileCommit failed");
file.fclose();
Expand Down
40 changes: 35 additions & 5 deletions test/functional/mempool_persist.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@
import time

from test_framework.test_framework import BitcoinTestFramework
from test_framework.mininode import P2PTxInvStore
from test_framework.util import (
assert_equal,
assert_greater_than_or_equal,
assert_raises_rpc_error,
connect_nodes,
disconnect_nodes,
wait_until,
)

Expand Down Expand Up @@ -80,6 +83,11 @@ def run_test(self):
assert_greater_than_or_equal(tx_creation_time, tx_creation_time_lower)
assert_greater_than_or_equal(tx_creation_time_higher, tx_creation_time)

# disconnect nodes & make a txn that remains in the unbroadcast set.
disconnect_nodes(self.nodes[0], 2)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), Decimal("12"))
connect_nodes(self.nodes[0], 2)

self.log.debug("Stop-start the nodes. Verify that node0 has the transactions in its mempool and node1 does not. Verify that node2 calculates its balance correctly after loading wallet transactions.")
self.stop_nodes()
# Give this node a head-start, so we can be "extra-sure" that it didn't load anything later
Expand All @@ -89,7 +97,7 @@ def run_test(self):
self.start_node(2)
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"], timeout=1)
wait_until(lambda: self.nodes[2].getmempoolinfo()["loaded"], timeout=1)
assert_equal(len(self.nodes[0].getrawmempool()), 5)
assert_equal(len(self.nodes[0].getrawmempool()), 6)
assert_equal(len(self.nodes[2].getrawmempool()), 5)
# The others have loaded their mempool. If node_1 loaded anything, we'd probably notice by now:
assert_equal(len(self.nodes[1].getrawmempool()), 0)
Expand All @@ -105,17 +113,18 @@ def run_test(self):
self.nodes[2].syncwithvalidationinterfacequeue() # Flush mempool to wallet
assert_equal(node2_balance, self.nodes[2].getbalance())

# start node0 with wallet disabled so wallet transactions don't get resubmitted
self.log.debug("Stop-start node0 with -persistmempool=0. Verify that it doesn't load its mempool.dat file.")
self.stop_nodes()
self.start_node(0, extra_args=["-persistmempool=0"])
self.start_node(0, extra_args=["-persistmempool=0", "-disablewallet"])
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
assert_equal(len(self.nodes[0].getrawmempool()), 0)

self.log.debug("Stop-start node0. Verify that it has the transactions in its mempool.")
self.stop_nodes()
self.start_node(0)
wait_until(lambda: self.nodes[0].getmempoolinfo()["loaded"])
assert_equal(len(self.nodes[0].getrawmempool()), 5)
assert_equal(len(self.nodes[0].getrawmempool()), 6)

mempooldat0 = os.path.join(self.nodes[0].datadir, self.chain, 'mempool.dat')
mempooldat1 = os.path.join(self.nodes[1].datadir, self.chain, 'mempool.dat')
Expand All @@ -124,12 +133,12 @@ def run_test(self):
self.nodes[0].savemempool()
assert os.path.isfile(mempooldat0)

self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 5 transactions")
self.log.debug("Stop nodes, make node1 use mempool.dat from node0. Verify it has 6 transactions")
os.rename(mempooldat0, mempooldat1)
self.stop_nodes()
self.start_node(1, extra_args=[])
wait_until(lambda: self.nodes[1].getmempoolinfo()["loaded"])
assert_equal(len(self.nodes[1].getrawmempool()), 5)
assert_equal(len(self.nodes[1].getrawmempool()), 6)

self.log.debug("Prevent bitcoind from writing mempool.dat to disk. Verify that `savemempool` fails")
# to test the exception we are creating a tmp folder called mempool.dat.new
Expand All @@ -139,6 +148,27 @@ def run_test(self):
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
os.rmdir(mempooldotnew1)

self.test_persist_unbroadcast()

def test_persist_unbroadcast(self):
node0 = self.nodes[0]
self.start_node(0)

# clear out mempool
node0.generate(1)

# disconnect nodes to make a txn that remains in the unbroadcast set.
disconnect_nodes(node0, 1)
node0.sendtoaddress(self.nodes[1].getnewaddress(), Decimal("12"))

# shutdown, then startup with wallet disabled
self.stop_nodes()
self.start_node(0, extra_args=["-disablewallet"])

# check that txn gets broadcast due to unbroadcast logic
conn = node0.add_p2p_connection(P2PTxInvStore())
node0.mockscheduler(16*60) # 15 min + 1 for buffer
wait_until(lambda: len(conn.get_invs()) == 1)

if __name__ == '__main__':
MempoolPersistTest().main()
3 changes: 3 additions & 0 deletions test/functional/mempool_unbroadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def test_broadcast(self):
assert rpc_tx_hsh not in mempool
assert wallet_tx_hsh not in mempool

# ensure that unbroadcast txs are persisted to mempool.dat
self.restart_node(0)

self.log.info("Reconnect nodes & check if they are sent to node 1")
connect_nodes(node, 1)

Expand Down

0 comments on commit 50fc4df

Please sign in to comment.