diff --git a/contrib/tracing/README.md b/contrib/tracing/README.md index a409a23ef8db8c..54ffd393778a03 100644 --- a/contrib/tracing/README.md +++ b/contrib/tracing/README.md @@ -286,3 +286,49 @@ Spent a05880b8c77971ed0b9f73062c7c4cdb0ff3856ab14cbf8bc481ed571cd34b83:1 Added eb689865f7d957938978d6207918748f74e6aa074f47874724327089445b0960:0 5589696005 2094513 No Added eb689865f7d957938978d6207918748f74e6aa074f47874724327089445b0960:1 1565556 2094513 No ``` + +### mempool_monitor.py + +A BCC Python script producing mempool statistics and an event log. Based on the +`mempool:added`, `mempool:removed`, `mempool:replaced`, and `mempool:rejected` +tracepoints. + +Statistics include incidence and rate for each event type since the script was +started (`total`) as well as during the last minute (`1 min`) and ten minutes +(`10 min`). The event log shows mempool events in real time, each entry +comprising a timestamp along with all event data available via the event's +tracepoint. + +```console +$ python3 contrib/tracing/log_utxocache_flush.py ./src/bitcoind +``` + +``` + Mempool Monitor + Press CTRL-C to stop. + + ┌─Event count───────────────────────┐ ┌─Event rate──────────────────────────┐ + │ Event total 1 min 10 min │ │ Event total 1 min 10 min │ + │ added 4130tx 218tx 2211tx │ │ added 3.9tx/s 3.6tx/s 3.7tx/s │ + │ removed 201tx 5tx 90tx │ │ removed 0.2tx/s 0.1tx/s 0.1tx/s │ + │ replaced 201tx 5tx 90tx │ │ replaced 0.2tx/s 0.1tx/s 0.1tx/s │ + │ rejected 31tx 5tx 23tx │ │ rejected 0.0tx/s 0.1tx/s 0.0tx/s │ + └───────────────────────────────────┘ └─────────────────────────────────────┘ + + ┌─Event log───────────────────────────────────────────────────────────────────────────────────────────────────────────┐ + │ 17:09:56Z event=added, txid=0a2f2ef84d77a13162f26bb7a544efbab7f09d0b8a965b6f00e85bbd64cd8152, vsize=183, fee=4968 │ + │ 17:09:56Z event=removed, txid=0356df4450c76d2474eab13fab15bfab843e31617edf9ce88d5d5bea5b19bf3e, reason=replaced, vsi│ + │ ze=110, fee=16655 │ + │ 17:09:56Z event=removed, txid=653db4f9af156626a7359c18f9cb095009583497ef606d7a9d2f093c63d392da, reason=replaced, vsi│ + │ ze=110, fee=4037 │ + │ 17:09:56Z event=rejected, txid=1cd12698577446904e8654372421e1ab5b82bdb8faee3f11f34ed758c17ae85a, reason=bad-txns-inp│ + │ uts-missingorspent, peer_id=2568, peer_addr=158.69.116.169:8333 │ + │ 17:09:56Z event=replaced, replacement_txid=db3c91d7dfff65ea5b85ca58bc3e9a26556eb6506ccb04a573f920d297060218, replace│ + │ ment_vsize=110, replacement_fee=23343, replaced_txid=0356df4450c76d2474eab13fab15bfab843e31617edf9ce88d5d5bea5b19bf3│ + │ e, replaced_vsize=110, replaced_fee=16655 │ + │ 17:09:56Z event=replaced, replacement_txid=9b304b9357df5e2cfcbbbead7ffa67c0efc3c599aac9093362b398478d46a587, replace│ + │ ment_vsize=110, replacement_fee=4268, replaced_txid=653db4f9af156626a7359c18f9cb095009583497ef606d7a9d2f093c63d392da│ + │ , replaced_vsize=110, replaced_fee=4037 │ + │ │ + └─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ +``` diff --git a/contrib/tracing/mempool_monitor.py b/contrib/tracing/mempool_monitor.py new file mode 100755 index 00000000000000..d0353c673d314d --- /dev/null +++ b/contrib/tracing/mempool_monitor.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 +# Copyright (c) 2022 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +""" Example logging Bitcoin Core mempool events using the mempool:added, + mempool:removed, mempool:replaced, and mempool:rejected tracepoints. """ + +import curses +import sys +from datetime import datetime, timezone + +from bcc import BPF, USDT + +# BCC: The C program to be compiled to an eBPF program (by BCC) and loaded into +# a sandboxed Linux kernel VM. +PROGRAM = """ +# include + +// Tor v3 addresses are 62 chars + 6 chars for the port (':12345'). +#define MAX_PEER_ADDR_LENGTH 62 + 6 +// The longest rejection reason is 118 chars and is generated in case of SCRIPT_ERR_EVAL_FALSE by +// strprintf("mandatory-script-verify-flag-failed (%s)", ScriptErrorString(check.GetScriptError())) +#define MAX_REJECT_REASON_LENGTH 118 +// The longest string returned by RemovalReasonToString() is 'sizelimit' +#define MAX_REMOVAL_REASON_LENGTH 9 +#define HASH_LENGTH 32 + +struct added_event +{ + u8 hash[HASH_LENGTH]; + u64 vsize; + s64 fee; +}; + +struct removed_event +{ + u8 hash[HASH_LENGTH]; + char reason[MAX_REMOVAL_REASON_LENGTH]; + u64 vsize; + s64 fee; +}; + +struct rejected_event +{ + u8 hash[HASH_LENGTH]; + char reason[MAX_REJECT_REASON_LENGTH]; + s64 peer_id; + char peer_addr[MAX_PEER_ADDR_LENGTH]; +}; + +struct replaced_event +{ + u8 replacement_hash[HASH_LENGTH]; + u64 replacement_vsize; + s64 replacement_fee; + u8 replaced_hash[HASH_LENGTH]; + u64 replaced_vsize; + s64 replaced_fee; +}; + +// BPF perf buffer to push the data to user space. +BPF_PERF_OUTPUT(added_events); +BPF_PERF_OUTPUT(removed_events); +BPF_PERF_OUTPUT(rejected_events); +BPF_PERF_OUTPUT(replaced_events); + +int trace_added(struct pt_regs *ctx) { + struct added_event added = {}; + + bpf_usdt_readarg_p(1, ctx, &added.hash, HASH_LENGTH); + bpf_usdt_readarg(2, ctx, &added.vsize); + bpf_usdt_readarg(3, ctx, &added.fee); + + added_events.perf_submit(ctx, &added, sizeof(added)); + return 0; +} + +int trace_removed(struct pt_regs *ctx) { + struct removed_event removed = {}; + + bpf_usdt_readarg_p(1, ctx, &removed.hash, HASH_LENGTH); + bpf_usdt_readarg_p(2, ctx, &removed.reason, MAX_REMOVAL_REASON_LENGTH); + bpf_usdt_readarg(3, ctx, &removed.vsize); + bpf_usdt_readarg(4, ctx, &removed.fee); + + removed_events.perf_submit(ctx, &removed, sizeof(removed)); + return 0; +} + +int trace_rejected(struct pt_regs *ctx) { + struct rejected_event rejected = {}; + + bpf_usdt_readarg_p(1, ctx, &rejected.hash, HASH_LENGTH); + bpf_usdt_readarg_p(2, ctx, &rejected.reason, MAX_REJECT_REASON_LENGTH); + bpf_usdt_readarg(3, ctx, &rejected.peer_id); + bpf_usdt_readarg_p(4, ctx, &rejected.peer_addr, MAX_PEER_ADDR_LENGTH); + + rejected_events.perf_submit(ctx, &rejected, sizeof(rejected)); + return 0; +} + +int trace_replaced(struct pt_regs *ctx) { + struct replaced_event replaced = {}; + + bpf_usdt_readarg_p(1, ctx, &replaced.replacement_hash, HASH_LENGTH); + bpf_usdt_readarg(2, ctx, &replaced.replacement_vsize); + bpf_usdt_readarg(3, ctx, &replaced.replacement_fee); + bpf_usdt_readarg_p(4, ctx, &replaced.replaced_hash, HASH_LENGTH); + bpf_usdt_readarg(5, ctx, &replaced.replaced_vsize); + bpf_usdt_readarg(6, ctx, &replaced.replaced_fee); + + replaced_events.perf_submit(ctx, &replaced, sizeof(replaced)); + return 0; +} +""" + + +def main(bitcoind_path): + bitcoind_with_usdts = USDT(path=str(bitcoind_path)) + + # attaching the trace functions defined in the BPF program + # to the tracepoints + bitcoind_with_usdts.enable_probe(probe="mempool:added", fn_name="trace_added") + bitcoind_with_usdts.enable_probe(probe="mempool:removed", fn_name="trace_removed") + bitcoind_with_usdts.enable_probe(probe="mempool:replaced", fn_name="trace_replaced") + bitcoind_with_usdts.enable_probe(probe="mempool:rejected", fn_name="trace_rejected") + bpf = BPF(text=PROGRAM, usdt_contexts=[bitcoind_with_usdts]) + + events = [] + + def get_timestamp(): + return datetime.now(timezone.utc) + + def handle_added(_, data, size): + event = bpf["added_events"].event(data) + events.append((get_timestamp(), "added", event)) + + def handle_removed(_, data, size): + event = bpf["removed_events"].event(data) + events.append((get_timestamp(), "removed", event)) + + def handle_rejected(_, data, size): + event = bpf["rejected_events"].event(data) + events.append((get_timestamp(), "rejected", event)) + + def handle_replaced(_, data, size): + event = bpf["replaced_events"].event(data) + events.append((get_timestamp(), "replaced", event)) + + bpf["added_events"].open_perf_buffer(handle_added) + bpf["removed_events"].open_perf_buffer(handle_removed) + bpf["rejected_events"].open_perf_buffer(handle_rejected) + bpf["replaced_events"].open_perf_buffer(handle_replaced) + + curses.wrapper(loop, bpf, events) + + +def loop(screen, bpf, events): + dashboard = Dashboard(screen) + while True: + try: + bpf.perf_buffer_poll(timeout=50) + dashboard.render(events) + except KeyboardInterrupt: + exit() + + +class Dashboard: + """Visualization of mempool state using ncurses.""" + + INFO_WIN_HEIGHT = 2 + EVENT_WIN_HEIGHT = 7 + + def __init__(self, screen): + screen.nodelay(True) + curses.curs_set(False) + self._screen = screen + self._time_started = datetime.now(timezone.utc) + self._timestamps = {"added": [], "removed": [], "rejected": [], "replaced": []} + self._init_windows() + + def _init_windows(self): + """Initialize all windows.""" + self._init_info_win() + self._init_event_count_win() + self._init_event_rate_win() + self._init_event_log_win() + + @staticmethod + def create_win(x, y, height, width, title=None): + """Helper function to create generic windows and decorate them with box and title if requested.""" + win = curses.newwin(height, width, x, y) + if title: + win.box() + win.addstr(0, 2, title, curses.A_BOLD) + return win + + def _init_info_win(self): + """Create and populate the info window.""" + self._info_win = Dashboard.create_win( + x=0, y=1, height=Dashboard.INFO_WIN_HEIGHT, width=22 + ) + self._info_win.addstr(0, 0, "Mempool Monitor", curses.A_REVERSE) + self._info_win.addstr(1, 0, "Press CTRL-C to stop.", curses.A_NORMAL) + self._info_win.refresh() + + def _init_event_count_win(self): + """Create and populate the event count window.""" + self._event_count_win = Dashboard.create_win( + x=3, y=1, height=Dashboard.EVENT_WIN_HEIGHT, width=37, title="Event count" + ) + header = " {:<8} {:>8} {:>7} {:>7} " + self._event_count_win.addstr( + 1, 1, header.format("Event", "total", "1 min", "10 min"), curses.A_UNDERLINE + ) + self._event_count_win.refresh() + + def _init_event_rate_win(self): + """Create and populate the event rate window.""" + self._event_rate_win = Dashboard.create_win( + x=3, y=40, height=Dashboard.EVENT_WIN_HEIGHT, width=39, title="Event rate" + ) + header = " {:<8} {:>8} {:>8} {:>8} " + self._event_rate_win.addstr( + 1, 1, header.format("Event", "total", "1 min", "10 min"), curses.A_UNDERLINE + ) + self._event_rate_win.refresh() + + def _init_event_log_win(self): + """Create windows showing event log. This comprises a dummy boxed window and an + inset window so line breaks don't overwrite box.""" + # dummy boxed window + num_rows, num_cols = self._screen.getmaxyx() + space_above = Dashboard.INFO_WIN_HEIGHT + 1 + Dashboard.EVENT_WIN_HEIGHT + 1 + box_win_height = num_rows - space_above + box_win_width = num_cols - 2 + win_box = Dashboard.create_win( + x=space_above, + y=1, + height=box_win_height, + width=box_win_width, + title="Event log", + ) + # actual logging window + log_lines = box_win_height - 2 # top and bottom box lines + log_line_len = box_win_width - 2 - 1 # box lines and left padding + win = win_box.derwin(log_lines, log_line_len, 1, 2) + win.idlok(True) + win.scrollok(True) + win_box.refresh() + win.refresh() + self._event_log_win_box = win_box + self._event_log_win = win + + def calculate_metrics(self, events): + """Calculate count and rate metrics.""" + count, rate = {}, {} + for event_ts, event_type, event_data in events: + self._timestamps[event_type].append(event_ts) + for event_type, ts in self._timestamps.items(): + # count metric + count_1m = len([t for t in ts if Dashboard.timestamp_age(t) < 60]) + count_10m = len([t for t in ts if Dashboard.timestamp_age(t) < 600]) + count_total = len(ts) + count[event_type] = (count_total, count_1m, count_10m) + # rate metric + runtime = Dashboard.timestamp_age(self._time_started) + rate_1m = count_1m / min(60, runtime) + rate_10m = count_10m / min(600, runtime) + rate_total = count_total / runtime + rate[event_type] = (rate_total, rate_1m, rate_10m) + return count, rate + + def _update_event_count(self, count): + """Update the event count window.""" + w = self._event_count_win + row_format = " {:<8} {:>6}tx {:>5}tx {:>5}tx " + for line, metric in enumerate(["added", "removed", "replaced", "rejected"]): + w.addstr(2 + line, 1, row_format.format(metric, *count[metric])) + w.refresh() + + def _update_event_rate(self, rate): + """Update the event rate window.""" + w = self._event_rate_win + row_format = " {:<8} {:>4.1f}tx/s {:>4.1f}tx/s {:>4.1f}tx/s " + for line, metric in enumerate(["added", "removed", "replaced", "rejected"]): + w.addstr(2 + line, 1, row_format.format(metric, *rate[metric])) + w.refresh() + + def _update_event_log(self, events): + """Update the event log window.""" + w = self._event_log_win + for event in events: + w.addstr(Dashboard.parse_event(event) + "\n") + w.refresh() + + def render(self, events): + """Render the dashboard.""" + count, rate = self.calculate_metrics(events) + self._update_event_count(count) + self._update_event_rate(rate) + self._update_event_log(events) + events.clear() + + @staticmethod + def parse_event(event): + """Converts events to printable strings""" + + ts, type_, data = event + ts = ts.strftime("%H:%M:%SZ") + if type_ == "added": + return ( + f"{ts} event={type_}, " + f"txid={bytes(data.hash)[::-1].hex()}, " + f"vsize={data.vsize}, fee={data.fee}" + ) + + if type_ == "removed": + return ( + f"{ts} event={type_}, " + f"txid={bytes(data.hash)[::-1].hex()}, " + f'reason={data.reason.decode("UTF-8")}, ' + f"vsize={data.vsize}, fee={data.fee}" + ) + + if type_ == "rejected": + return ( + f"{ts} event={type_}, " + f"txid={bytes(data.hash)[::-1].hex()}, " + f'reason={data.reason.decode("UTF-8")}, ' + f"peer_id={data.peer_id}, " + f'peer_addr={data.peer_addr.decode("UTF-8")}' + ) + + if type_ == "replaced": + return ( + f"{ts} event={type_}, " + f"replacement_txid={bytes(data.replacement_hash)[::-1].hex()}, " + f"replacement_vsize={data.replacement_vsize}, " + f"replacement_fee={data.replacement_fee}, " + f"replaced_txid={bytes(data.replaced_hash)[::-1].hex()}, " + f"replaced_vsize={data.replaced_vsize}, " + f"replaced_fee={data.replaced_fee}" + ) + + raise NotImplementedError("Unsupported event type: {type_}") + + @staticmethod + def timestamp_age(timestamp): + """Return age of timestamp in seconds.""" + return (datetime.now(timezone.utc) - timestamp).total_seconds() + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("USAGE: ", sys.argv[0], "path/to/bitcoind") + exit(1) + + path = sys.argv[1] + main(path) diff --git a/doc/tracing.md b/doc/tracing.md index 6e60901496b2df..16a149c29bd7e5 100644 --- a/doc/tracing.md +++ b/doc/tracing.md @@ -211,6 +211,58 @@ Arguments passed: 4. The expected transaction fee as an `int64` 5. The position of the change output as an `int32` +### Context `mempool` + +#### Tracepoint `mempool:added` + +Is called when a transaction is added to the node's mempool. Passes information +about the transaction. + +Arguments passed: +1. Transaction ID (hash) as `pointer to unsigned chars` (i.e. 32 bytes in little-endian) +2. Transaction virtual size as `uint64` +3. Transaction fee as `int64` + +#### Tracepoint `mempool:removed` + +Is called when a transaction is removed from the node's mempool. Passes information +about the transaction. + +Arguments passed: +1. Transaction ID (hash) as `pointer to unsigned chars` (i.e. 32 bytes in little-endian) +2. Removal reason as `pointer to C-style String` (max. length 9 characters) +3. Transaction virtual size as `uint64` +4. Transaction fee as `int64` + +#### Tracepoint `mempool:replaced` + +Is called when a transaction in the node's mempool is getting replaced by another. +Passed information about the replaced and replacement transactions. + +Arguments passed: +1. Replacement transaction ID (hash) as `pointer to unsigned chars` (i.e. 32 bytes in little-endian) +2. Replacement transaction virtual size as `uint64` +3. Replacement transaction fee as `int64` +4. Replaced transaction ID (hash) as `pointer to unsigned chars` (i.e. 32 bytes in little-endian) +5. Replaced transaction virtual size as `uint64` +6. Replaced transaction fee as `int64` + +Note: In cases where a single replacement transaction replaces multiple +existing transactions in the mempool, the tracepoint is called once for each +replaced transaction, with data of the replacement transaction being the same +in each call. + +#### Tracepoint `mempool:rejected` + +Is called when a transaction received by a peer is rejected and does not enter +the mempool. Passed information about the rejected transaction and its sender. + +Arguments passed: +1. Transaction ID (hash) as `pointer to unsigned chars` (i.e. 32 bytes in little-endian) +2. Reject reason as `pointer to C-style String` (max. length 118 characters) +3. Peer id of sending node as `int64` +4. Peer address and port (IPv4, IPv6, Tor v3, I2P, ...) as `pointer to C-style String` (max. length 68 characters) + ## Adding tracepoints to Bitcoin Core To add a new tracepoint, `#include ` in the compilation unit where diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 0d5be42e0e3c36..3bdbc88a3e65c2 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4161,6 +4161,12 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogPrint(BCLog::MEMPOOLREJ, "%s from peer=%d was not accepted: %s\n", tx.GetHash().ToString(), pfrom.GetId(), state.ToString()); + TRACE4(mempool, rejected, + txid.data(), + state.GetRejectReason().c_str(), + pfrom.GetId(), + pfrom.m_addr_name.c_str() + ); MaybePunishNodeForTx(pfrom.GetId(), state); } return; diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 12e2d5f2241b29..1fbb39ab461256 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -458,6 +459,12 @@ void CTxMemPool::addUnchecked(const CTxMemPoolEntry &entry, setEntries &setAnces vTxHashes.emplace_back(tx.GetWitnessHash(), newit); newit->vTxHashesIdx = vTxHashes.size() - 1; + + TRACE3(mempool, added, + entry.GetTx().GetHash().data(), + entry.GetTxSize(), + entry.GetFee() + ); } void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) @@ -473,6 +480,12 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) // notification. GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } + TRACE4(mempool, removed, + it->GetTx().GetHash().data(), + RemovalReasonToString(reason).c_str(), + it->GetTxSize(), + it->GetFee() + ); const uint256 hash = it->GetTx().GetHash(); for (const CTxIn& txin : it->GetTx().vin) diff --git a/src/validation.cpp b/src/validation.cpp index 1cf6fc0675dbca..4d82766a4d17ac 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1058,6 +1058,14 @@ bool MemPoolAccept::Finalize(const ATMPArgs& args, Workspace& ws) hash.ToString(), FormatMoney(ws.m_modified_fees - ws.m_conflicting_fees), (int)entry->GetTxSize() - (int)ws.m_conflicting_size); + TRACE6(mempool, replaced, + hash.data(), + entry->GetTxSize(), + entry->GetFee(), + it->GetTx().GetHash().data(), + it->GetTxSize(), + it->GetFee() + ); ws.m_replaced_transactions.push_back(it->GetSharedTx()); } m_pool.RemoveStaged(ws.m_all_conflicting, false, MemPoolRemovalReason::REPLACED); diff --git a/test/functional/interface_usdt_mempool.py b/test/functional/interface_usdt_mempool.py new file mode 100755 index 00000000000000..589f423ce1aa1c --- /dev/null +++ b/test/functional/interface_usdt_mempool.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 +# Copyright (c) 2022 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +""" Tests the mempool:* tracepoint API interface. + See https://github.com/bitcoin/bitcoin/blob/master/doc/tracing.md#context-mempool +""" + +from decimal import Decimal + +# Test will be skipped if we don't have bcc installed +try: + from bcc import BPF, USDT # type: ignore[import] +except ImportError: + pass + +from test_framework.blocktools import COINBASE_MATURITY, MAX_BLOCK_SIGOPS +from test_framework.messages import COIN, DEFAULT_MEMPOOL_EXPIRY_HOURS +from test_framework.p2p import P2PDataStore +from test_framework.script import OP_CHECKSIG, CScript +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal +from test_framework.wallet import MiniWallet + +MEMPOOL_TRACEPOINTS_PROGRAM = """ +# include + +// The longest address type is tor v3 at 62 chars + 6 chars for the port (':12345'). +#define MAX_PEER_ADDR_LENGTH (62+6) +// The longest rejection reason is 118 chars and is generated in case of SCRIPT_ERR_EVAL_FALSE by +// strprintf("mandatory-script-verify-flag-failed (%s)", ScriptErrorString(check.GetScriptError())) +#define MAX_REJECT_REASON_LENGTH 118 +// The longest string returned by RemovalReasonToString() is 'sizelimit' +#define MAX_REMOVAL_REASON_LENGTH 9 +#define HASH_LENGTH 32 + +struct added_event +{ + u8 hash[HASH_LENGTH]; + u64 vsize; + s64 fee; +}; + +struct removed_event +{ + u8 hash[HASH_LENGTH]; + char reason[MAX_REMOVAL_REASON_LENGTH]; + u64 vsize; + s64 fee; +}; + +struct rejected_event +{ + u8 hash[HASH_LENGTH]; + char reason[MAX_REJECT_REASON_LENGTH]; + s64 peer_id; + char peer_addr[MAX_PEER_ADDR_LENGTH]; +}; + +struct replaced_event +{ + u8 replacement_hash[HASH_LENGTH]; + u64 replacement_vsize; + s64 replacement_fee; + u8 replaced_hash[HASH_LENGTH]; + u64 replaced_vsize; + s64 replaced_fee; +}; + +// BPF perf buffer to push the data to user space. +BPF_PERF_OUTPUT(added_events); +BPF_PERF_OUTPUT(removed_events); +BPF_PERF_OUTPUT(rejected_events); +BPF_PERF_OUTPUT(replaced_events); + +int trace_added(struct pt_regs *ctx) { + struct added_event added = {}; + + bpf_usdt_readarg_p(1, ctx, &added.hash, HASH_LENGTH); + bpf_usdt_readarg(2, ctx, &added.vsize); + bpf_usdt_readarg(3, ctx, &added.fee); + + added_events.perf_submit(ctx, &added, sizeof(added)); + return 0; +} + +int trace_removed(struct pt_regs *ctx) { + struct removed_event removed = {}; + + bpf_usdt_readarg_p(1, ctx, &removed.hash, HASH_LENGTH); + bpf_usdt_readarg_p(2, ctx, &removed.reason, MAX_REMOVAL_REASON_LENGTH); + bpf_usdt_readarg(3, ctx, &removed.vsize); + bpf_usdt_readarg(4, ctx, &removed.fee); + + removed_events.perf_submit(ctx, &removed, sizeof(removed)); + return 0; +} + +int trace_rejected(struct pt_regs *ctx) { + struct rejected_event rejected = {}; + + bpf_usdt_readarg_p(1, ctx, &rejected.hash, HASH_LENGTH); + bpf_usdt_readarg_p(2, ctx, &rejected.reason, MAX_REJECT_REASON_LENGTH); + bpf_usdt_readarg(3, ctx, &rejected.peer_id); + bpf_usdt_readarg_p(4, ctx, &rejected.peer_addr, MAX_PEER_ADDR_LENGTH); + + rejected_events.perf_submit(ctx, &rejected, sizeof(rejected)); + return 0; +} + +int trace_replaced(struct pt_regs *ctx) { + struct replaced_event replaced = {}; + + bpf_usdt_readarg_p(1, ctx, &replaced.replacement_hash, HASH_LENGTH); + bpf_usdt_readarg(2, ctx, &replaced.replacement_vsize); + bpf_usdt_readarg(3, ctx, &replaced.replacement_fee); + bpf_usdt_readarg_p(4, ctx, &replaced.replaced_hash, HASH_LENGTH); + bpf_usdt_readarg(5, ctx, &replaced.replaced_vsize); + bpf_usdt_readarg(6, ctx, &replaced.replaced_fee); + + replaced_events.perf_submit(ctx, &replaced, sizeof(replaced)); + return 0; +} +""" + + +class MempoolTracepointTest(BitcoinTestFramework): + def set_test_params(self): + self.num_nodes = 1 + self.extra_args = [ + [ + "-acceptnonstdtxn=1", + ] + ] + self.setup_clean_chain = True + + def skip_test_if_missing_module(self): + self.skip_if_platform_not_linux() + self.skip_if_no_bitcoind_tracepoints() + self.skip_if_no_python_bcc() + self.skip_if_no_bpf_permissions() + + def added_test(self): + """Add a transaction to the mempool and make sure the tracepoint returns + the expected txid, vsize, and fee.""" + + EXPECTED_ADDED_EVENTS = 1 + handled_added_events = 0 + + self.log.info("Hooking into mempool:added tracepoint...") + node = self.nodes[0] + ctx = USDT(pid=node.process.pid) + ctx.enable_probe(probe="mempool:added", fn_name="trace_added") + bpf = BPF(text=MEMPOOL_TRACEPOINTS_PROGRAM, usdt_contexts=[ctx], debug=0) + + def handle_added_event(_, data, __): + nonlocal handled_added_events + event = bpf["added_events"].event(data) + assert_equal(txid, bytes(event.hash)[::-1].hex()) + assert_equal(vsize, event.vsize) + assert_equal(fee, event.fee) + handled_added_events += 1 + + bpf["added_events"].open_perf_buffer(handle_added_event) + + self.log.info("Sending transaction...") + fee = Decimal(31200) + tx = self.wallet.send_self_transfer(from_node=node, fee=fee/COIN) + # expected data + txid = tx["txid"] + vsize = tx["tx"].get_vsize() + + self.log.info("Polling buffer...") + bpf.perf_buffer_poll(timeout=200) + + self.log.info("Cleaning up mempool...") + self.generate(node, 1) + + bpf.cleanup() + + self.log.info("Ensuring mempool:added event was handled successfully...") + assert_equal(EXPECTED_ADDED_EVENTS, handled_added_events) + + def removed_test(self): + """Expire a transaction from the mempool and make sure the tracepoint returns + the expected txid, expiry reason, vsize, and fee.""" + + EXPECTED_REMOVED_EVENTS = 1 + handled_removed_events = 0 + + self.log.info("Hooking into mempool:removed tracepoint...") + node = self.nodes[0] + ctx = USDT(pid=node.process.pid) + ctx.enable_probe(probe="mempool:removed", fn_name="trace_removed") + bpf = BPF(text=MEMPOOL_TRACEPOINTS_PROGRAM, usdt_contexts=[ctx], debug=0) + + def handle_removed_event(_, data, __): + nonlocal handled_removed_events + event = bpf["removed_events"].event(data) + assert_equal(txid, bytes(event.hash)[::-1].hex()) + assert_equal(reason, event.reason.decode("UTF-8")) + assert_equal(vsize, event.vsize) + assert_equal(fee, event.fee) + handled_removed_events += 1 + + bpf["removed_events"].open_perf_buffer(handle_removed_event) + + self.log.info("Sending transaction...") + fee = Decimal(31200) + tx = self.wallet.send_self_transfer(from_node=node, fee=fee/COIN) + # expected data + txid = tx["txid"] + reason = "expiry" + vsize = tx["tx"].get_vsize() + + self.log.info("Fast-forwarding time to mempool expiry...") + entry_time = node.getmempoolentry(txid)["time"] + expiry_time = entry_time + 60 * 60 * DEFAULT_MEMPOOL_EXPIRY_HOURS + 5 + node.setmocktime(expiry_time) + + self.log.info("Triggering expiry...") + self.wallet.get_utxo(txid=txid) + self.wallet.send_self_transfer(from_node=node) + + self.log.info("Polling buffer...") + bpf.perf_buffer_poll(timeout=200) + + bpf.cleanup() + + self.log.info("Ensuring mempool:removed event was handled successfully...") + assert_equal(EXPECTED_REMOVED_EVENTS, handled_removed_events) + + def replaced_test(self): + """Replace one and two transactions in the mempool and make sure the tracepoint + returns the expected txids, vsizes, and fees.""" + + EXPECTED_REPLACED_EVENTS = 1 + handled_replaced_events = 0 + + self.log.info("Hooking into mempool:replaced tracepoint...") + node = self.nodes[0] + ctx = USDT(pid=node.process.pid) + ctx.enable_probe(probe="mempool:replaced", fn_name="trace_replaced") + bpf = BPF(text=MEMPOOL_TRACEPOINTS_PROGRAM, usdt_contexts=[ctx], debug=0) + + def handle_replaced_event(_, data, __): + nonlocal handled_replaced_events + event = bpf["replaced_events"].event(data) + assert_equal(replaced_txid, bytes(event.replaced_hash)[::-1].hex()) + assert_equal(replaced_vsize, event.replaced_vsize) + assert_equal(replaced_fee, event.replaced_fee) + assert_equal(replacement_txid, bytes(event.replacement_hash)[::-1].hex()) + assert_equal(replacement_vsize, event.replacement_vsize) + assert_equal(replacement_fee, event.replacement_fee) + handled_replaced_events += 1 + + bpf["replaced_events"].open_perf_buffer(handle_replaced_event) + + self.log.info("Sending RBF transaction...") + utxo = self.wallet.get_utxo(mark_as_spent=True) + original_fee = Decimal(40000) + original_tx = self.wallet.send_self_transfer( + from_node=node, utxo_to_spend=utxo, fee=original_fee/COIN + ) + + self.log.info("Sending replacement transaction...") + replacement_fee = Decimal(45000) + replacement_tx = self.wallet.send_self_transfer( + from_node=node, utxo_to_spend=utxo, fee=replacement_fee/COIN + ) + + # expected data + replaced_txid = original_tx["txid"] + replaced_vsize = original_tx["tx"].get_vsize() + replaced_fee = original_fee + replacement_txid = replacement_tx["txid"] + replacement_vsize = replacement_tx["tx"].get_vsize() + + self.log.info("Polling buffer...") + bpf.perf_buffer_poll(timeout=200) + + bpf.cleanup() + + self.log.info("Ensuring mempool:replaced event was handled successfully...") + assert_equal(EXPECTED_REPLACED_EVENTS, handled_replaced_events) + + def rejected_test(self): + """Create an invalid transaction and make sure the tracepoint returns + the expected txid, rejection reason, peer id, and peer address.""" + + EXPECTED_REJECTED_EVENTS = 1 + handled_rejected_events = 0 + + self.log.info("Adding P2P connection...") + node = self.nodes[0] + node.add_p2p_connection(P2PDataStore()) + + self.log.info("Hooking into mempool:rejected tracepoint...") + ctx = USDT(pid=node.process.pid) + ctx.enable_probe(probe="mempool:rejected", fn_name="trace_rejected") + bpf = BPF(text=MEMPOOL_TRACEPOINTS_PROGRAM, usdt_contexts=[ctx], debug=0) + + def handle_rejected_event(_, data, __): + nonlocal handled_rejected_events + event = bpf["rejected_events"].event(data) + assert_equal(txid, bytes(event.hash)[::-1].hex()) + assert_equal(reason, event.reason.decode("UTF-8")) + assert_equal(peer_id, event.peer_id) + assert_equal(peer_addr, event.peer_addr.decode("UTF-8")) + handled_rejected_events += 1 + + bpf["rejected_events"].open_perf_buffer(handle_rejected_event) + + self.log.info("Sending invalid transaction...") + tx = self.wallet.create_self_transfer()["tx"] + tx.vout[0].scriptPubKey = CScript([OP_CHECKSIG] * (MAX_BLOCK_SIGOPS)) + tx.rehash() + node.p2ps[0].send_txs_and_test([tx], node, success=False) + + # expected data + txid = tx.hash + reason = "bad-txns-too-many-sigops" + peer_id = 0 + # extract ip and port used to connect to node + socket = node.p2ps[0]._transport._sock + peer_addr = ":".join([str(x) for x in socket.getsockname()]) + + self.log.info("Polling buffer...") + bpf.perf_buffer_poll(timeout=200) + + bpf.cleanup() + + self.log.info("Ensuring mempool:rejected event was handled successfully...") + assert_equal(EXPECTED_REJECTED_EVENTS, handled_rejected_events) + + def run_test(self): + """Tests the mempool:added, mempool:removed, mempool:replaced, + and mempool:rejected tracepoints.""" + + # Create some coinbase transactions and mature them so they can be spent + node = self.nodes[0] + self.wallet = MiniWallet(node) + self.generate(self.wallet, 4) + self.generate(node, COINBASE_MATURITY) + + # Test individual tracepoints + self.added_test() + self.removed_test() + self.replaced_test() + self.rejected_test() + + +if __name__ == "__main__": + MempoolTracepointTest().main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index e2c13a67056c26..98dcc621e47407 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -175,6 +175,7 @@ 'interface_http.py', 'interface_rpc.py', 'interface_usdt_coinselection.py', + 'interface_usdt_mempool.py', 'interface_usdt_net.py', 'interface_usdt_utxocache.py', 'interface_usdt_validation.py',