Skip to content

Commit

Permalink
Add importmempool RPC
Browse files Browse the repository at this point in the history
test_importmempool_union contributed by glozow

Co-authored-by: glozow <gloriajzhao@gmail.com>
  • Loading branch information
MarcoFalke and glozow committed Apr 20, 2023
1 parent fa0a6bc commit fab8b37
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 10 deletions.
26 changes: 17 additions & 9 deletions src/kernel/mempool_persist.cpp
Expand Up @@ -52,7 +52,7 @@ bool ImportMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& acti
int64_t failed = 0;
int64_t already_there = 0;
int64_t unbroadcast = 0;
auto now = NodeClock::now();
const auto now{NodeClock::now()};

try {
uint64_t version;
Expand All @@ -71,8 +71,12 @@ bool ImportMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& acti
file >> nTime;
file >> nFeeDelta;

if (opts.use_current_time) {
nTime = TicksSinceEpoch<std::chrono::seconds>(now);
}

CAmount amountdelta = nFeeDelta;
if (amountdelta) {
if (amountdelta && opts.apply_fee_delta_priority) {
pool.PrioritiseTransaction(tx->GetHash(), amountdelta);
}
if (nTime > TicksSinceEpoch<std::chrono::seconds>(now - pool.m_expiry)) {
Expand Down Expand Up @@ -100,17 +104,21 @@ bool ImportMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& acti
std::map<uint256, CAmount> mapDeltas;
file >> mapDeltas;

for (const auto& i : mapDeltas) {
pool.PrioritiseTransaction(i.first, i.second);
if (opts.apply_fee_delta_priority) {
for (const auto& i : mapDeltas) {
pool.PrioritiseTransaction(i.first, i.second);
}
}

std::set<uint256> unbroadcast_txids;
file >> unbroadcast_txids;
unbroadcast = unbroadcast_txids.size();
for (const auto& txid : unbroadcast_txids) {
// Ensure transactions were accepted to mempool then add to
// unbroadcast set.
if (pool.get(txid) != nullptr) pool.AddUnbroadcastTx(txid);
if (opts.apply_unbroadcast_set) {
unbroadcast = unbroadcast_txids.size();
for (const auto& txid : unbroadcast_txids) {
// Ensure transactions were accepted to mempool then add to
// unbroadcast set.
if (pool.get(txid) != nullptr) pool.AddUnbroadcastTx(txid);
}
}
} catch (const std::exception& e) {
LogPrintf("Failed to deserialize mempool data on disk: %s. Continuing anyway.\n", e.what());
Expand Down
3 changes: 3 additions & 0 deletions src/kernel/mempool_persist.h
Expand Up @@ -19,6 +19,9 @@ bool DumpMempool(const CTxMemPool& pool, const fs::path& dump_path,

struct ImportMempoolOptions {
fsbridge::FopenFn mockable_fopen_function{fsbridge::fopen};
bool use_current_time{false};
bool apply_fee_delta_priority{true};
bool apply_unbroadcast_set{true};
};
/** Import the file and attempt to add its contents to the mempool. */
bool ImportMempool(CTxMemPool& pool, const fs::path& load_path, Chainstate& active_chainstate, ImportMempoolOptions&& opts);
Expand Down
1 change: 1 addition & 0 deletions src/rpc/client.cpp
Expand Up @@ -164,6 +164,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "importaddress", 2, "rescan" },
{ "importaddress", 3, "p2sh" },
{ "importpubkey", 2, "rescan" },
{ "importmempool", 1, "options" },
{ "importmulti", 0, "requests" },
{ "importmulti", 1, "options" },
{ "importdescriptors", 0, "requests" },
Expand Down
57 changes: 57 additions & 0 deletions src/rpc/mempool.cpp
Expand Up @@ -719,6 +719,62 @@ static RPCHelpMan getmempoolinfo()
};
}

static RPCHelpMan importmempool()
{
return RPCHelpMan{
"importmempool",
"Import a mempool.dat file and attempt to add its contents to the mempool.\n"
"Warning: Importing untrusted files is dangerous, especially if metadata from the file is taken over.",
{
{"filepath", RPCArg::Type::STR, RPCArg::Optional::NO, "The mempool file"},
{"options",
RPCArg::Type::OBJ,
RPCArg::Optional::OMITTED,
"",
{
{"use_current_time", RPCArg::Type::BOOL, RPCArg::Default{true},
"Whether to use the current system time or use the entry time metadata from the mempool file.\n"
"Warning: Importing untrusted metadata may lead to unexpected issues and undesirable behavior."},
{"apply_fee_delta_priority", RPCArg::Type::BOOL, RPCArg::Default{false},
"Whether to apply the fee delta metadata from the mempool file.\n"
"It will be added to any existing fee deltas.\n"
"The fee delta can be set by the prioritisetransaction RPC.\n"
"Warning: Importing untrusted metadata may lead to unexpected issues and undesirable behavior.\n"
"Only set this bool if you understand what it does."},
{"apply_unbroadcast_set", RPCArg::Type::BOOL, RPCArg::Default{false},
"Whether to apply the unbroadcast set metadata from the mempool file.\n"
"Warning: Importing untrusted metadata may lead to unexpected issues and undesirable behavior."},
},
RPCArgOptions{.oneline_description = "\"options\""}},
},
RPCResult{RPCResult::Type::OBJ, "", "", std::vector<RPCResult>{}},
RPCExamples{HelpExampleCli("importmempool", "/path/to/mempool.dat") + HelpExampleRpc("importmempool", "/path/to/mempool.dat")},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue {
const NodeContext& node{EnsureAnyNodeContext(request.context)};

CTxMemPool& mempool{EnsureMemPool(node)};
Chainstate& chainstate = EnsureChainman(node).ActiveChainstate();

const fs::path load_path{fs::u8path(request.params[0].get_str())};
const UniValue& use_current_time{request.params[1]["use_current_time"]};
const UniValue& apply_fee_delta{request.params[1]["apply_fee_delta_priority"]};
const UniValue& apply_unbroadcast{request.params[1]["apply_unbroadcast_set"]};
kernel::ImportMempoolOptions opts{
.use_current_time = use_current_time.isNull() ? true : use_current_time.get_bool(),
.apply_fee_delta_priority = apply_fee_delta.isNull() ? false : apply_fee_delta.get_bool(),
.apply_unbroadcast_set = apply_unbroadcast.isNull() ? false : apply_unbroadcast.get_bool(),
};

if (!kernel::ImportMempool(mempool, load_path, chainstate, std::move(opts))) {
throw JSONRPCError(RPC_MISC_ERROR, "Unable to import mempool file, see debug.log for details.");
}

UniValue ret{UniValue::VOBJ};
return ret;
},
};
}

static RPCHelpMan savemempool()
{
return RPCHelpMan{"savemempool",
Expand Down Expand Up @@ -921,6 +977,7 @@ void RegisterMempoolRPCCommands(CRPCTable& t)
{"blockchain", &gettxspendingprevout},
{"blockchain", &getmempoolinfo},
{"blockchain", &getrawmempool},
{"blockchain", &importmempool},
{"blockchain", &savemempool},
{"hidden", &submitpackage},
};
Expand Down
1 change: 1 addition & 0 deletions src/test/fuzz/rpc.cpp
Expand Up @@ -77,6 +77,7 @@ const std::vector<std::string> RPC_COMMANDS_NOT_SAFE_FOR_FUZZING{
"generatetoaddress", // avoid prohibitively slow execution (when `num_blocks` is large)
"generatetodescriptor", // avoid prohibitively slow execution (when `nblocks` is large)
"gettxoutproof", // avoid prohibitively slow execution
"importmempool", // avoid reading from disk
"importwallet", // avoid reading from disk
"loadwallet", // avoid reading from disk
"prioritisetransaction", // avoid signed integer overflow in CTxMemPool::PrioritiseTransaction(uint256 const&, long const&) (https://github.com/bitcoin/bitcoin/issues/20626)
Expand Down
51 changes: 50 additions & 1 deletion test/functional/mempool_persist.py
Expand Up @@ -46,7 +46,7 @@
assert_greater_than_or_equal,
assert_raises_rpc_error,
)
from test_framework.wallet import MiniWallet
from test_framework.wallet import MiniWallet, COIN


class MempoolPersistTest(BitcoinTestFramework):
Expand Down Expand Up @@ -159,6 +159,15 @@ def run_test(self):
assert self.nodes[0].getmempoolinfo()["loaded"]
assert_equal(len(self.nodes[0].getrawmempool()), 0)

self.log.debug("Import mempool at runtime to node0.")
assert_equal({}, self.nodes[0].importmempool(mempooldat0))
assert_equal(len(self.nodes[0].getrawmempool()), 7)
fees = self.nodes[0].getmempoolentry(txid=last_txid)["fees"]
assert_equal(fees["base"], fees["modified"])
assert_equal({}, self.nodes[0].importmempool(mempooldat0, {"apply_fee_delta_priority": True, "apply_unbroadcast_set": True}))
fees = self.nodes[0].getmempoolentry(txid=last_txid)["fees"]
assert_equal(fees["base"] + Decimal("0.00001000"), fees["modified"])

self.log.debug("Stop-start node0. Verify that it has the transactions in its mempool.")
self.stop_nodes()
self.start_node(0)
Expand Down Expand Up @@ -186,6 +195,7 @@ def run_test(self):
assert_raises_rpc_error(-1, "Unable to dump mempool to disk", self.nodes[1].savemempool)
os.rmdir(mempooldotnew1)

self.test_importmempool_union()
self.test_persist_unbroadcast()

def test_persist_unbroadcast(self):
Expand All @@ -209,6 +219,45 @@ def test_persist_unbroadcast(self):
node0.mockscheduler(16 * 60) # 15 min + 1 for buffer
self.wait_until(lambda: len(conn.get_invs()) == 1)

def test_importmempool_union(self):
self.log.debug("Submit different transactions to node0 and node1's mempools")
self.start_node(0)
tx_node0 = self.mini_wallet.send_self_transfer(from_node=self.nodes[0])
tx_node1 = self.mini_wallet.send_self_transfer(from_node=self.nodes[1])
tx_node01 = self.mini_wallet.create_self_transfer()
tx_node01_secret = self.mini_wallet.create_self_transfer()
self.nodes[0].prioritisetransaction(tx_node01["txid"], 0, COIN)
self.nodes[0].prioritisetransaction(tx_node01_secret["txid"], 0, 2 * COIN)
self.nodes[1].prioritisetransaction(tx_node01_secret["txid"], 0, 3 * COIN)
self.nodes[0].sendrawtransaction(tx_node01["hex"])
self.nodes[1].sendrawtransaction(tx_node01["hex"])
assert tx_node0["txid"] in self.nodes[0].getrawmempool()
assert not tx_node0["txid"] in self.nodes[1].getrawmempool()
assert not tx_node1["txid"] in self.nodes[0].getrawmempool()
assert tx_node1["txid"] in self.nodes[1].getrawmempool()
assert tx_node01["txid"] in self.nodes[0].getrawmempool()
assert tx_node01["txid"] in self.nodes[1].getrawmempool()
assert not tx_node01_secret["txid"] in self.nodes[0].getrawmempool()
assert not tx_node01_secret["txid"] in self.nodes[1].getrawmempool()

self.log.debug("Check that importmempool can add txns without replacing the entire mempool")
mempooldat0 = str(self.nodes[0].chain_path / "mempool.dat")
result0 = self.nodes[0].savemempool()
assert_equal(mempooldat0, result0["filename"])
assert_equal({}, self.nodes[1].importmempool(mempooldat0, {"apply_fee_delta_priority": True}))
# All transactions should be in node1's mempool now.
assert tx_node0["txid"] in self.nodes[1].getrawmempool()
assert tx_node1["txid"] in self.nodes[1].getrawmempool()
assert not tx_node1["txid"] in self.nodes[0].getrawmempool()
# For transactions that already existed, priority should be changed
entry_node01 = self.nodes[1].getmempoolentry(tx_node01["txid"])
assert_equal(entry_node01["fees"]["base"] + 1, entry_node01["fees"]["modified"])
# Deltas for not-yet-submitted transactions should be applied as well (prioritisation is stackable).
self.nodes[1].sendrawtransaction(tx_node01_secret["hex"])
entry_node01_secret = self.nodes[1].getmempoolentry(tx_node01_secret["txid"])
assert_equal(entry_node01_secret["fees"]["base"] + 5, entry_node01_secret["fees"]["modified"])
self.stop_nodes()


if __name__ == "__main__":
MempoolPersistTest().main()

0 comments on commit fab8b37

Please sign in to comment.