Skip to content

Commit cedc3ec

Browse files
authored
Merge 166dfe0 into 0dca141
2 parents 0dca141 + 166dfe0 commit cedc3ec

File tree

13 files changed

+658
-10
lines changed

13 files changed

+658
-10
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ jobs:
88
strategy:
99
matrix:
1010
include:
11-
- network: signet
12-
timeout: 20
13-
utxo_path: /var/lib/bitcoin/utxo-signet-160000.dat
1411
- network: mainnet
1512
timeout: 600
1613
utxo_path: /var/lib/bitcoin/utxo-840000.dat

.github/workflows/publish-results.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
contents: write
1313
checks: read
1414
env:
15-
NETWORKS: "signet,mainnet"
15+
NETWORKS: "mainnet"
1616
steps:
1717
- uses: actions/checkout@v4
1818
with:

src/bench/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ add_executable(bench_bitcoin
3131
gcs_filter.cpp
3232
hashpadding.cpp
3333
index_blockfilter.cpp
34+
inputfetcher.cpp
3435
load_external.cpp
3536
lockedpool.cpp
3637
logging.cpp

src/bench/inputfetcher.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (c) 2024-present The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <bench/bench.h>
6+
#include <bench/data/block413567.raw.h>
7+
#include <coins.h>
8+
#include <common/system.h>
9+
#include <inputfetcher.h>
10+
#include <primitives/block.h>
11+
#include <serialize.h>
12+
#include <streams.h>
13+
#include <util/time.h>
14+
15+
static constexpr auto QUEUE_BATCH_SIZE{128};
16+
static constexpr auto DELAY{2ms};
17+
18+
//! Simulates a DB by adding a delay when calling GetCoin
19+
class DelayedCoinsView : public CCoinsView
20+
{
21+
private:
22+
std::chrono::milliseconds m_delay;
23+
24+
public:
25+
DelayedCoinsView(std::chrono::milliseconds delay) : m_delay(delay) {}
26+
27+
std::optional<Coin> GetCoin(const COutPoint& outpoint) const override
28+
{
29+
UninterruptibleSleep(m_delay);
30+
return Coin{};
31+
}
32+
33+
bool BatchWrite(CoinsViewCacheCursor& cursor, const uint256 &hashBlock) override { return true; }
34+
};
35+
36+
/**
 * Benchmark InputFetcher::FetchInputs() on the embedded block 413567 data,
 * using a coins view that adds a fixed delay to every coin lookup.
 */
static void InputFetcherBenchmark(benchmark::Bench& bench)
{
    DataStream stream{benchmark::data::block413567};
    CBlock block;
    stream >> TX_WITH_WITNESS(block);

    DelayedCoinsView db(DELAY);
    CCoinsViewCache cache(&db);

    // The main thread fetches as well, so it is counted to prevent thread
    // oversubscription and to decrease the variance of benchmark results.
    // Always keep at least one worker: an InputFetcher created with fewer
    // than one worker thread starts no threads and FetchInputs() returns
    // immediately, which would leave nothing to measure on a single-core host.
    const auto num_cores{GetNumCores()};
    const auto worker_threads_num{num_cores > 1 ? num_cores - 1 : 1};
    InputFetcher fetcher{QUEUE_BATCH_SIZE, worker_threads_num};

    bench.run([&] {
        // NOTE(review): presumably Flush() empties the cache so that every
        // iteration has to fetch the inputs again - confirm against coins.cpp.
        const auto ok{cache.Flush()};
        assert(ok);
        fetcher.FetchInputs(cache, db, block);
    });
}
56+
57+
BENCHMARK(InputFetcherBenchmark, benchmark::PriorityLevel::HIGH);

src/coins.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,17 @@ void CCoinsViewCache::AddCoin(const COutPoint &outpoint, Coin&& coin, bool possi
109109
(bool)it->second.coin.IsCoinBase());
110110
}
111111

112-
void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin) {
113-
cachedCoinsUsage += coin.DynamicMemoryUsage();
112+
void CCoinsViewCache::EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty) {
113+
const auto mem_usage{coin.DynamicMemoryUsage()};
114114
auto [it, inserted] = cacheCoins.emplace(
115115
std::piecewise_construct,
116116
std::forward_as_tuple(std::move(outpoint)),
117117
std::forward_as_tuple(std::move(coin)));
118118
if (inserted) {
119-
it->second.AddFlags(CCoinsCacheEntry::DIRTY, *it, m_sentinel);
119+
cachedCoinsUsage += mem_usage;
120+
if (set_dirty) {
121+
it->second.AddFlags(CCoinsCacheEntry::DIRTY, *it, m_sentinel);
122+
}
120123
}
121124
}
122125

src/coins.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,12 +417,13 @@ class CCoinsViewCache : public CCoinsViewBacked
417417

418418
/**
419419
* Emplace a coin into cacheCoins without performing any checks, marking
420-
* the emplaced coin as dirty.
420+
* the emplaced coin as dirty unless `set_dirty` is `false`.
421421
*
422-
* NOT FOR GENERAL USE. Used only when loading coins from a UTXO snapshot.
422+
* NOT FOR GENERAL USE. Used when loading coins from a UTXO snapshot, and
423+
* in the InputFetcher.
423424
* @sa ChainstateManager::PopulateAndValidateSnapshot()
424425
*/
425-
void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin);
426+
void EmplaceCoinInternalDANGER(COutPoint&& outpoint, Coin&& coin, bool set_dirty = true);
426427

427428
/**
428429
* Spend a coin. Pass moveto in order to get the deleted data.

src/inputfetcher.h

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// Copyright (c) 2024-present The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#ifndef BITCOIN_INPUTFETCHER_H
6+
#define BITCOIN_INPUTFETCHER_H
7+
8+
#include <coins.h>
9+
#include <sync.h>
10+
#include <tinyformat.h>
11+
#include <txdb.h>
12+
#include <util/hasher.h>
13+
#include <util/threadnames.h>
14+
#include <util/transaction_identifier.h>
15+
16+
#include <cstdint>
17+
#include <stdexcept>
18+
#include <thread>
19+
#include <unordered_set>
20+
#include <vector>
21+
22+
/**
23+
* Input fetcher for fetching inputs from the CoinsDB and inserting
24+
* into the CoinsTip.
25+
*
26+
* The main thread loops through the block and writes all input prevouts to a
27+
* global vector. It then wakes all workers and starts working as well. Each
28+
* thread assigns itself a range of outpoints from the shared vector, and
29+
* fetches the coins from disk. The outpoint and coin pairs are written to a
30+
* thread local vector of pairs. Once all outpoints are fetched, the main thread
31+
* loops through all thread local vectors and writes the pairs to the cache.
32+
*/
33+
class InputFetcher
34+
{
35+
private:
36+
//! Mutex to protect the inner state
37+
Mutex m_mutex{};
38+
//! Worker threads block on this when out of work
39+
std::condition_variable m_worker_cv{};
40+
//! Main thread blocks on this when out of work
41+
std::condition_variable m_main_cv{};
42+
43+
/**
44+
* The outpoints to be fetched from disk.
45+
* This is written to on the main thread, then read from all worker
46+
* threads only after the main thread is done writing. Hence, it doesn't
47+
* need to be guarded by a lock.
48+
*/
49+
std::vector<COutPoint> m_outpoints{};
50+
/**
51+
* The index of the last outpoint that is being fetched. Workers assign
52+
* themselves a range of outpoints to fetch from m_outpoints. They will use
53+
* this index as the end of their range, and then set this index to the
54+
* beginning of their range for the next worker. Once it is zero, the next
55+
* worker will wait on the condition variable.
56+
*/
57+
size_t m_last_outpoint_index GUARDED_BY(m_mutex){0};
58+
59+
//! The set of txids of the transactions in the current block being fetched.
60+
std::unordered_set<Txid, SaltedTxidHasher> m_txids{};
61+
//! The vector of thread local vectors of pairs to be written to the cache.
62+
std::vector<std::vector<std::pair<COutPoint, Coin>>> m_pairs{};
63+
64+
/**
65+
* Number of outpoint fetches that haven't completed yet.
66+
* This includes outpoints that are no longer queued, but still in the
67+
* worker's own batches.
68+
*/
69+
int32_t m_in_flight_fetches_count GUARDED_BY(m_mutex){0};
70+
//! The number of worker threads that are waiting on m_worker_cv
71+
int32_t m_idle_worker_count GUARDED_BY(m_mutex){0};
72+
//! The maximum number of outpoints to be processed in one batch
73+
const int32_t m_batch_size;
74+
//! DB coins view to fetch from.
75+
const CCoinsView* m_db{nullptr};
76+
//! The cache to check if
77+
const CCoinsViewCache* m_cache{nullptr};
78+
79+
std::vector<std::thread> m_worker_threads;
80+
bool m_request_stop GUARDED_BY(m_mutex){false};
81+
82+
//! Internal function that does the fetching from disk.
83+
void Loop(int32_t index, bool is_main_thread = false) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
84+
{
85+
int32_t local_batch_size{0};
86+
size_t end_index{0};
87+
auto& cond{is_main_thread ? m_main_cv : m_worker_cv};
88+
do {
89+
{
90+
WAIT_LOCK(m_mutex, lock);
91+
// first do the clean-up of the previous loop run (allowing us to do
92+
// it in the same critsect) local_batch_size will only be
93+
// truthy after first run.
94+
if (local_batch_size) {
95+
m_in_flight_fetches_count -= local_batch_size;
96+
if (m_in_flight_fetches_count == 0 && !is_main_thread) {
97+
m_main_cv.notify_one();
98+
}
99+
}
100+
101+
// logically, the do loop starts here
102+
while (m_last_outpoint_index == 0) {
103+
if ((is_main_thread && m_in_flight_fetches_count == 0) || m_request_stop) {
104+
return;
105+
}
106+
++m_idle_worker_count;
107+
cond.wait(lock);
108+
--m_idle_worker_count;
109+
}
110+
111+
// Assign a batch of outpoints to this thread
112+
local_batch_size = std::max(1, std::min(m_batch_size,
113+
static_cast<int32_t>(m_last_outpoint_index /
114+
(m_worker_threads.size() + 1 + m_idle_worker_count))));
115+
end_index = m_last_outpoint_index;
116+
m_last_outpoint_index -= local_batch_size;
117+
}
118+
119+
std::vector<std::pair<COutPoint, Coin>>& local_pairs{m_pairs[index]};
120+
local_pairs.reserve(local_pairs.size() + local_batch_size);
121+
try {
122+
for (auto i{end_index - local_batch_size}; i < end_index; ++i) {
123+
const auto& outpoint{m_outpoints[i]};
124+
// If an input spends an outpoint from earlier in the
125+
// block, it won't be in the cache yet but it also won't be
126+
// in the db either.
127+
if (m_txids.contains(outpoint.hash)) {
128+
continue;
129+
}
130+
if (m_cache->HaveCoinInCache(outpoint)) {
131+
continue;
132+
}
133+
if (auto coin{m_db->GetCoin(outpoint)}; coin) {
134+
local_pairs.emplace_back(outpoint, std::move(*coin));
135+
} else {
136+
// Missing an input, just break. This block will fail
137+
// validation, so no point in continuing to get coins.
138+
break;
139+
}
140+
}
141+
} catch (const std::runtime_error& e) {
142+
// Database error
143+
// This will be handled later in validation.
144+
// Continue for now so the main thread can proceed.
145+
}
146+
} while (true);
147+
}
148+
149+
public:
150+
151+
//! Create a new input fetcher
152+
explicit InputFetcher(int32_t batch_size, int32_t worker_thread_count) noexcept
153+
: m_batch_size(batch_size)
154+
{
155+
if (worker_thread_count < 1) {
156+
// Don't do anything if there are no worker threads.
157+
return;
158+
}
159+
m_pairs.reserve(worker_thread_count + 1);
160+
for (auto n{0}; n < worker_thread_count + 1; ++n) {
161+
m_pairs.emplace_back();
162+
}
163+
m_worker_threads.reserve(worker_thread_count);
164+
for (auto n{0}; n < worker_thread_count; ++n) {
165+
m_worker_threads.emplace_back([this, n]() {
166+
util::ThreadRename(strprintf("inputfetch.%i", n));
167+
Loop(n);
168+
});
169+
}
170+
}
171+
172+
// Since this class manages its own resources, which is a thread
173+
// pool `m_worker_threads`, copy and move operations are not appropriate.
174+
InputFetcher(const InputFetcher&) = delete;
175+
InputFetcher& operator=(const InputFetcher&) = delete;
176+
InputFetcher(InputFetcher&&) = delete;
177+
InputFetcher& operator=(InputFetcher&&) = delete;
178+
179+
//! Fetch all block inputs from db, and insert into cache.
180+
void FetchInputs(CCoinsViewCache& cache,
181+
const CCoinsView& db,
182+
const CBlock& block) noexcept
183+
EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
184+
{
185+
if (m_worker_threads.empty() || block.vtx.size() <= 1) {
186+
return;
187+
}
188+
189+
// Set the db and cache to use for this block.
190+
m_db = &db;
191+
m_cache = &cache;
192+
193+
// Loop through the inputs of the block and add them to the queue
194+
m_txids.reserve(block.vtx.size() - 1);
195+
for (const auto& tx : block.vtx) {
196+
if (tx->IsCoinBase()) {
197+
continue;
198+
}
199+
m_outpoints.reserve(m_outpoints.size() + tx->vin.size());
200+
for (const auto& in : tx->vin) {
201+
m_outpoints.emplace_back(in.prevout);
202+
}
203+
m_txids.emplace(tx->GetHash());
204+
}
205+
{
206+
LOCK(m_mutex);
207+
m_last_outpoint_index = m_outpoints.size();
208+
m_in_flight_fetches_count = m_outpoints.size();
209+
}
210+
m_worker_cv.notify_all();
211+
212+
// Have the main thread work too while we wait for other threads
213+
Loop(m_worker_threads.size(), /*is_main_thread=*/true);
214+
215+
// At this point all threads are done writing to m_pairs, so we can
216+
// safely read from it and insert the fetched coins into the cache.
217+
for (auto& local_pairs : m_pairs) {
218+
for (auto&& [outpoint, coin] : local_pairs) {
219+
cache.EmplaceCoinInternalDANGER(std::move(outpoint),
220+
std::move(coin),
221+
/*set_dirty=*/false);
222+
}
223+
local_pairs.clear();
224+
}
225+
m_txids.clear();
226+
m_outpoints.clear();
227+
}
228+
229+
~InputFetcher()
230+
{
231+
WITH_LOCK(m_mutex, m_request_stop = true);
232+
m_worker_cv.notify_all();
233+
for (std::thread& t : m_worker_threads) {
234+
t.join();
235+
}
236+
}
237+
};
238+
239+
#endif // BITCOIN_INPUTFETCHER_H

src/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ add_executable(test_bitcoin
7070
headers_sync_chainwork_tests.cpp
7171
httpserver_tests.cpp
7272
i2p_tests.cpp
73+
inputfetcher_tests.cpp
7374
interfaces_tests.cpp
7475
key_io_tests.cpp
7576
key_tests.cpp

src/test/fuzz/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ add_executable(fuzz
5353
hex.cpp
5454
http_request.cpp
5555
i2p.cpp
56+
inputfetcher.cpp
5657
integer.cpp
5758
key.cpp
5859
key_io.cpp

0 commit comments

Comments
 (0)