[core/state] Prefetch Trie Nodes Concurrently #372

Merged: 88 commits, Nov 21, 2023
Commits (88)
29e0bb0 make prefetcher thread-safe (patrick-ogrady, Oct 31, 2023)
51e08fa outline TODOs (patrick-ogrady, Oct 31, 2023)
9079234 add more TODOs (patrick-ogrady, Oct 31, 2023)
6e1db61 add more TODOs (patrick-ogrady, Oct 31, 2023)
e080e51 update miner to use prefetcher (patrick-ogrady, Nov 1, 2023)
9063e9a remove Copy2 (patrick-ogrady, Nov 1, 2023)
460d2ab work on refactoring prefetcher for concurrent use (patrick-ogrady, Nov 1, 2023)
f2b97bf keep on keeping on (patrick-ogrady, Nov 1, 2023)
5cbf77e spec multi-trie (patrick-ogrady, Nov 1, 2023)
48bdd09 limit prefetching across all subfetchers (patrick-ogrady, Nov 1, 2023)
2df8bf4 making progress (patrick-ogrady, Nov 1, 2023)
a83b012 add functionality to wait (patrick-ogrady, Nov 1, 2023)
4a936ee remove addressed TODOs (patrick-ogrady, Nov 1, 2023)
9844502 rename sm (patrick-ogrady, Nov 1, 2023)
42334cc more cleanup (patrick-ogrady, Nov 1, 2023)
4889438 move term to mt (patrick-ogrady, Nov 1, 2023)
c4bc74a more moving around (patrick-ogrady, Nov 1, 2023)
820e667 move loop into prefetcher (patrick-ogrady, Nov 1, 2023)
8f8cae9 more cleanup (patrick-ogrady, Nov 1, 2023)
c9c5131 outline todo for using a single queue (patrick-ogrady, Nov 1, 2023)
8d88eaf working through single channel (patrick-ogrady, Nov 1, 2023)
dc5bc42 continuing refactor (patrick-ogrady, Nov 1, 2023)
023298e make sure to return copy (patrick-ogrady, Nov 1, 2023)
77dad32 change name to trie orchestrator (patrick-ogrady, Nov 1, 2023)
bd6abd6 more progress on revamp (patrick-ogrady, Nov 1, 2023)
bdcc7ce skip prefetch if to is nil (patrick-ogrady, Nov 1, 2023)
cddcab0 start task processing (patrick-ogrady, Nov 1, 2023)
96fa231 remove unnecessary changes (patrick-ogrady, Nov 1, 2023)
06a8b7a add more stop behavior (patrick-ogrady, Nov 1, 2023)
7d92dbf add comment for miner prefetcher (patrick-ogrady, Nov 1, 2023)
d98355b add comments for const (patrick-ogrady, Nov 2, 2023)
3c6d1dc tests passing locally (patrick-ogrady, Nov 2, 2023)
134d105 cleanup usage of wg (patrick-ogrady, Nov 2, 2023)
1d33c99 track time spent waiting on fetcher (patrick-ogrady, Nov 2, 2023)
5a910dd track skips (patrick-ogrady, Nov 2, 2023)
506e762 nit on prefetcher (patrick-ogrady, Nov 2, 2023)
7664a16 don't clear state channel (patrick-ogrady, Nov 2, 2023)
4048bb6 cleanup restore outstanding requests (patrick-ogrady, Nov 2, 2023)
5bf07f1 add debugging logs (patrick-ogrady, Nov 2, 2023)
c208d41 remove verbose logging (patrick-ogrady, Nov 2, 2023)
101e0ec ensure inner loop uses different variables (patrick-ogrady, Nov 2, 2023)
c385a40 clean up var naming (patrick-ogrady, Nov 2, 2023)
4e103de replace panics with logs (patrick-ogrady, Nov 2, 2023)
807402c set target task size as fixed (patrick-ogrady, Nov 2, 2023)
fd14e92 remove loop checks (patrick-ogrady, Nov 2, 2023)
fb5d8ff clearer metrics tracking (patrick-ogrady, Nov 2, 2023)
00e5d62 use bounded workers (patrick-ogrady, Nov 2, 2023)
88fe25e use cancelable context (patrick-ogrady, Nov 2, 2023)
6177b8d copy meter (patrick-ogrady, Nov 2, 2023)
e6ba6d4 add doneLock (patrick-ogrady, Nov 3, 2023)
35746e4 add more comments (patrick-ogrady, Nov 3, 2023)
ca71b83 simplify bounded workers (patrick-ogrady, Nov 3, 2023)
6155f8c track outstanding work (patrick-ogrady, Nov 3, 2023)
1e0b7b6 Stop -> Wait (patrick-ogrady, Nov 3, 2023)
3a38102 allow copies up to maxConcurrentReads (patrick-ogrady, Nov 3, 2023)
b35eea6 add largest load metric (patrick-ogrady, Nov 3, 2023)
0c33e40 lint (patrick-ogrady, Nov 3, 2023)
5b9590d respect metrics enabled (patrick-ogrady, Nov 3, 2023)
85132f9 track num fetchers (patrick-ogrady, Nov 3, 2023)
3c13bb2 build works (patrick-ogrady, Nov 3, 2023)
6a383ea fix prefetcher test (patrick-ogrady, Nov 3, 2023)
947cad8 don't clear pending tasks when waiting (patrick-ogrady, Nov 3, 2023)
cc3f8db cleanup trie prefetcher scale up (patrick-ogrady, Nov 3, 2023)
c35669f update tests with parallelism (patrick-ogrady, Nov 3, 2023)
82fd0ab use spawner instead of spawn (patrick-ogrady, Nov 3, 2023)
dcfd8f2 fix more tests (patrick-ogrady, Nov 3, 2023)
c37f8f4 fix shutdown stall (patrick-ogrady, Nov 3, 2023)
e20cd79 update default parallelism to 48 (patrick-ogrady, Nov 3, 2023)
27c58f8 spawn trie per key (patrick-ogrady, Nov 3, 2023)
7a607a9 change prefetcher parallelism to 16 (patrick-ogrady, Nov 18, 2023)
5dad6a6 Merge pull request #394 from ava-labs/prefetch-tries-aggr-cons (patrick-ogrady, Nov 20, 2023)
267b9ce add TODO (patrick-ogrady, Nov 20, 2023)
10c1846 remove context from bounded workers (patrick-ogrady, Nov 20, 2023)
bbdf849 remove context from bounded workers (patrick-ogrady, Nov 20, 2023)
5d6b265 remove delivery metrics (patrick-ogrady, Nov 20, 2023)
5fa0e9d handle shutdown in wait (patrick-ogrady, Nov 20, 2023)
9360317 remove extra locking (patrick-ogrady, Nov 20, 2023)
097ef5b rename var (patrick-ogrady, Nov 20, 2023)
4c6b3d3 update comments around shutdown path (patrick-ogrady, Nov 20, 2023)
a297573 reorder channel vars (patrick-ogrady, Nov 20, 2023)
ddb0d63 use processingTasks instead of outstandingRequests (patrick-ogrady, Nov 20, 2023)
a99d1a4 fix comment on panic (patrick-ogrady, Nov 20, 2023)
0ff7c10 Merge pull request #398 from ava-labs/prefetch-tries-nits (patrick-ogrady, Nov 20, 2023)
69fa652 Merge branch 'master' into prefetch-tries (patrick-ogrady, Nov 20, 2023)
4fc35d9 ensure idle workers are fed first (patrick-ogrady, Nov 21, 2023)
0209e1c try live copy first (patrick-ogrady, Nov 21, 2023)
374fc18 remove unnecessary waitgroup (patrick-ogrady, Nov 21, 2023)
346f239 honor stop (patrick-ogrady, Nov 21, 2023)
32 changes: 21 additions & 11 deletions core/blockchain.go
@@ -148,11 +148,12 @@ type CacheConfig struct {
TrieCleanRejournal time.Duration // Time interval to dump clean cache to disk periodically
TrieDirtyLimit int // Memory limit (MB) at which to block on insert and force a flush of dirty trie nodes to disk
TrieDirtyCommitTarget int // Memory limit (MB) to target for the dirties cache before invoking commit
TriePrefetcherParallelism int // Max concurrent disk reads trie prefetcher should perform at once
CommitInterval uint64 // Commit the trie every [CommitInterval] blocks.
Pruning bool // Whether to disable trie write caching and GC altogether (archive node)
AcceptorQueueLimit int // Blocks to queue before blocking during acceptance
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
PopulateMissingTriesParallelism int // Is the number of readers to use when trying to populate missing tries.
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
SnapshotDelayInit bool // Whether to initialize snapshots on startup or wait for external call
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
@@ -166,14 +167,15 @@ type CacheConfig struct {
}

var DefaultCacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20, // 20% overhead in memory counting (this targets 16 MB)
Pruning: true,
CommitInterval: 4096,
AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay
SnapshotLimit: 256,
AcceptedCacheSize: 32,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20, // 20% overhead in memory counting (this targets 16 MB)
TriePrefetcherParallelism: 16,
Pruning: true,
CommitInterval: 4096,
AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay
SnapshotLimit: 256,
AcceptedCacheSize: 32,
}

// BlockChain represents the canonical chain given a database with a genesis
@@ -1312,7 +1314,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
activeState = statedb

// If we have a followup block, run that against the current state to pre-cache
@@ -1687,7 +1689,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain")
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
defer func() {
statedb.StopPrefetcher()
}()
@@ -2094,3 +2096,11 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
bc.initSnapshot(head)
return nil
}

// CacheConfig returns a reference to [bc.cacheConfig]
//
// This is used by [miner] to set prefetch parallelism
// during block building.
func (bc *BlockChain) CacheConfig() *CacheConfig {
return bc.cacheConfig
}
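
The comment on the new accessor says the miner uses it to set prefetch parallelism during block building. The sketch below shows what that caller-side wiring could look like; the helper name, the "miner" namespace string, and the surrounding plumbing are assumptions, and only `CacheConfig`, `TriePrefetcherParallelism`, `StartPrefetcher`, and `StopPrefetcher` come from the diffs in this PR (imports are elided).

```go
// Hypothetical wiring on the block-builder side (not code from this PR):
// reuse the chain's configured prefetch parallelism when starting a prefetcher.
func startMinerPrefetcher(bc *core.BlockChain, statedb *state.StateDB) func() {
	// Read the cap on concurrent trie-node reads from the chain's cache config.
	parallelism := bc.CacheConfig().TriePrefetcherParallelism
	statedb.StartPrefetcher("miner", parallelism)
	// Return StopPrefetcher so the caller can defer it, mirroring reprocessBlock above.
	return statedb.StopPrefetcher
}
```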
7 changes: 4 additions & 3 deletions core/blockchain_repair_test.go
@@ -524,9 +524,10 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {
}
engine = dummy.NewFullFaker()
config = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
SnapshotLimit: 0, // Disable snapshot by default
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TriePrefetcherParallelism: 4,
SnapshotLimit: 0, // Disable snapshot by default
}
)
defer engine.Close()
140 changes: 75 additions & 65 deletions core/blockchain_test.go
@@ -28,22 +28,24 @@ import (

var (
archiveConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: false, // Archive mode
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
}

pruningConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 256,
AcceptorQueueLimit: 64,
}
)

@@ -180,12 +182,13 @@ func TestArchiveBlockChainSnapsDisabled(t *testing.T) {
return createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: false, // Archive mode
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
@@ -214,13 +217,14 @@ func TestPruningBlockChainSnapsDisabled(t *testing.T) {
return createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
@@ -263,13 +267,14 @@ func TestPruningBlockChainUngracefulShutdownSnapsDisabled(t *testing.T) {
blockchain, err := createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: 0, // Disable snapshots
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
@@ -298,13 +303,14 @@ func TestEnableSnapshots(t *testing.T) {
blockchain, err := createBlockChain(
db,
&CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: snapLimit,
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true, // Enable pruning
CommitInterval: 4096,
SnapshotLimit: snapLimit,
AcceptorQueueLimit: 64,
},
gspec,
lastAcceptedHash,
@@ -455,6 +461,7 @@ func testRepopulateMissingTriesParallel(t *testing.T, parallelism int) {
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: false, // Archive mode
SnapshotLimit: 256,
PopulateMissingTries: &startHeight, // Starting point for re-populating.
@@ -487,14 +494,15 @@ func TestUngracefulAsyncShutdown(t *testing.T) {
var (
create = func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
blockchain, err := createBlockChain(db, &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 1000, // ensure channel doesn't block
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 1000, // ensure channel doesn't block
}, gspec, lastAcceptedHash)
if err != nil {
return nil, err
@@ -681,14 +689,15 @@ func TestTransactionIndices(t *testing.T) {
}

conf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64,
}

// Init block chain and check all needed indices has been indexed.
@@ -851,15 +860,16 @@ func TestCanonicalHashMarker(t *testing.T) {

func TestTxLookupBlockChain(t *testing.T) {
cacheConf := &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TxLookupLimit: 5,
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieDirtyCommitTarget: 20,
TriePrefetcherParallelism: 4,
Pruning: true,
CommitInterval: 4096,
SnapshotLimit: 256,
SnapshotNoBuild: true, // Ensure the test errors if snapshot initialization fails
AcceptorQueueLimit: 64, // ensure channel doesn't block
TxLookupLimit: 5,
}
createTxLookupBlockChain := func(db ethdb.Database, gspec *Genesis, lastAcceptedHash common.Hash) (*BlockChain, error) {
return createBlockChain(db, cacheConf, gspec, lastAcceptedHash)
4 changes: 2 additions & 2 deletions core/state/statedb.go
@@ -186,13 +186,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, maxConcurrency)
}
}

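
The new `maxConcurrency` argument is passed through to `newTriePrefetcher`, and the commit "limit prefetching across all subfetchers" suggests the cap is shared across every trie being prefetched rather than applied per trie. One simple way to express a shared cap in Go is a semaphore owned by the prefetcher and borrowed by each subfetcher; the sketch below is illustrative only, with all identifiers invented for the example.

```go
// Illustrative sketch (not the prefetcher's actual code) of a read cap shared
// across subfetchers: a single buffered channel acts as the semaphore, so the
// account trie and every storage trie together never exceed maxConcurrency reads.
package main

import (
	"fmt"
	"sync"
)

// sharedLimiter caps in-flight trie reads across every subfetcher.
type sharedLimiter chan struct{}

func (l sharedLimiter) acquire() { l <- struct{}{} }
func (l sharedLimiter) release() { <-l }

// subfetcher prefetches keys for a single trie but borrows read slots from a
// limiter that is shared with all other subfetchers.
type subfetcher struct {
	trieName string
	limiter  sharedLimiter
}

func (s *subfetcher) prefetch(keys [][]byte, wg *sync.WaitGroup) {
	for _, key := range keys {
		wg.Add(1)
		go func(key []byte) {
			defer wg.Done()
			s.limiter.acquire()
			defer s.limiter.release()
			// A real subfetcher would resolve the trie node for key here.
			fmt.Printf("%s: prefetched %x\n", s.trieName, key)
		}(key)
	}
}

func main() {
	limiter := make(sharedLimiter, 16) // maxConcurrency shared by all tries
	var wg sync.WaitGroup
	account := &subfetcher{trieName: "accounts", limiter: limiter}
	storage := &subfetcher{trieName: "storage", limiter: limiter}
	account.prefetch([][]byte{{0x01}, {0x02}}, &wg)
	storage.prefetch([][]byte{{0xa1}, {0xa2}}, &wg)
	wg.Wait()
}
```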